/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordsFetcherFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.Record;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
 * Message producer that consumes an Amazon Kinesis stream through a Kinesis Client
 * Library (KCL) {@link Worker} and sends what it receives to the output channel.
 *
 * <p>On start a {@link Worker} is built from the supplied (or internally created)
 * {@link KinesisClientLibConfiguration} and run on the configured {@link TaskExecutor}.
 * Every batch handed over by the worker is turned into messages according to the
 * {@link ListenerMode}:
 * <ul>
 * <li>{@code record} - one message per Kinesis record;</li>
 * <li>{@code batch} - a single message whose payload is a list built from the whole batch.</li>
 * </ul>
 * Checkpointing is driven by the {@link CheckpointMode}:
 * <ul>
 * <li>{@code batch} - once after every processed batch;</li>
 * <li>{@code record} - after every single record (only in record listener mode);</li>
 * <li>{@code periodic} - when more than {@code checkpointsInterval} milliseconds elapsed
 * since the previous periodic checkpoint;</li>
 * <li>{@code manual} - never by this adapter; the checkpointer is published in the
 * {@code aws_checkpointer} header instead.</li>
 * </ul>
 */
@ManagedResource
@IntegrationManagedResource
public class KclMessageDrivenChannelAdapter extends MessageProducerSupport {

    /**
     * Error-message attributes of the message currently being sent on this thread.
     * Populated in {@code setAttributesIfNecessary} and cleared at the end of every
     * {@code processRecords} call.
     */
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();

    private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1L).toMillis();

    private static final long DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(5L).toMillis();

    private static final long DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(30L).toMillis();

    private final RecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();

    private final String stream;

    private final AmazonKinesis kinesisClient;

    /** {@code null} when the adapter was created from a ready {@link KinesisClientLibConfiguration}. */
    private final AWSCredentialsProvider kinesisProxyCredentialsProvider;

    private final AmazonCloudWatch cloudWatchClient;

    private final AmazonDynamoDB dynamoDBClient;

    private TaskExecutor executor = new SimpleAsyncTaskExecutor();

    private String consumerGroup = "SpringIntegration";

    private InboundMessageMapper<byte[]> embeddedHeadersMapper;

    /** Either supplied through a constructor or created lazily in {@link #onInit()}. */
    private KinesisClientLibConfiguration config;

    private InitialPositionInStream streamInitialSequence = InitialPositionInStream.LATEST;

    private int idleBetweenPolls = 1000;

    private int consumerBackoff = 1000;

    private Converter<byte[], Object> converter = new DeserializingConverter();

    private ListenerMode listenerMode = ListenerMode.record;

    private long checkpointsInterval = 5000L;

    private CheckpointMode checkpointMode = CheckpointMode.batch;

    private String workerId = UUID.randomUUID().toString();

    private boolean bindSourceRecord;

    /** The KCL worker; created on every {@link #doStart()}. */
    private volatile Worker scheduler;

    /**
     * Create an adapter for the given stream using the default AWS clients and the
     * default credentials provider chain.
     * @param streams the name of the Kinesis stream to consume; must not be null.
     */
    public KclMessageDrivenChannelAdapter(String streams) {
        this(streams,
                AmazonKinesisClientBuilder.defaultClient(),
                AmazonCloudWatchClientBuilder.defaultClient(),
                AmazonDynamoDBClientBuilder.defaultClient(),
                new DefaultAWSCredentialsProviderChain());
    }

    /**
     * Create an adapter for the given stream with AWS clients built for the given region
     * and the default credentials provider chain.
     * @param streams the name of the Kinesis stream to consume; must not be null.
     * @param region the AWS region the Kinesis, CloudWatch and DynamoDB clients are built for.
     */
    public KclMessageDrivenChannelAdapter(String streams, Regions region) {
        this(streams,
                AmazonKinesisClient.builder().withRegion(region).build(),
                AmazonCloudWatchClient.builder().withRegion(region).build(),
                AmazonDynamoDBClient.builder().withRegion(region).build(),
                new DefaultAWSCredentialsProviderChain());
    }

    /**
     * Create an adapter for the given stream and explicitly provided AWS clients.
     * The {@link KinesisClientLibConfiguration} is created later, in {@link #onInit()},
     * from the options set on this adapter.
     * @param stream the name of the Kinesis stream to consume; must not be null.
     * @param kinesisClient the Kinesis client handed to the worker; must not be null.
     * @param cloudWatchClient the CloudWatch client handed to the worker; must not be null.
     * @param dynamoDBClient the DynamoDB client handed to the worker; must not be null.
     * @param kinesisProxyCredentialsProvider the credentials provider passed to the
     * generated configuration; must not be null.
     */
    public KclMessageDrivenChannelAdapter(String stream, AmazonKinesis kinesisClient,
            AmazonCloudWatch cloudWatchClient, AmazonDynamoDB dynamoDBClient,
            AWSCredentialsProvider kinesisProxyCredentialsProvider) {

        Assert.notNull(stream, "'stream' must not be null.");
        Assert.notNull(kinesisClient, "'kinesisClient' must not be null.");
        Assert.notNull(cloudWatchClient, "'cloudWatchClient' must not be null.");
        Assert.notNull(dynamoDBClient, "'dynamoDBClient' must not be null.");
        Assert.notNull(kinesisProxyCredentialsProvider, "'kinesisProxyCredentialsProvider' must not be null.");
        this.stream = stream;
        this.kinesisClient = kinesisClient;
        this.cloudWatchClient = cloudWatchClient;
        this.dynamoDBClient = dynamoDBClient;
        this.kinesisProxyCredentialsProvider = kinesisProxyCredentialsProvider;
    }

    /**
     * Create an adapter from a ready KCL configuration using the default AWS clients.
     * @param kinesisClientLibConfiguration the KCL configuration to use; must not be null.
     */
    public KclMessageDrivenChannelAdapter(KinesisClientLibConfiguration kinesisClientLibConfiguration) {
        this(kinesisClientLibConfiguration,
                AmazonKinesisClientBuilder.defaultClient(),
                AmazonCloudWatchClientBuilder.defaultClient(),
                AmazonDynamoDBClientBuilder.defaultClient());
    }

    /**
     * Create an adapter from a ready KCL configuration and explicitly provided AWS clients.
     * The stream name is taken from the configuration. Options that belong to the
     * configuration (consumer group, initial position, idle time, backoff, worker id)
     * cannot be changed through the setters of this adapter afterwards.
     * @param kinesisClientLibConfiguration the KCL configuration to use; must not be null.
     * @param kinesisClient the Kinesis client handed to the worker; must not be null.
     * @param cloudWatchClient the CloudWatch client handed to the worker; must not be null.
     * @param dynamoDBClient the DynamoDB client handed to the worker; must not be null.
     */
    public KclMessageDrivenChannelAdapter(KinesisClientLibConfiguration kinesisClientLibConfiguration,
            AmazonKinesis kinesisClient, AmazonCloudWatch cloudWatchClient, AmazonDynamoDB dynamoDBClient) {

        Assert.notNull(kinesisClientLibConfiguration, "'kinesisClientLibConfiguration' must not be null.");
        Assert.notNull(kinesisClient, "'kinesisClient' must not be null.");
        Assert.notNull(cloudWatchClient, "'cloudWatchClient' must not be null.");
        Assert.notNull(dynamoDBClient, "'dynamoDBClient' must not be null.");
        this.config = kinesisClientLibConfiguration;
        this.stream = this.config.getStreamName();
        this.kinesisClient = kinesisClient;
        this.cloudWatchClient = cloudWatchClient;
        this.dynamoDBClient = dynamoDBClient;
        this.kinesisProxyCredentialsProvider = null;
    }

    /**
     * Set the executor that runs the worker itself and, wrapped into an
     * {@code ExecutorService}, is handed to the worker builder.
     * @param executor the executor to use; must not be null.
     */
    public void setExecutor(TaskExecutor executor) {
        Assert.notNull(executor, "'executor' must not be null.");
        this.executor = executor;
    }

    /**
     * Set the consumer group, used as the application name of the generated configuration.
     * Rejected when a {@link KinesisClientLibConfiguration} was supplied to the constructor.
     * @param consumerGroup the consumer group; must have text.
     */
    public void setConsumerGroup(String consumerGroup) {
        Assert.hasText(consumerGroup, "'consumerGroup' must not be empty");
        Assert.isNull(this.config, "'consumerGroup' must be configured as an application name on the provided KinesisClientLibConfiguration");
        this.consumerGroup = consumerGroup;
    }

    /**
     * Return the consumer group. After {@link #onInit()} this is the application name of
     * the effective KCL configuration.
     * @return the consumer group.
     */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /**
     * Set a mapper that extracts a message (headers and payload) embedded into the record
     * data. When {@code null}, the record data is used as the payload directly.
     * @param embeddedHeadersMapper the mapper to use, may be null.
     */
    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> embeddedHeadersMapper) {
        this.embeddedHeadersMapper = embeddedHeadersMapper;
    }

    /**
     * Set the initial position in the stream for the generated configuration.
     * Rejected when a {@link KinesisClientLibConfiguration} was supplied to the constructor.
     * @param streamInitialSequence the initial position; must not be null.
     */
    public void setStreamInitialSequence(InitialPositionInStream streamInitialSequence) {
        Assert.notNull(streamInitialSequence, "'streamInitialSequence' must not be null");
        Assert.isNull(this.config, "'streamInitialSequence' must be configured as an 'initialPositionInStream' on the provided KinesisClientLibConfiguration");
        this.streamInitialSequence = streamInitialSequence;
    }

    /**
     * Set the idle time between reads, in milliseconds, for the generated configuration.
     * Values below 250 are raised to 250.
     * Rejected when a {@link KinesisClientLibConfiguration} was supplied to the constructor.
     * @param idleBetweenPolls the idle time in milliseconds.
     */
    public void setIdleBetweenPolls(int idleBetweenPolls) {
        Assert.isNull(this.config, "'idleBetweenPolls' must be configured as an 'idleTimeBetweenReadsInMillis' on the provided KinesisClientLibConfiguration");
        this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
    }

    /**
     * Set the task backoff time, in milliseconds, for the generated configuration.
     * Values below 1000 are raised to 1000.
     * Rejected when a {@link KinesisClientLibConfiguration} was supplied to the constructor.
     * @param consumerBackoff the backoff in milliseconds.
     */
    public void setConsumerBackoff(int consumerBackoff) {
        Assert.isNull(this.config, "'consumerBackoff' must be configured as an 'taskBackoffTimeMillis' on the provided KinesisClientLibConfiguration");
        this.consumerBackoff = Math.max(1000, consumerBackoff);
    }

    /**
     * Set the converter applied to {@code byte[]} payloads before a message is built.
     * When {@code null}, payloads are sent without conversion.
     * @param converter the converter to use, may be null.
     */
    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    /**
     * Set whether records are sent one message per record or one message per batch.
     * @param listenerMode the listener mode; must not be null.
     */
    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    /**
     * Set the interval, in milliseconds, between checkpoints in the periodic checkpoint mode.
     * @param checkpointsInterval the interval in milliseconds.
     */
    public void setCheckpointsInterval(long checkpointsInterval) {
        this.checkpointsInterval = checkpointsInterval;
    }

    /**
     * Set when checkpoints are performed.
     * @param checkpointMode the checkpoint mode; must not be null.
     */
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    /**
     * Set the worker id for the generated configuration; a random UUID is used by default.
     * Rejected when a {@link KinesisClientLibConfiguration} was supplied to the constructor.
     * @param workerId the worker id; must have text.
     */
    public void setWorkerId(String workerId) {
        Assert.hasText(workerId, "'workerId' must not be null or empty");
        Assert.isNull(this.config, "'workerId' must be configured on the provided KinesisClientLibConfiguration");
        this.workerId = workerId;
    }

    /**
     * Set whether the original {@link Record} is added to per-record messages as the
     * {@code sourceData} header.
     * @param bindSourceRecord true to add the header.
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    /**
     * Create the {@link KinesisClientLibConfiguration} from the adapter options when none
     * was supplied, then align the consumer group with the configuration's application name.
     */
    @Override
    protected void onInit() {
        super.onInit();
        if (this.config == null) {
            // Positional arguments of the full KinesisClientLibConfiguration constructor.
            // The bare literals are values inlined by the compiler; presumably the library
            // defaults - verify against the KCL version in use before changing any of them.
            this.config = new KinesisClientLibConfiguration(
                    this.consumerGroup, // application name
                    this.stream,
                    null,
                    null,
                    this.streamInitialSequence,
                    this.kinesisProxyCredentialsProvider,
                    null,
                    null,
                    10000L,
                    this.workerId,
                    10000,
                    (long) this.idleBetweenPolls,
                    false,
                    10000L,
                    60000L,
                    true,
                    new ClientConfiguration(),
                    new ClientConfiguration(),
                    new ClientConfiguration(),
                    (long) this.consumerBackoff,
                    10000L,
                    10000,
                    true,
                    null,
                    5000L,
                    KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
                    (RecordsFetcherFactory) new SimpleRecordsFetcherFactory(),
                    DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
                    DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
                    DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
        }
        this.consumerGroup = this.config.getApplicationName();
    }

    /**
     * Build a new {@link Worker} and submit it to the executor.
     * A {@code record} checkpoint mode combined with the {@code batch} listener mode is
     * replaced by the {@code batch} checkpoint mode, with a warning.
     */
    @Override
    protected void doStart() {
        super.doStart();
        if (ListenerMode.batch == this.listenerMode && CheckpointMode.record == this.checkpointMode) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        this.scheduler = new Worker.Builder()
                .kinesisClient(this.kinesisClient)
                .dynamoDBClient(this.dynamoDBClient)
                .cloudWatchClient(this.cloudWatchClient)
                .recordProcessorFactory(this.recordProcessorFactory)
                .execService(new ExecutorServiceAdapter(this.executor))
                .config(this.config)
                .build();
        this.executor.execute(this.scheduler);
    }

    /**
     * Shut the current worker down.
     */
    @Override
    protected void doStop() {
        super.doStop();
        this.scheduler.shutdown();
    }

    /**
     * Shut the worker down if this adapter is still running when it is destroyed.
     */
    @Override
    public void destroy() {
        super.destroy();
        if (isRunning()) {
            this.scheduler.shutdown();
        }
    }

    /**
     * Return the attributes bound to the current thread by the record processor, falling
     * back to the super implementation when nothing is bound.
     * @param message the message the error attributes are requested for.
     * @return the attributes for the error message.
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributes = attributesHolder.get();
        return attributes != null ? attributes : super.getErrorMessageAttributes(message);
    }

    @Override
    public String toString() {
        return "KclMessageDrivenChannelAdapter{consumerGroup='" + this.consumerGroup + '\'' + ", stream='" + this.stream + "'}";
    }

    /**
     * Copy the content of a record data buffer, from its beginning up to its limit, into
     * a fresh array. Unlike {@code ByteBuffer.array()} this honours the buffer's offset
     * and limit, works for read-only and direct buffers, and leaves the position of the
     * source buffer untouched because the read goes through a duplicate view.
     * @param data the record data buffer.
     * @return a new array with the buffer content.
     */
    private static byte[] copyRecordData(java.nio.ByteBuffer data) {
        java.nio.ByteBuffer view = data.duplicate();
        view.rewind();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /**
     * Factory handed to the worker; creates a new {@link RecordProcessor} per request.
     */
    private class RecordProcessorFactory implements IRecordProcessorFactory {

        private RecordProcessorFactory() {
        }

        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    /**
     * Processor of the records of a single shard: converts them into messages, sends them
     * through the enclosing adapter and checkpoints according to the configured
     * {@link CheckpointMode}.
     */
    private class RecordProcessor implements IRecordProcessor {

        private String shardId;

        /** Earliest time of the next periodic checkpoint; 0 means the first check always passes. */
        private long nextCheckpointTimeInMillis;

        private RecordProcessor() {
        }

        @Override
        public void initialize(String shardId) {
            this.shardId = shardId;
            logger.info(() -> "Initializing record processor for shard: " + this.shardId);
        }

        /**
         * Send the given records record by record or as one batch, depending on the
         * listener mode, and checkpoint according to the checkpoint mode. The per-thread
         * error attributes are always cleared at the end.
         * @param records the records delivered by the worker.
         * @param checkpointer the checkpointer for the shard.
         */
        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            logger.debug(() -> "Processing " + records.size() + " records from " + this.shardId);
            try {
                if (ListenerMode.record == KclMessageDrivenChannelAdapter.this.listenerMode) {
                    for (Record record : records) {
                        processSingleRecord(record, checkpointer);
                        checkpointIfRecordMode(checkpointer, record);
                        checkpointIfPeriodicMode(checkpointer, record);
                    }
                } else if (ListenerMode.batch == KclMessageDrivenChannelAdapter.this.listenerMode) {
                    processMultipleRecords(records, checkpointer);
                    checkpointIfPeriodicMode(checkpointer, null);
                }
                checkpointIfBatchMode(checkpointer);
            } finally {
                attributesHolder.remove();
            }
        }

        /** Build a message for one record and send it. */
        private void processSingleRecord(Record record, IRecordProcessorCheckpointer checkpointer) {
            performSend(prepareMessageForRecord(record), record, checkpointer);
        }

        /**
         * Build one message for the whole batch and send it. The payload is:
         * a list of per-record messages when an embedded headers mapper is set;
         * otherwise a list of converted payloads, with the partition keys and sequence
         * numbers as list-valued headers, when a converter is set;
         * otherwise the list of records itself.
         */
        private void processMultipleRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            AbstractIntegrationMessageBuilder<?> messageBuilder;
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                List<Message<Object>> payload = new ArrayList<>(records.size());
                for (Record record : records) {
                    payload.add(prepareMessageForRecord(record).build());
                }
                messageBuilder = getMessageBuilderFactory().withPayload(payload);
            } else if (KclMessageDrivenChannelAdapter.this.converter != null) {
                List<String> partitionKeys = new ArrayList<>(records.size());
                List<String> sequenceNumbers = new ArrayList<>(records.size());
                List<Object> payload = new ArrayList<>(records.size());
                for (Record record : records) {
                    partitionKeys.add(record.getPartitionKey());
                    sequenceNumbers.add(record.getSequenceNumber());
                    payload.add(KclMessageDrivenChannelAdapter.this.converter.convert(copyRecordData(record.getData())));
                }
                messageBuilder = getMessageBuilderFactory().withPayload(payload)
                        .setHeader("aws_receivedPartitionKey", partitionKeys)
                        .setHeader("aws_receivedSequenceNumber", sequenceNumbers);
            } else {
                messageBuilder = getMessageBuilderFactory().withPayload(records);
            }
            performSend(messageBuilder, records, checkpointer);
        }

        /**
         * Build a message builder for a single record. The record data is first run
         * through the embedded headers mapper (if set; a failure there is logged and the
         * raw data is kept), a {@code byte[]} payload is then converted (if a converter
         * is set), and the partition key and sequence number are added as headers.
         * Headers extracted by the mapper are copied only where absent.
         * @param record the record to convert.
         * @return the builder for the message to send.
         */
        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record) {
            byte[] rawData = copyRecordData(record.getData());
            Object payload = rawData;
            Message<?> messageToUse = null;
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    messageToUse = KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage(rawData);
                    if (messageToUse == null) {
                        throw new IllegalStateException("The 'embeddedHeadersMapper' returned null for payload: " + Arrays.toString(rawData));
                    }
                    payload = messageToUse.getPayload();
                } catch (Exception ex) {
                    // Best effort: data without embedded headers is still delivered as is.
                    logger.warn(ex, "Could not parse embedded headers. Remain payload untouched.");
                }
            }
            if (payload instanceof byte[] && KclMessageDrivenChannelAdapter.this.converter != null) {
                payload = KclMessageDrivenChannelAdapter.this.converter.convert((byte[]) payload);
            }
            AbstractIntegrationMessageBuilder<Object> messageBuilder = getMessageBuilderFactory().withPayload(payload)
                    .setHeader("aws_receivedPartitionKey", record.getPartitionKey())
                    .setHeader("aws_receivedSequenceNumber", record.getSequenceNumber());
            if (KclMessageDrivenChannelAdapter.this.bindSourceRecord) {
                messageBuilder.setHeader("sourceData", record);
            }
            if (messageToUse != null) {
                messageBuilder.copyHeadersIfAbsent(messageToUse.getHeaders());
            }
            return messageBuilder;
        }

        /**
         * Complete the message with the stream and shard headers (plus the checkpointer
         * in the manual checkpoint mode) and send it. A send failure is logged and not
         * rethrown, so the caller still proceeds to its checkpointing steps.
         * @param messageBuilder the builder of the message to send.
         * @param rawRecord the record, or the list of records, the message was built from.
         * @param checkpointer the checkpointer for the shard.
         */
        private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord,
                IRecordProcessorCheckpointer checkpointer) {

            messageBuilder.setHeader("aws_receivedStream", KclMessageDrivenChannelAdapter.this.stream)
                    .setHeader("aws_shard", this.shardId);
            if (CheckpointMode.manual == KclMessageDrivenChannelAdapter.this.checkpointMode) {
                messageBuilder.setHeader("aws_checkpointer", checkpointer);
            }
            Message<?> messageToSend = messageBuilder.build();
            setAttributesIfNecessary(rawRecord, messageToSend);
            try {
                sendMessage(messageToSend);
            } catch (Exception ex) {
                logger.error(ex, () -> "Got an exception during sending a '" + messageToSend + "'\nfor the '" + rawRecord + "'.\nConsider to use 'errorChannel' flow for the compensation logic.");
            }
        }

        /**
         * When an error channel is configured, bind error attributes carrying the raw
         * record to the current thread so they end up on the error message.
         */
        private void setAttributesIfNecessary(Object record, Message<?> message) {
            if (getErrorChannel() != null) {
                AttributeAccessor attributes = ErrorMessageUtils.getAttributeAccessor(message, null);
                attributesHolder.set(attributes);
                attributes.setAttribute("aws_rawRecord", record);
            }
        }

        /**
         * Checkpoint at the given record, or at the latest delivered position when the
         * record is {@code null}. The handled checkpoint failures are logged, not rethrown.
         */
        private void checkpoint(IRecordProcessorCheckpointer checkpointer, @Nullable Record record) {
            logger.info(() -> "Checkpointing shard " + this.shardId);
            try {
                if (record == null) {
                    checkpointer.checkpoint();
                } else {
                    checkpointer.checkpoint(record);
                }
            } catch (ShutdownException se) {
                logger.info(se, "Caught shutdown exception, skipping checkpoint.");
            } catch (ThrottlingException ex) {
                logger.info(ex, "Transient issue when checkpointing");
            } catch (InvalidStateException ex) {
                logger.error(ex, "Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client.");
            }
        }

        private void checkpointIfBatchMode(IRecordProcessorCheckpointer checkpointer) {
            if (CheckpointMode.batch == KclMessageDrivenChannelAdapter.this.checkpointMode) {
                checkpoint(checkpointer, null);
            }
        }

        private void checkpointIfRecordMode(IRecordProcessorCheckpointer checkpointer, Record record) {
            if (CheckpointMode.record == KclMessageDrivenChannelAdapter.this.checkpointMode) {
                checkpoint(checkpointer, record);
            }
        }

        /**
         * Checkpoint in the periodic mode once the scheduled time has passed, then
         * schedule the next checkpoint {@code checkpointsInterval} milliseconds from now.
         */
        private void checkpointIfPeriodicMode(IRecordProcessorCheckpointer checkpointer, @Nullable Record record) {
            if (CheckpointMode.periodic == KclMessageDrivenChannelAdapter.this.checkpointMode
                    && System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
                checkpoint(checkpointer, record);
                this.nextCheckpointTimeInMillis =
                        System.currentTimeMillis() + KclMessageDrivenChannelAdapter.this.checkpointsInterval;
            }
        }

        /**
         * Checkpoint a final time when the processor is shut down, unless the reason is
         * {@link ShutdownReason#ZOMBIE}.
         * @param checkpointer the checkpointer for the shard.
         * @param reason the reason of the shutdown.
         */
        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (ShutdownReason.ZOMBIE == reason) {
                // Per the KCL ShutdownReason contract the shard is being moved to another
                // processor and applications should not checkpoint: the attempt could only
                // fail and would be reported as an error.
                logger.info(() -> "Scheduler is shutting down for reason '" + reason + "'; skipping checkpoint for shard " + this.shardId);
                return;
            }
            logger.info(() -> "Scheduler is shutting down for reason '" + reason + "'; checkpointing...");
            try {
                checkpointer.checkpoint();
            } catch (InvalidStateException | ShutdownException ex) {
                logger.error(ex, "Exception while checkpointing at requested shutdown. Giving up");
            }
        }
    }
}

