/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.FifoUtils;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ListenerMode;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.BatchMessageSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.sink.OrderedMessageSink;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageGroupingSinkAdapter;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityExtendingSinkAdapter;
import io.awspring.cloud.sqs.listener.source.FifoSqsMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

public class FifoSqsComponentFactory<T>
implements ContainerComponentFactory<T, SqsContainerOptions> {
    private static final Duration DEFAULT_FIFO_SQS_ACK_INTERVAL = Duration.ZERO;
    private static final Integer DEFAULT_FIFO_SQS_ACK_THRESHOLD = 0;
    private static final AcknowledgementOrdering DEFAULT_FIFO_SQS_ACK_ORDERING_IMMEDIATE = AcknowledgementOrdering.PARALLEL;
    private static final AcknowledgementOrdering DEFAULT_FIFO_SQS_ACK_ORDERING_BATCHING = AcknowledgementOrdering.ORDERED;

    @Override
    public boolean supports(Collection<String> queueNames, SqsContainerOptions options) {
        return FifoUtils.areAllFifo(queueNames);
    }

    @Override
    public MessageSource<T> createMessageSource(SqsContainerOptions options) {
        return new FifoSqsMessageSource();
    }

    @Override
    public MessageSink<T> createMessageSink(SqsContainerOptions options) {
        MessageSink<T> deliverySink = this.createDeliverySink(options.getListenerMode());
        return new MessageGroupingSinkAdapter<T>(this.maybeWrapWithVisibilityAdapter(deliverySink, options.getMessageVisibility()), this.getMessageGroupingFunction());
    }

    private MessageSink<T> createDeliverySink(ListenerMode listenerMode) {
        return ListenerMode.SINGLE_MESSAGE.equals((Object)listenerMode) ? new OrderedMessageSink() : new BatchMessageSink();
    }

    private MessageSink<T> maybeWrapWithVisibilityAdapter(MessageSink<T> deliverySink, @Nullable Duration messageVisibility) {
        return messageVisibility != null ? this.addMessageVisibilityExtendingSinkAdapter(deliverySink, messageVisibility) : deliverySink;
    }

    private MessageVisibilityExtendingSinkAdapter<T> addMessageVisibilityExtendingSinkAdapter(MessageSink<T> deliverySink, Duration messageVisibility) {
        MessageVisibilityExtendingSinkAdapter<T> visibilityAdapter = new MessageVisibilityExtendingSinkAdapter<T>(deliverySink);
        visibilityAdapter.setMessageVisibility(messageVisibility);
        return visibilityAdapter;
    }

    private Function<Message<T>, String> getMessageGroupingFunction() {
        return message -> MessageHeaderUtils.getHeaderAsString(message, "Sqs_Msa_MessageGroupId");
    }

    @Override
    public AcknowledgementProcessor<T> createAcknowledgementProcessor(SqsContainerOptions options) {
        this.validateFifoOptions(options);
        return this.hasNoAcknowledgementIntervalSet(options) && this.hasNoAcknowledgementThresholdSet(options) ? this.createAndConfigureImmediateProcessor(options) : this.createAndConfigureBatchingAckProcessor(options);
    }

    private void validateFifoOptions(SqsContainerOptions options) {
        Assert.isTrue((options.getMessageSystemAttributeNames().contains(QueueAttributeName.ALL.toString()) || options.getMessageSystemAttributeNames().contains(MessageSystemAttributeName.MESSAGE_GROUP_ID.toString()) ? 1 : 0) != 0, (String)"MessageSystemAttributeName.MESSAGE_GROUP_ID is required for FIFO queues.");
    }

    private boolean hasNoAcknowledgementThresholdSet(SqsContainerOptions options) {
        return options.getAcknowledgementThreshold() == null || DEFAULT_FIFO_SQS_ACK_THRESHOLD.equals(options.getAcknowledgementThreshold());
    }

    private boolean hasNoAcknowledgementIntervalSet(SqsContainerOptions options) {
        return options.getAcknowledgementInterval() == null || DEFAULT_FIFO_SQS_ACK_INTERVAL.equals(options.getAcknowledgementInterval());
    }

    private ImmediateAcknowledgementProcessor<T> createAndConfigureImmediateProcessor(SqsContainerOptions options) {
        return this.configureImmediateProcessor(this.createImmediateProcessorInstance(), options);
    }

    private BatchingAcknowledgementProcessor<T> createAndConfigureBatchingAckProcessor(SqsContainerOptions options) {
        return this.configureBatchingAckProcessor(options, this.createBatchingProcessorInstance());
    }

    protected ImmediateAcknowledgementProcessor<T> createImmediateProcessorInstance() {
        return new ImmediateAcknowledgementProcessor();
    }

    protected BatchingAcknowledgementProcessor<T> createBatchingProcessorInstance() {
        return new BatchingAcknowledgementProcessor();
    }

    protected ImmediateAcknowledgementProcessor<T> configureImmediateProcessor(ImmediateAcknowledgementProcessor<T> processor, SqsContainerOptions options) {
        processor.setMaxAcknowledgementsPerBatch(10);
        if (AcknowledgementOrdering.ORDERED_BY_GROUP.equals((Object)options.getAcknowledgementOrdering())) {
            processor.setMessageGroupingFunction(this.getMessageGroupingFunction());
        }
        SqsContainerOptionsBuilder builder = options.toBuilder();
        ConfigUtils.INSTANCE.acceptIfNotNullOrElse(builder::acknowledgementOrdering, options.getAcknowledgementOrdering(), DEFAULT_FIFO_SQS_ACK_ORDERING_IMMEDIATE);
        processor.configure((ContainerOptions<?, ?>)builder.build());
        return processor;
    }

    protected BatchingAcknowledgementProcessor<T> configureBatchingAckProcessor(SqsContainerOptions options, BatchingAcknowledgementProcessor<T> processor) {
        SqsContainerOptionsBuilder builder = options.toBuilder();
        ConfigUtils.INSTANCE.acceptIfNotNullOrElse(builder::acknowledgementInterval, options.getAcknowledgementInterval(), DEFAULT_FIFO_SQS_ACK_INTERVAL).acceptIfNotNullOrElse(builder::acknowledgementThreshold, options.getAcknowledgementThreshold(), DEFAULT_FIFO_SQS_ACK_THRESHOLD).acceptIfNotNullOrElse(builder::acknowledgementOrdering, options.getAcknowledgementOrdering(), DEFAULT_FIFO_SQS_ACK_ORDERING_BATCHING);
        processor.setMaxAcknowledgementsPerBatch(10);
        if (AcknowledgementOrdering.ORDERED_BY_GROUP.equals((Object)options.getAcknowledgementOrdering())) {
            processor.setMessageGroupingFunction(this.getMessageGroupingFunction());
        }
        processor.configure((ContainerOptions<?, ?>)builder.build());
        return processor;
    }
}

