/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ListenerMode;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.BatchMessageSink;
import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.SqsMessageSource;
import java.time.Duration;
import java.util.Collection;
import org.springframework.util.Assert;

public class StandardSqsComponentFactory<T>
implements ContainerComponentFactory<T> {
    private static final Duration DEFAULT_STANDARD_SQS_ACK_INTERVAL = Duration.ofSeconds(1L);
    private static final Integer DEFAULT_STANDARD_SQS_ACK_THRESHOLD = 10;
    private static final AcknowledgementOrdering DEFAULT_STANDARD_SQS_ACK_ORDERING = AcknowledgementOrdering.PARALLEL;

    @Override
    public boolean supports(Collection<String> queueNames, ContainerOptions options) {
        return queueNames.stream().noneMatch(name -> name.endsWith(".fifo"));
    }

    @Override
    public MessageSource<T> createMessageSource(ContainerOptions options) {
        return new SqsMessageSource();
    }

    @Override
    public MessageSink<T> createMessageSink(ContainerOptions options) {
        return ListenerMode.SINGLE_MESSAGE.equals((Object)options.getListenerMode()) ? new FanOutMessageSink() : new BatchMessageSink();
    }

    @Override
    public AcknowledgementProcessor<T> createAcknowledgementProcessor(ContainerOptions options) {
        this.validateAcknowledgementOrdering(options);
        return options.getAcknowledgementInterval() == Duration.ZERO && options.getAcknowledgementThreshold() == 0 ? this.createAndConfigureImmediateProcessor(options) : this.createAndConfigureBatchingProcessor(options);
    }

    private void validateAcknowledgementOrdering(ContainerOptions options) {
        Assert.isTrue((!AcknowledgementOrdering.ORDERED_BY_GROUP.equals((Object)options.getAcknowledgementOrdering()) ? 1 : 0) != 0, (String)("Standard SQS queues are not compatible with " + (Object)((Object)AcknowledgementOrdering.ORDERED_BY_GROUP)));
    }

    private AcknowledgementProcessor<T> createAndConfigureBatchingProcessor(ContainerOptions options) {
        return this.configureBatchingAcknowledgementProcessor(options, this.createBatchingProcessorInstance());
    }

    protected ImmediateAcknowledgementProcessor<T> createAndConfigureImmediateProcessor(ContainerOptions options) {
        return this.configureImmediateAcknowledgementProcessor(this.createImmediateProcessorInstance(), options);
    }

    protected ImmediateAcknowledgementProcessor<T> createImmediateProcessorInstance() {
        return new ImmediateAcknowledgementProcessor();
    }

    protected BatchingAcknowledgementProcessor<T> createBatchingProcessorInstance() {
        return new BatchingAcknowledgementProcessor();
    }

    protected ImmediateAcknowledgementProcessor<T> configureImmediateAcknowledgementProcessor(ImmediateAcknowledgementProcessor<T> processor, ContainerOptions options) {
        processor.setMaxAcknowledgementsPerBatch(10);
        ContainerOptions.Builder builder = options.toBuilder();
        ConfigUtils.INSTANCE.acceptIfNotNullOrElse(builder::acknowledgementOrdering, options.getAcknowledgementOrdering(), DEFAULT_STANDARD_SQS_ACK_ORDERING);
        processor.configure(builder.build());
        return processor;
    }

    protected BatchingAcknowledgementProcessor<T> configureBatchingAcknowledgementProcessor(ContainerOptions options, BatchingAcknowledgementProcessor<T> processor) {
        processor.setMaxAcknowledgementsPerBatch(10);
        ContainerOptions.Builder builder = options.toBuilder();
        ConfigUtils.INSTANCE.acceptIfNotNullOrElse(builder::acknowledgementInterval, options.getAcknowledgementInterval(), DEFAULT_STANDARD_SQS_ACK_INTERVAL).acceptIfNotNullOrElse(builder::acknowledgementThreshold, options.getAcknowledgementThreshold(), DEFAULT_STANDARD_SQS_ACK_THRESHOLD).acceptIfNotNullOrElse(builder::acknowledgementOrdering, options.getAcknowledgementOrdering(), DEFAULT_STANDARD_SQS_ACK_ORDERING);
        processor.configure(builder.build());
        return processor;
    }
}

