/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.LifecycleHandler;
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer;
import io.awspring.cloud.sqs.listener.BackPressureHandler;
import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent;
import io.awspring.cloud.sqs.listener.SemaphoreBackPressureHandler;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingContextInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.BeforeProcessingContextInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.BeforeProcessingInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingConfiguration;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder;
import io.awspring.cloud.sqs.listener.sink.MessageProcessingPipelineSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public abstract class AbstractPipelineMessageListenerContainer<T>
extends AbstractMessageListenerContainer<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractPipelineMessageListenerContainer.class);
    private Collection<MessageSource<T>> messageSources;
    private MessageSink<T> messageSink;
    private TaskExecutor componentsTaskExecutor;

    protected AbstractPipelineMessageListenerContainer(ContainerOptions options) {
        super(options);
    }

    @Override
    protected void doStart() {
        ContainerComponentFactory<T> componentFactory = this.determineComponentFactory();
        this.messageSources = this.createMessageSources(componentFactory);
        this.messageSink = componentFactory.createMessageSink(this.getContainerOptions());
        this.configureComponents(componentFactory);
        LifecycleHandler.get().start(this.messageSink, this.messageSources);
    }

    private ContainerComponentFactory<T> determineComponentFactory() {
        return this.getComponentFactories().stream().filter(factory -> factory.supports(this.getQueueNames(), this.getContainerOptions())).findFirst().orElseThrow(() -> new IllegalArgumentException("No ContainerComponentFactory found for queues " + this.getQueueNames()));
    }

    private Collection<ContainerComponentFactory<T>> getComponentFactories() {
        return this.getContainerComponentFactories() != null ? this.getContainerComponentFactories() : this.getDefaultComponentFactories();
    }

    protected abstract Collection<ContainerComponentFactory<T>> getDefaultComponentFactories();

    protected Collection<MessageSource<T>> createMessageSources(ContainerComponentFactory<T> componentFactory) {
        ArrayList<String> queueNames = new ArrayList<String>(this.getQueueNames());
        return IntStream.range(0, queueNames.size()).mapToObj(index -> this.createMessageSource((String)queueNames.get(index), index, componentFactory)).collect(Collectors.toList());
    }

    protected MessageSource<T> createMessageSource(String queueName, int index, ContainerComponentFactory<T> componentFactory) {
        MessageSource<T> messageSource = componentFactory.createMessageSource(this.getContainerOptions());
        ConfigUtils.INSTANCE.acceptIfInstance(messageSource, PollingMessageSource.class, pms -> pms.setPollingEndpointName(queueName)).acceptIfInstance(messageSource, IdentifiableContainerComponent.class, icc -> icc.setId(this.getId() + "-" + index));
        return messageSource;
    }

    private void configureComponents(ContainerComponentFactory<T> componentFactory) {
        this.getContainerOptions().configure(this.messageSources).configure(this.messageSink);
        this.componentsTaskExecutor = this.resolveComponentsTaskExecutor();
        this.configureMessageSources(componentFactory);
        this.configureMessageSink(this.createMessageProcessingPipeline(componentFactory));
        this.configureContainerComponents();
    }

    protected void configureMessageSources(ContainerComponentFactory<T> componentFactory) {
        TaskExecutor taskExecutor = this.createSourcesTaskExecutor();
        ConfigUtils.INSTANCE.acceptMany(this.messageSources, source -> source.setMessageSink(this.messageSink)).acceptManyIfInstance(this.messageSources, PollingMessageSource.class, pms -> pms.setBackPressureHandler(this.createBackPressureHandler())).acceptManyIfInstance(this.messageSources, AcknowledgementProcessingMessageSource.class, ams -> ams.setAcknowledgementProcessor(componentFactory.createAcknowledgementProcessor(this.getContainerOptions()))).acceptManyIfInstance(this.messageSources, AcknowledgementProcessingMessageSource.class, ams -> ams.setAcknowledgementResultCallback(this.getAcknowledgementResultCallback())).acceptManyIfInstance(this.messageSources, TaskExecutorAware.class, teac -> teac.setTaskExecutor(taskExecutor));
        this.doConfigureMessageSources(this.messageSources);
    }

    protected void doConfigureMessageSources(Collection<MessageSource<T>> messageSources) {
    }

    protected void configureMessageSink(MessageProcessingPipeline<T> messageProcessingPipeline) {
        ConfigUtils.INSTANCE.acceptIfInstance(this.messageSink, IdentifiableContainerComponent.class, icc -> icc.setId(this.getId())).acceptIfInstance(this.messageSink, TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.messageSink, MessageProcessingPipelineSink.class, mls -> mls.setMessagePipeline(messageProcessingPipeline));
        this.doConfigureMessageSink(this.messageSink);
    }

    protected void doConfigureMessageSink(MessageSink<T> messageSink) {
    }

    protected void configureContainerComponents() {
        ConfigUtils.INSTANCE.acceptManyIfInstance(this.getMessageInterceptors(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.getMessageListener(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.getErrorHandler(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.getAcknowledgementResultCallback(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor()));
    }

    protected MessageProcessingPipeline<T> createMessageProcessingPipeline(ContainerComponentFactory<T> componentFactory) {
        return MessageProcessingPipelineBuilder.first(BeforeProcessingContextInterceptorExecutionStage::new).then(BeforeProcessingInterceptorExecutionStage::new).then(MessageListenerExecutionStage::new).thenInTheFuture(ErrorHandlerExecutionStage::new).thenInTheFuture(AfterProcessingInterceptorExecutionStage::new).thenInTheFuture(AfterProcessingContextInterceptorExecutionStage::new).thenInTheFuture(AcknowledgementHandlerExecutionStage::new).build(MessageProcessingConfiguration.builder().interceptors(this.getMessageInterceptors()).messageListener(this.getMessageListener()).errorHandler(this.getErrorHandler()).ackHandler(componentFactory.createAcknowledgementHandler(this.getContainerOptions())).build());
    }

    private TaskExecutor resolveComponentsTaskExecutor() {
        return this.getContainerOptions().getComponentsTaskExecutor() != null ? this.getContainerOptions().getComponentsTaskExecutor() : this.createComponentsTaskExecutor();
    }

    protected BackPressureHandler createBackPressureHandler() {
        return SemaphoreBackPressureHandler.builder().batchSize(this.getContainerOptions().getMaxMessagesPerPoll()).totalPermits(this.getContainerOptions().getMaxInFlightMessagesPerQueue()).acquireTimeout(this.getContainerOptions().getPermitAcquireTimeout()).throughputConfiguration(this.getContainerOptions().getBackPressureMode()).build();
    }

    protected TaskExecutor createSourcesTaskExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setThreadNamePrefix(this.getId() + "#message_source-");
        return executor;
    }

    protected TaskExecutor createComponentsTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int poolSize = this.getContainerOptions().getMaxInFlightMessagesPerQueue() * this.messageSources.size();
        executor.setMaxPoolSize(poolSize);
        executor.setCorePoolSize(this.getContainerOptions().getMaxMessagesPerPoll());
        executor.setQueueCapacity(0);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setThreadFactory(this.createThreadFactory());
        executor.afterPropertiesSet();
        return executor;
    }

    protected ThreadFactory createThreadFactory() {
        MessageExecutionThreadFactory threadFactory = new MessageExecutionThreadFactory();
        threadFactory.setThreadNamePrefix(this.getId() + "-");
        return threadFactory;
    }

    @Override
    protected void doStop() {
        LifecycleHandler.get().stop(this.messageSources, this.messageSink);
        this.shutdownComponentsTaskExecutor();
        logger.debug("Container {} stopped", (Object)this.getId());
    }

    protected TaskExecutor getComponentsTaskExecutor() {
        return this.componentsTaskExecutor;
    }

    private void shutdownComponentsTaskExecutor() {
        if (!this.componentsTaskExecutor.equals(this.getContainerOptions().getComponentsTaskExecutor())) {
            LifecycleHandler.get().dispose(this.getComponentsTaskExecutor());
        }
    }
}

