/*
 * Decompiled with CFR 0.152.
 */
package com.katanox.tabour.integration.sqs.core.consumer;

import com.amazonaws.services.sqs.model.Message;
import com.katanox.tabour.config.EventPollerProperties;
import com.katanox.tabour.exception.ExceptionHandler;
import com.katanox.tabour.integration.sqs.config.SqsConfiguration;
import com.katanox.tabour.integration.sqs.core.consumer.SqsEventFetcher;
import com.katanox.tabour.integration.sqs.core.consumer.SqsEventHandler;
import com.katanox.tabour.integration.sqs.core.consumer.SqsEventPoller$WhenMappings;
import com.katanox.tabour.integration.sqs.core.consumer.SqsEventPollerKt;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.Charsets;
import org.jetbrains.annotations.NotNull;

@Metadata(mv={1, 4, 2}, bv={1, 0, 3}, k=1, d1={"\u0000J\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0010\u000e\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018\u00002\u00020\u0001BE\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\r\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u0012\u0006\u0010\u0010\u001a\u00020\u0011\u00a2\u0006\u0002\u0010\u0012J\u0010\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u0016H\u0002J\u0010\u0010\u0017\u001a\u00020\u00142\u0006\u0010\u0018\u001a\u00020\u0016H\u0002J\b\u0010\u0019\u001a\u00020\u0014H\u0002J\u0006\u0010\u001a\u001a\u00020\u0014J\u0006\u0010\u001b\u001a\u00020\u0014R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001c"}, d2={"Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventPoller;", "", "name", "", "eventHandler", "Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler;", "eventFetcher", "Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher;", "pollerThreadPool", "Ljava/util/concurrent/ScheduledThreadPoolExecutor;", "handlerThreadPool", "Ljava/util/concurrent/ThreadPoolExecutor;", "pollingProperties", "Lcom/katanox/tabour/config/EventPollerProperties;", "sqsConfiguration", "Lcom/katanox/tabour/integration/sqs/config/SqsConfiguration;", "exceptionHandler", "Lcom/katanox/tabour/exception/ExceptionHandler;", "(Ljava/lang/String;Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler;Lcom/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher;Ljava/util/concurrent/ScheduledThreadPoolExecutor;Ljava/util/concurrent/ThreadPoolExecutor;Lcom/katanox/tabour/config/EventPollerProperties;Lcom/katanox/tabour/integration/sqs/config/SqsConfiguration;Lcom/katanox/tabour/exception/ExceptionHandler;)V", "acknowledgeMessage", "", "message", "Lcom/amazonaws/services/sqs/model/Message;", "handleMessage", "sqsMessage", "pollMessages", "start", "stop", "tabour"})
public final class SqsEventPoller {
    private final String name;
    private final SqsEventHandler eventHandler;
    private final SqsEventFetcher eventFetcher;
    private final ScheduledThreadPoolExecutor pollerThreadPool;
    private final ThreadPoolExecutor handlerThreadPool;
    private final EventPollerProperties pollingProperties;
    private final SqsConfiguration sqsConfiguration;
    private final ExceptionHandler exceptionHandler;

    /*
     * WARNING - void declaration
     */
    public final void start() {
        SqsEventPollerKt.access$getLogger$p().info("starting SqsMessagePoller");
        int n = 0;
        int n2 = this.pollerThreadPool.getCorePoolSize();
        while (n < n2) {
            void i;
            SqsEventPollerKt.access$getLogger$p().info("starting SqsMessagePoller ({}) - thread {}", (Object)this.name, (Object)((int)i));
            this.pollerThreadPool.scheduleWithFixedDelay(new Runnable(this){
                final /* synthetic */ SqsEventPoller this$0;

                public final void run() {
                    SqsEventPoller.access$pollMessages(this.this$0);
                }
                {
                    this.this$0 = sqsEventPoller;
                }
            }, this.pollingProperties.getPollDelay().getSeconds(), this.pollingProperties.getPollDelay().getSeconds(), TimeUnit.SECONDS);
            ++i;
        }
    }

    public final void stop() {
        SqsEventPollerKt.access$getLogger$p().info("stopping SqsMessagePoller");
        this.pollerThreadPool.shutdownNow();
        this.handlerThreadPool.shutdownNow();
    }

    private final void pollMessages() {
        try {
            List<Message> messages = this.eventFetcher.fetchMessages();
            for (Message sqsMessage : messages) {
                this.handleMessage(sqsMessage);
            }
        }
        catch (Exception e) {
            SqsEventPollerKt.access$getLogger$p().error("error fetching messages from queue {}:", (Object)this.eventHandler.getSqsQueueUrl(), (Object)e);
        }
    }

    private final void handleMessage(Message sqsMessage) {
        SqsEventPollerKt.access$getLogger$p().info("Received message ID {}", (Object)sqsMessage.getMessageId());
        String message = sqsMessage.getBody();
        this.handlerThreadPool.submit(new Runnable(this, message, sqsMessage){
            final /* synthetic */ SqsEventPoller this$0;
            final /* synthetic */ String $message;
            final /* synthetic */ Message $sqsMessage;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Enabled aggressive block sorting
             * Enabled unnecessary exception pruning
             * Enabled aggressive exception aggregation
             */
            public final void run() {
                try {
                    SqsEventHandler sqsEventHandler = SqsEventPoller.access$getEventHandler$p(this.this$0);
                    String string = this.$message;
                    Intrinsics.checkNotNullExpressionValue((Object)string, (String)"message");
                    String string2 = string;
                    Charset charset = Charsets.UTF_8;
                    boolean bl = false;
                    String string3 = string2;
                    if (string3 == null) {
                        throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                    }
                    byte[] byArray = string3.getBytes(charset);
                    Intrinsics.checkNotNullExpressionValue((Object)byArray, (String)"(this as java.lang.String).getBytes(charset)");
                    sqsEventHandler.onBeforeHandle(byArray);
                    SqsEventHandler sqsEventHandler2 = SqsEventPoller.access$getEventHandler$p(this.this$0);
                    String string4 = this.$message;
                    Intrinsics.checkNotNullExpressionValue((Object)string4, (String)"message");
                    string2 = string4;
                    charset = Charsets.UTF_8;
                    bl = false;
                    String string5 = string2;
                    if (string5 == null) {
                        throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                    }
                    byte[] byArray2 = string5.getBytes(charset);
                    Intrinsics.checkNotNullExpressionValue((Object)byArray2, (String)"(this as java.lang.String).getBytes(charset)");
                    sqsEventHandler2.handle(byArray2);
                    SqsEventPoller.access$acknowledgeMessage(this.this$0, this.$sqsMessage);
                    SqsEventPollerKt.access$getLogger$p().debug("message {} processed successfully - message has been deleted from SQS", (Object)this.$sqsMessage.getMessageId());
                    return;
                }
                catch (Exception e) {
                    switch (SqsEventPoller$WhenMappings.$EnumSwitchMapping$0[SqsEventPoller.access$getExceptionHandler$p(this.this$0).handleException(this.$sqsMessage, e).ordinal()]) {
                        case 1: {
                            return;
                        }
                        case 2: {
                            SqsEventPoller.access$acknowledgeMessage(this.this$0, this.$sqsMessage);
                            return;
                        }
                    }
                    return;
                }
                finally {
                    SqsEventHandler sqsEventHandler = SqsEventPoller.access$getEventHandler$p(this.this$0);
                    String string = this.$message;
                    Intrinsics.checkNotNullExpressionValue((Object)string, (String)"message");
                    String string6 = string;
                    Charset charset = Charsets.UTF_8;
                    boolean bl = false;
                    String string7 = string6;
                    if (string7 == null) {
                        throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                    }
                    byte[] byArray = string7.getBytes(charset);
                    Intrinsics.checkNotNullExpressionValue((Object)byArray, (String)"(this as java.lang.String).getBytes(charset)");
                    sqsEventHandler.onAfterHandle(byArray);
                }
            }
            {
                this.this$0 = sqsEventPoller;
                this.$message = string;
                this.$sqsMessage = message;
            }
        });
    }

    private final void acknowledgeMessage(Message message) {
        this.sqsConfiguration.amazonSQSAsync().deleteMessage(this.eventHandler.getSqsQueueUrl(), message.getReceiptHandle());
    }

    public SqsEventPoller(@NotNull String name, @NotNull SqsEventHandler eventHandler, @NotNull SqsEventFetcher eventFetcher, @NotNull ScheduledThreadPoolExecutor pollerThreadPool, @NotNull ThreadPoolExecutor handlerThreadPool, @NotNull EventPollerProperties pollingProperties, @NotNull SqsConfiguration sqsConfiguration, @NotNull ExceptionHandler exceptionHandler) {
        Intrinsics.checkNotNullParameter((Object)name, (String)"name");
        Intrinsics.checkNotNullParameter((Object)eventHandler, (String)"eventHandler");
        Intrinsics.checkNotNullParameter((Object)eventFetcher, (String)"eventFetcher");
        Intrinsics.checkNotNullParameter((Object)pollerThreadPool, (String)"pollerThreadPool");
        Intrinsics.checkNotNullParameter((Object)handlerThreadPool, (String)"handlerThreadPool");
        Intrinsics.checkNotNullParameter((Object)pollingProperties, (String)"pollingProperties");
        Intrinsics.checkNotNullParameter((Object)sqsConfiguration, (String)"sqsConfiguration");
        Intrinsics.checkNotNullParameter((Object)exceptionHandler, (String)"exceptionHandler");
        this.name = name;
        this.eventHandler = eventHandler;
        this.eventFetcher = eventFetcher;
        this.pollerThreadPool = pollerThreadPool;
        this.handlerThreadPool = handlerThreadPool;
        this.pollingProperties = pollingProperties;
        this.sqsConfiguration = sqsConfiguration;
        this.exceptionHandler = exceptionHandler;
    }

    public static final /* synthetic */ void access$pollMessages(SqsEventPoller $this) {
        $this.pollMessages();
    }

    public static final /* synthetic */ SqsEventHandler access$getEventHandler$p(SqsEventPoller $this) {
        return $this.eventHandler;
    }

    public static final /* synthetic */ void access$acknowledgeMessage(SqsEventPoller $this, Message message) {
        $this.acknowledgeMessage(message);
    }

    public static final /* synthetic */ ExceptionHandler access$getExceptionHandler$p(SqsEventPoller $this) {
        return $this.exceptionHandler;
    }
}

