/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.source;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.listener.BackPressureHandler;
import io.awspring.cloud.sqs.listener.BatchAwareBackPressureHandler;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.ExecutingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public abstract class AbstractPollingMessageSource<T, S>
extends AbstractMessageConvertingMessageSource<T, S>
implements PollingMessageSource<T>,
IdentifiableContainerComponent {
    private static final Logger logger = LoggerFactory.getLogger(AbstractPollingMessageSource.class);
    private String pollingEndpointName;
    private Duration shutdownTimeout;
    private TaskExecutor taskExecutor;
    private BatchAwareBackPressureHandler backPressureHandler;
    private AcknowledgementProcessor<T> acknowledgmentProcessor;
    private MessageSink<T> messageSink;
    private volatile boolean running;
    private final Object lifecycleMonitor = new Object();
    private final Collection<CompletableFuture<?>> pollingFutures = Collections.synchronizedCollection(new ArrayList());
    private String id;
    private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback;

    @Override
    protected void configureMessageSource(ContainerOptions<?, ?> containerOptions) {
        this.shutdownTimeout = containerOptions.getListenerShutdownTimeout();
        this.doConfigure(containerOptions);
    }

    protected abstract void doConfigure(ContainerOptions<?, ?> var1);

    @Override
    public void setId(String id) {
        Assert.notNull((Object)id, (String)"id cannot be null");
        this.id = id;
    }

    @Override
    public void setPollingEndpointName(String pollingEndpointName) {
        Assert.isTrue((boolean)StringUtils.hasText((String)pollingEndpointName), (String)"pollingEndpointName must have text");
        this.pollingEndpointName = pollingEndpointName;
    }

    @Override
    public void setBackPressureHandler(BackPressureHandler backPressureHandler) {
        Assert.notNull((Object)backPressureHandler, (String)"backPressureHandler cannot be null");
        Assert.isInstanceOf(BatchAwareBackPressureHandler.class, (Object)backPressureHandler, (String)(this.getClass().getSimpleName() + " requires a " + BatchAwareBackPressureHandler.class));
        this.backPressureHandler = (BatchAwareBackPressureHandler)backPressureHandler;
    }

    @Override
    public void setAcknowledgementProcessor(AcknowledgementProcessor<T> acknowledgementProcessor) {
        Assert.notNull(acknowledgementProcessor, (String)"acknowledgementProcessor cannot be null");
        this.acknowledgmentProcessor = acknowledgementProcessor;
    }

    @Override
    public void setAcknowledgementResultCallback(AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback) {
        Assert.notNull(acknowledgementResultCallback, (String)"acknowledgementResultCallback must not be null");
        this.acknowledgementResultCallback = acknowledgementResultCallback;
    }

    @Override
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"taskExecutor cannot be null");
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void setMessageSink(MessageSink<T> messageSink) {
        Assert.notNull(messageSink, (String)"messageSink cannot be null");
        this.messageSink = messageSink;
    }

    @Override
    public String getId() {
        return this.id;
    }

    public boolean isRunning() {
        return this.running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        if (this.isRunning()) {
            logger.debug("{} for queue {} already running", (Object)this.getClass().getSimpleName(), (Object)this.pollingEndpointName);
            return;
        }
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            Assert.notNull((Object)this.id, (String)"id not set");
            Assert.notNull(this.messageSink, (String)"messageSink not set");
            Assert.notNull((Object)this.backPressureHandler, (String)"backPressureHandler not set");
            Assert.notNull(this.acknowledgmentProcessor, (String)"acknowledgmentProcessor not set");
            logger.debug("Starting {} for queue {}", (Object)this.getClass().getSimpleName(), (Object)this.pollingEndpointName);
            this.running = true;
            ConfigUtils.INSTANCE.acceptIfInstance(this.backPressureHandler, IdentifiableContainerComponent.class, icc -> icc.setId(this.id)).acceptIfInstance(this.acknowledgmentProcessor, IdentifiableContainerComponent.class, icc -> icc.setId(this.id)).acceptIfInstance(this.acknowledgmentProcessor, ExecutingAcknowledgementProcessor.class, eap -> eap.setAcknowledgementResultCallback(this.acknowledgementResultCallback)).acceptIfInstance(this.acknowledgmentProcessor, TaskExecutorAware.class, ea -> ea.setTaskExecutor(this.taskExecutor));
            this.doStart();
            this.setupAcknowledgementForConversion(this.acknowledgmentProcessor.getAcknowledgementCallback());
            this.acknowledgmentProcessor.start();
            this.startPollingThread();
        }
    }

    protected void doStart() {
    }

    private void startPollingThread() {
        this.taskExecutor.execute(this::pollAndEmitMessages);
    }

    private void pollAndEmitMessages() {
        while (this.isRunning()) {
            try {
                if (!this.isRunning()) continue;
                logger.trace("Requesting permits for queue {}", (Object)this.pollingEndpointName);
                int acquiredPermits = this.backPressureHandler.requestBatch();
                if (acquiredPermits == 0) {
                    logger.trace("No permits acquired for queue {}", (Object)this.pollingEndpointName);
                    continue;
                }
                logger.trace("{} permits acquired for queue {}", (Object)acquiredPermits, (Object)this.pollingEndpointName);
                if (!this.isRunning()) {
                    logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits", (Object)acquiredPermits);
                    this.backPressureHandler.release(acquiredPermits);
                    continue;
                }
                ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)this.managePollingFuture(this.doPollForMessages(acquiredPermits)).exceptionally(this::handlePollingException)).thenApply(msgs -> this.releaseUnusedPermits(acquiredPermits, (Collection<S>)msgs))).thenApply(this::convertMessages)).thenCompose(this::emitMessagesToPipeline)).exceptionally(this::handleSinkException);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("MessageSource thread interrupted for endpoint " + this.pollingEndpointName, e);
            }
            catch (Exception e) {
                logger.error("Error in MessageSource for queue {}. Resuming", (Object)this.pollingEndpointName, (Object)e);
            }
        }
        logger.debug("Execution thread stopped for queue {}", (Object)this.pollingEndpointName);
    }

    protected abstract CompletableFuture<Collection<S>> doPollForMessages(int var1);

    public Collection<S> releaseUnusedPermits(int permits, Collection<S> msgs) {
        if (msgs.isEmpty() && permits == this.backPressureHandler.getBatchSize()) {
            this.backPressureHandler.releaseBatch();
            logger.trace("Released batch of unused permits for queue {}", (Object)this.pollingEndpointName);
        } else {
            int permitsToRelease = permits - msgs.size();
            this.backPressureHandler.release(permitsToRelease);
            logger.trace("Released {} unused permits for queue {}", (Object)permitsToRelease, (Object)this.pollingEndpointName);
        }
        return msgs;
    }

    private CompletableFuture<Void> emitMessagesToPipeline(Collection<Message<T>> messages) {
        if (messages.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return this.messageSink.emit(messages, this.createContext());
    }

    protected MessageProcessingContext<T> createContext() {
        return MessageProcessingContext.create().setBackPressureReleaseCallback(this::releaseBackPressure).setAcknowledgmentCallback(this.getAcknowledgementCallback());
    }

    protected AcknowledgementCallback<T> getAcknowledgementCallback() {
        return this.acknowledgmentProcessor.getAcknowledgementCallback();
    }

    private void releaseBackPressure() {
        logger.debug("Releasing permit for queue {}", (Object)this.pollingEndpointName);
        this.backPressureHandler.release(1);
    }

    private Void handleSinkException(Throwable t) {
        logger.error("Error processing message", t instanceof CompletionException ? t.getCause() : t);
        return null;
    }

    private Collection<S> handlePollingException(Throwable t) {
        logger.error("Error polling for messages in queue {}", (Object)this.pollingEndpointName, (Object)t);
        return Collections.emptyList();
    }

    private <F> CompletableFuture<F> managePollingFuture(CompletableFuture<F> pollingFuture) {
        this.pollingFutures.add(pollingFuture);
        pollingFuture.thenRun(() -> this.pollingFutures.remove(pollingFuture));
        return pollingFuture;
    }

    protected String getPollingEndpointName() {
        return this.pollingEndpointName;
    }

    protected AcknowledgementProcessor<T> getAcknowledgmentProcessor() {
        return this.acknowledgmentProcessor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        if (!this.isRunning()) {
            logger.debug("{} for queue {} not running", (Object)this.getClass().getSimpleName(), (Object)this.pollingEndpointName);
        }
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            logger.debug("Stopping {} for queue {}", (Object)this.getClass().getSimpleName(), (Object)this.pollingEndpointName);
            this.running = false;
            if (!this.waitExistingTasksToFinish()) {
                logger.warn("Tasks did not finish in {} seconds for queue {}, proceeding with shutdown", (Object)this.shutdownTimeout.getSeconds(), (Object)this.pollingEndpointName);
                this.pollingFutures.forEach(pollingFuture -> pollingFuture.cancel(true));
            }
            this.doStop();
            this.acknowledgmentProcessor.stop();
            logger.debug("{} for queue {} stopped", (Object)this.getClass().getSimpleName(), (Object)this.pollingEndpointName);
        }
    }

    protected void doStop() {
    }

    private boolean waitExistingTasksToFinish() {
        if (this.shutdownTimeout.isZero()) {
            logger.debug("Shutdown timeout set to zero for queue {} - not waiting for tasks to finish", (Object)this.pollingEndpointName);
            return false;
        }
        return this.backPressureHandler.drain(this.shutdownTimeout);
    }
}

