/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.log.LogMessage;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

@ManagedResource
@IntegrationManagedResource
public class KclMessageDrivenChannelAdapter
extends MessageProducerSupport
implements ApplicationEventPublisherAware {
    /**
     * Per-thread error-message attributes. Populated before a send when an error channel
     * is configured and removed at the end of every {@code processRecords()} call.
     */
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();

    // Factory handed to the KCL ConfigsBuilder; creates one RecordProcessor per call.
    private final ShardRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();

    // Stream names and AWS clients supplied through the constructors.
    private final String[] streams;
    private final KinesisAsyncClient kinesisClient;
    private final CloudWatchAsyncClient cloudWatchClient;
    private final DynamoDbAsyncClient dynamoDBClient;

    // Executor that runs the KCL Scheduler.
    private TaskExecutor executor = new SimpleAsyncTaskExecutor();

    private String consumerGroup = "SpringIntegration";

    // Optional lease table name; applied to the ConfigsBuilder only when not null.
    @Nullable
    private String leaseTableName;

    // Optional mapper for headers embedded into the record bytes.
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;

    // Built in onInit() and consumed in doStart().
    private ConfigsBuilder config;

    private InitialPositionInStreamExtended streamInitialSequence = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);

    // Passed to LifecycleConfig.taskBackoffTimeMillis(); the setter never lets it go below 1000.
    private int consumerBackoff = 1000;

    private Converter<byte[], Object> converter = new DeserializingConverter();
    private ListenerMode listenerMode = ListenerMode.record;

    // Minimal distance (ms) between two checkpoints in CheckpointMode.periodic.
    private long checkpointsInterval = 5000L;

    private CheckpointMode checkpointMode = CheckpointMode.batch;
    private String workerId = UUID.randomUUID().toString();
    private GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer;

    // When true, the source record is added to single-record messages as the "sourceData" header.
    private boolean bindSourceRecord;

    // true -> FanOutConfig retrieval, false -> PollingConfig retrieval.
    private boolean fanOut = true;

    private ApplicationEventPublisher applicationEventPublisher;

    // Created on every doStart(); shut down in doStop().
    private volatile Scheduler scheduler;

    private MetricsLevel metricsLevel = MetricsLevel.DETAILED;

    // No-op customizers by default; each is invoked once per doStart().
    private Consumer<CoordinatorConfig> coordinatorConfigCustomizer = ignored -> {};
    private Consumer<LifecycleConfig> lifecycleConfigCustomizer = ignored -> {};
    private Consumer<MetricsConfig> metricsConfigCustomizer = ignored -> {};
    private Consumer<LeaseManagementConfig> leaseManagementConfigCustomizer = ignored -> {};

    // Only honoured in ListenerMode.batch; reset to false in onInit() otherwise.
    private boolean emptyRecordList;

    // Polling-only settings (used when fanOut is false).
    private int pollingMaxRecords = 10000;
    private long pollingIdleTime = 1500L;

    // Milliseconds to wait for a graceful shutdown; 0 means plain Scheduler.shutdown().
    private long gracefulShutdownTimeout;

    /**
     * Create an adapter for the given streams using the clients returned by
     * {@code KinesisAsyncClient.create()}, {@code CloudWatchAsyncClient.create()} and
     * {@code DynamoDbAsyncClient.create()}.
     * @param streams the stream names to consume from; must not be empty.
     */
    public KclMessageDrivenChannelAdapter(String ... streams) {
        this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
    }

    /**
     * Create an adapter for the given streams with Kinesis, CloudWatch and DynamoDB
     * async clients that are all built for the provided region.
     * @param region the AWS region applied to each client builder.
     * @param streams the stream names to consume from; must not be empty.
     */
    public KclMessageDrivenChannelAdapter(Region region, String ... streams) {
        this(
                KinesisAsyncClient.builder().region(region).build(),
                CloudWatchAsyncClient.builder().region(region).build(),
                DynamoDbAsyncClient.builder().region(region).build(),
                streams);
    }

    /**
     * Create an adapter for the given streams on top of the provided AWS clients.
     * <p>The stream names are copied, so later modifications of the array passed by the
     * caller do not affect this adapter.
     * @param kinesisClient the Kinesis client; must not be null.
     * @param cloudWatchClient the CloudWatch client; must not be null.
     * @param dynamoDBClient the DynamoDB client; must not be null.
     * @param streams the stream names to consume from; must not be empty.
     * @throws IllegalArgumentException if any client is null or {@code streams} is empty.
     */
    public KclMessageDrivenChannelAdapter(KinesisAsyncClient kinesisClient, CloudWatchAsyncClient cloudWatchClient, DynamoDbAsyncClient dynamoDBClient, String ... streams) {
        Assert.notNull(kinesisClient, "'kinesisClient' must not be null.");
        Assert.notNull(cloudWatchClient, "'cloudWatchClient' must not be null.");
        Assert.notNull(dynamoDBClient, "'dynamoDBClient' must not be null.");
        Assert.notEmpty(streams, "'streams' must not be empty.");
        // Defensive copy: a varargs array may be an array instance the caller still owns and mutates.
        this.streams = streams.clone();
        this.kinesisClient = kinesisClient;
        this.cloudWatchClient = cloudWatchClient;
        this.dynamoDBClient = dynamoDBClient;
    }

    /**
     * Store the publisher that is used to emit a {@link KinesisShardEndedEvent}
     * when a shard has ended. When none is set, no event is published.
     * @param applicationEventPublisher the publisher to use.
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Set the executor on which the KCL {@link Scheduler} is run when this adapter starts.
     * Defaults to a {@link SimpleAsyncTaskExecutor}.
     * @param executor the executor to use; must not be null.
     */
    public void setExecutor(TaskExecutor executor) {
        Assert.notNull(executor, "'executor' must not be null.");
        this.executor = executor;
    }

    /**
     * Set the consumer group. The value is passed to the KCL {@link ConfigsBuilder}
     * and used as the application name of the {@link FanOutConfig}.
     * Defaults to {@code SpringIntegration}.
     * @param consumerGroup the consumer group; must contain text.
     */
    public void setConsumerGroup(String consumerGroup) {
        Assert.hasText(consumerGroup, "'consumerGroup' must not be empty");
        this.consumerGroup = consumerGroup;
    }

    /**
     * Return the configured consumer group.
     * @return the consumer group; {@code SpringIntegration} unless changed via the setter.
     */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /**
     * Set an explicit lease table name. When not null, it is applied to the
     * {@link ConfigsBuilder} via {@code tableName()} during {@code onInit()};
     * when null, the builder is left untouched.
     * @param leaseTableName the lease table name, or null.
     */
    public void setLeaseTableName(String leaseTableName) {
        this.leaseTableName = leaseTableName;
    }

    /**
     * Set a mapper that turns the record bytes into a message, so that headers embedded
     * into the record data are extracted. When null (the default), record bytes are not
     * parsed for embedded headers.
     * @param embeddedHeadersMapper the mapper to use, or null.
     */
    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> embeddedHeadersMapper) {
        this.embeddedHeadersMapper = embeddedHeadersMapper;
    }

    /**
     * Set the initial position used for the stream tracker(s) built by this adapter.
     * Defaults to {@link InitialPositionInStream#LATEST}.
     * @param streamInitialSequence the initial position; must not be null.
     */
    public void setStreamInitialSequence(InitialPositionInStreamExtended streamInitialSequence) {
        Assert.notNull(streamInitialSequence, "'streamInitialSequence' must not be null");
        this.streamInitialSequence = streamInitialSequence;
    }

    /**
     * Set the value passed to {@code LifecycleConfig.taskBackoffTimeMillis()} on start.
     * Values below {@code 1000} are raised to {@code 1000}.
     * @param consumerBackoff the back-off in milliseconds.
     */
    public void setConsumerBackoff(int consumerBackoff) {
        // Enforce the 1000 ms floor.
        this.consumerBackoff = consumerBackoff < 1000 ? 1000 : consumerBackoff;
    }

    /**
     * Set the converter applied to {@code byte[]} record payloads before a message is built.
     * Defaults to a {@link DeserializingConverter}. May be null, in which case no
     * conversion is performed.
     * @param converter the converter to use, or null.
     */
    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    /**
     * Set whether records are emitted one message per record ({@code ListenerMode.record},
     * the default) or one message per received batch ({@code ListenerMode.batch}).
     * @param listenerMode the listener mode; must not be null.
     */
    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    /**
     * Set the interval, in milliseconds, added to the current time to compute the next
     * checkpoint time in {@code CheckpointMode.periodic}. Defaults to {@code 5000}.
     * @param checkpointsInterval the interval in milliseconds.
     */
    public void setCheckpointsInterval(long checkpointsInterval) {
        this.checkpointsInterval = checkpointsInterval;
    }

    /**
     * Set the checkpoint mode. Defaults to {@code CheckpointMode.batch}.
     * Note that {@code CheckpointMode.record} is replaced with {@code CheckpointMode.batch}
     * on start when the listener mode is {@code ListenerMode.batch}.
     * @param checkpointMode the checkpoint mode; must not be null.
     */
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    /**
     * Set the worker identifier passed to the KCL {@link ConfigsBuilder}.
     * Defaults to a random {@link UUID} string.
     * @param workerId the worker id; must contain text.
     */
    public void setWorkerId(String workerId) {
        Assert.hasText(workerId, "'workerId' must not be null or empty");
        this.workerId = workerId;
    }

    /**
     * Set the deserializer that is handed to the KCL {@link RetrievalConfig} on start.
     * @param glueSchemaRegistryDeserializer the Glue Schema Registry deserializer.
     */
    public void setGlueSchemaRegistryDeserializer(GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) {
        this.glueSchemaRegistryDeserializer = glueSchemaRegistryDeserializer;
    }

    /**
     * Set whether the original {@link KinesisClientRecord} is added as the
     * {@code sourceData} header to the message built for a single record.
     * Defaults to {@code false}.
     * @param bindSourceRecord true to bind the source record as a header.
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    /**
     * Select the retrieval configuration used on start: a {@link FanOutConfig} when
     * {@code true} (the default), a {@link PollingConfig} when {@code false}.
     * @param fanOut false to use polling retrieval instead of fan-out.
     */
    public void setFanOut(boolean fanOut) {
        this.fanOut = fanOut;
    }

    /**
     * Set the level applied to the KCL {@link MetricsConfig} on start.
     * Defaults to {@link MetricsLevel#DETAILED}. With {@link MetricsLevel#NONE}
     * a {@link NullMetricsFactory} is installed as well.
     * @param metricsLevel the metrics level; must not be null.
     */
    public void setMetricsLevel(MetricsLevel metricsLevel) {
        Assert.notNull(metricsLevel, "'metricsLevel' must not be null");
        this.metricsLevel = metricsLevel;
    }

    /**
     * Set a callback that receives the {@link CoordinatorConfig} on start,
     * before the {@link Scheduler} is created. Defaults to a no-op.
     * @param coordinatorConfigCustomizer the customizer; must not be null.
     */
    public void setCoordinatorConfigCustomizer(Consumer<CoordinatorConfig> coordinatorConfigCustomizer) {
        Assert.notNull(coordinatorConfigCustomizer, "'coordinatorConfigCustomizer' must not be null");
        this.coordinatorConfigCustomizer = coordinatorConfigCustomizer;
    }

    /**
     * Set a callback that receives the {@link LifecycleConfig} on start, after the
     * task back-off time has been applied to it. Defaults to a no-op.
     * @param lifecycleConfigCustomizer the customizer; must not be null.
     */
    public void setLifecycleConfigCustomizer(Consumer<LifecycleConfig> lifecycleConfigCustomizer) {
        Assert.notNull(lifecycleConfigCustomizer, "'lifecycleConfigCustomizer' must not be null");
        this.lifecycleConfigCustomizer = lifecycleConfigCustomizer;
    }

    /**
     * Set a callback that receives the {@link MetricsConfig} on start, after the
     * metrics level (and, for {@link MetricsLevel#NONE}, the null metrics factory)
     * has been applied to it. Defaults to a no-op.
     * @param metricsConfigCustomizer the customizer; must not be null.
     */
    public void setMetricsConfigCustomizer(Consumer<MetricsConfig> metricsConfigCustomizer) {
        Assert.notNull(metricsConfigCustomizer, "'metricsConfigCustomizer' must not be null");
        this.metricsConfigCustomizer = metricsConfigCustomizer;
    }

    /**
     * Set a callback that receives the {@link LeaseManagementConfig} on start,
     * before the {@link Scheduler} is created. Defaults to a no-op.
     * @param leaseManagementConfigCustomizer the customizer; must not be null.
     */
    public void setLeaseManagementConfigCustomizer(Consumer<LeaseManagementConfig> leaseManagementConfigCustomizer) {
        Assert.notNull(leaseManagementConfigCustomizer, "'leaseManagementConfigCustomizer' must not be null");
        this.leaseManagementConfigCustomizer = leaseManagementConfigCustomizer;
    }

    /**
     * Set the flag passed to {@code ProcessorConfig.callProcessRecordsEvenForEmptyRecordList()}
     * on start. The flag is reset to {@code false} (with a warning) during {@code onInit()}
     * when the listener mode is {@code ListenerMode.record}.
     * @param emptyRecordList true to have empty record lists processed in batch mode.
     */
    public void setEmptyRecordList(boolean emptyRecordList) {
        this.emptyRecordList = emptyRecordList;
    }

    /**
     * Set the {@code maxRecords} value of the {@link PollingConfig}.
     * Only used when fan-out is disabled. Defaults to {@code 10000}.
     * @param pollingMaxRecords the max records value for polling retrieval.
     */
    public void setPollingMaxRecords(int pollingMaxRecords) {
        this.pollingMaxRecords = pollingMaxRecords;
    }

    /**
     * Set the {@code idleTimeBetweenReadsInMillis} value of the {@link PollingConfig}.
     * Only used when fan-out is disabled. Defaults to {@code 1500}.
     * @param pollingIdleTime the idle time between reads in milliseconds.
     */
    public void setPollingIdleTime(long pollingIdleTime) {
        this.pollingIdleTime = pollingIdleTime;
    }

    /**
     * Set how long, in milliseconds, {@code doStop()} waits for the graceful shutdown of
     * the KCL {@link Scheduler}. With {@code 0} (the default) the scheduler is shut down
     * via {@code Scheduler.shutdown()} without a graceful phase.
     * @param gracefulShutdownTimeout the timeout in milliseconds; 0 to disable.
     */
    public void setGracefulShutdownTimeout(long gracefulShutdownTimeout) {
        this.gracefulShutdownTimeout = gracefulShutdownTimeout;
    }

    /**
     * Finish the configuration of this adapter: drop the {@code emptyRecordList} flag
     * when it cannot apply (record listener mode) and create the KCL {@link ConfigsBuilder}
     * from the stream tracker, consumer group, clients, worker id and processor factory.
     * An explicitly configured lease table name is applied to the builder.
     */
    @Override
    protected void onInit() {
        super.onInit();

        boolean recordModeSelected = this.listenerMode.equals(ListenerMode.record);
        if (recordModeSelected && this.emptyRecordList) {
            this.emptyRecordList = false;
            this.logger.warn("The 'emptyRecordList' is processed only in the [ListenerMode.batch].");
        }

        StreamTracker streamTracker = buildStreamTracker();
        this.config = new ConfigsBuilder(
                streamTracker,
                this.consumerGroup,
                this.kinesisClient,
                this.dynamoDBClient,
                this.cloudWatchClient,
                this.workerId,
                this.recordProcessorFactory);

        // Leave the builder's own table name in place unless one was configured.
        if (this.leaseTableName == null) {
            return;
        }
        this.config.tableName(this.leaseTableName);
    }

    /**
     * Pick the tracker matching the number of configured streams.
     * @return a {@link SingleStreamTracker} for exactly one stream,
     * otherwise the multi-stream {@code StreamsTracker}.
     */
    private StreamTracker buildStreamTracker() {
        if (this.streams.length != 1) {
            return new StreamsTracker();
        }
        StreamIdentifier onlyStream = StreamIdentifier.singleStreamInstance(this.streams[0]);
        return new SingleStreamTracker(onlyStream, this.streamInitialSequence);
    }

    /**
     * Assemble all KCL configs from the {@link ConfigsBuilder}, apply the configured
     * customizers, create a new {@link Scheduler} and run it on the executor.
     * <p>{@code CheckpointMode.record} is replaced by {@code CheckpointMode.batch}
     * (with a warning) when the listener mode is {@code ListenerMode.batch}.
     */
    @Override
    protected void doStart() {
        super.doStart();

        boolean batchListener = ListenerMode.batch.equals(this.listenerMode);
        if (batchListener && CheckpointMode.record.equals(this.checkpointMode)) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }

        LifecycleConfig lifecycleConfig = this.config.lifecycleConfig();
        lifecycleConfig.taskBackoffTimeMillis(this.consumerBackoff);
        this.lifecycleConfigCustomizer.accept(lifecycleConfig);

        // A stream name is only given to the retrieval config in the single-stream case.
        String singleStreamName = null;
        if (this.streams.length == 1) {
            singleStreamName = this.streams[0];
        }

        RetrievalSpecificConfig retrievalSpecificConfig;
        if (this.fanOut) {
            retrievalSpecificConfig = new FanOutConfig(this.kinesisClient)
                    .applicationName(this.consumerGroup)
                    .streamName(singleStreamName);
        } else {
            retrievalSpecificConfig = new PollingConfig(this.kinesisClient)
                    .streamName(singleStreamName)
                    .maxRecords(this.pollingMaxRecords)
                    .idleTimeBetweenReadsInMillis(this.pollingIdleTime);
        }
        RetrievalConfig retrievalConfig = this.config.retrievalConfig()
                .glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
                .retrievalSpecificConfig(retrievalSpecificConfig);

        MetricsConfig metricsConfig = this.config.metricsConfig();
        metricsConfig.metricsLevel(this.metricsLevel);
        if (MetricsLevel.NONE.equals(this.metricsLevel)) {
            metricsConfig.metricsFactory(new NullMetricsFactory());
        }
        this.metricsConfigCustomizer.accept(metricsConfig);

        CoordinatorConfig coordinatorConfig = this.config.coordinatorConfig();
        this.coordinatorConfigCustomizer.accept(coordinatorConfig);

        LeaseManagementConfig leaseManagementConfig = this.config.leaseManagementConfig();
        this.leaseManagementConfigCustomizer.accept(leaseManagementConfig);

        ProcessorConfig processorConfig = this.config.processorConfig()
                .callProcessRecordsEvenForEmptyRecordList(this.emptyRecordList);

        this.scheduler = new Scheduler(
                this.config.checkpointConfig(),
                coordinatorConfig,
                leaseManagementConfig,
                lifecycleConfig,
                metricsConfig,
                processorConfig,
                retrievalConfig);
        this.executor.execute(this.scheduler);
    }

    /**
     * Stop the KCL {@link Scheduler}.
     * <p>With a graceful shutdown timeout of {@code 0} the scheduler is shut down directly.
     * Otherwise a graceful shutdown is started and awaited for at most the configured
     * number of milliseconds.
     * @throws RuntimeException if the graceful shutdown fails, times out or the waiting
     * thread is interrupted; in the latter case the interrupt flag is restored first.
     */
    @Override
    protected void doStop() {
        super.doStop();
        if (this.gracefulShutdownTimeout == 0L) {
            this.scheduler.shutdown();
            return;
        }
        try {
            this.logger.info("Start graceful shutdown for KCL...");
            this.scheduler.startGracefulShutdown().get(this.gracefulShutdownTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Graceful shutdown for KCL has failed.", ex);
        }
        catch (ExecutionException | TimeoutException ex) {
            throw new RuntimeException("Graceful shutdown for KCL has failed.", ex);
        }
    }

    /**
     * Destroy this adapter and stop it if it is still running afterwards.
     */
    @Override
    public void destroy() {
        super.destroy();
        if (!isRunning()) {
            return;
        }
        stop();
    }

    /**
     * Return the attributes bound to the current thread for the record being sent,
     * falling back to the super implementation when nothing is bound.
     * @param message the message the error attributes are requested for.
     * @return the thread-bound attributes, or the result of the super implementation.
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor threadBound = attributesHolder.get();
        return threadBound != null ? threadBound : super.getErrorMessageAttributes(message);
    }

    /**
     * Describe this adapter by its consumer group and configured stream names.
     * @return the textual description.
     */
    @Override
    public String toString() {
        StringBuilder description = new StringBuilder("KclMessageDrivenChannelAdapter{consumerGroup='");
        description.append(this.consumerGroup);
        description.append("', stream(s)='");
        description.append(Arrays.toString(this.streams));
        description.append("'}");
        return description.toString();
    }

    private final class RecordProcessorFactory
    implements ShardRecordProcessorFactory {
        RecordProcessorFactory() {
        }

        public ShardRecordProcessor shardRecordProcessor() {
            throw new UnsupportedOperationException();
        }

        public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
            return new RecordProcessor(streamIdentifier.streamName());
        }
    }

    private final class StreamsTracker
    implements MultiStreamTracker {
        private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy = new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy(){

            public Duration waitPeriodToDeleteFormerStreams() {
                return Duration.ZERO;
            }
        };
        private final Flux<StreamConfig> streamConfigs;

        StreamsTracker() {
            this.streamConfigs = Flux.fromArray((Object[])KclMessageDrivenChannelAdapter.this.streams).flatMap(streamName -> Mono.fromFuture((CompletableFuture)KclMessageDrivenChannelAdapter.this.kinesisClient.describeStreamSummary(request -> request.streamName(streamName)))).map(DescribeStreamSummaryResponse::streamDescriptionSummary).map(summary -> StreamIdentifier.multiStreamInstance((Arn)Arn.fromString((String)summary.streamARN()), (long)summary.streamCreationTimestamp().getEpochSecond())).map(streamIdentifier -> new StreamConfig(streamIdentifier, KclMessageDrivenChannelAdapter.this.streamInitialSequence)).cache();
        }

        public List<StreamConfig> streamConfigList() {
            return (List)this.streamConfigs.collectList().block();
        }

        public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
            return this.formerStreamsLeasesDeletionStrategy;
        }
    }

    private final class RecordProcessor
    implements ShardRecordProcessor {
        private final String stream;
        private String shardId;
        private long nextCheckpointTimeInMillis;

        RecordProcessor(String stream) {
            this.stream = stream;
        }

        public void initialize(InitializationInput initializationInput) {
            this.shardId = initializationInput.shardId();
            KclMessageDrivenChannelAdapter.this.logger.info(() -> "Initializing record processor for shard: " + this.shardId);
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            KclMessageDrivenChannelAdapter.this.logger.info((CharSequence)LogMessage.format((String)"Shard [%s] ended; checkpointing...", (Object)this.shardId));
            try {
                shardEndedInput.checkpointer().checkpoint();
            }
            catch (InvalidStateException | ShutdownException ex) {
                KclMessageDrivenChannelAdapter.this.logger.error(ex, (CharSequence)"Exception while checkpointing at requested shutdown. Giving up");
            }
            if (KclMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
                KclMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent((ApplicationEvent)new KinesisShardEndedEvent((Object)KclMessageDrivenChannelAdapter.this, this.shardId));
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            KclMessageDrivenChannelAdapter.this.logger.info((CharSequence)"Scheduler is shutting down; checkpointing...");
            try {
                shutdownRequestedInput.checkpointer().checkpoint();
            }
            catch (InvalidStateException | ShutdownException ex) {
                KclMessageDrivenChannelAdapter.this.logger.error(ex, (CharSequence)"Exception while checkpointing at requested shutdown. Giving up");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            List records = processRecordsInput.records();
            RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
            KclMessageDrivenChannelAdapter.this.logger.debug(() -> "Processing " + records.size() + " records from " + this.shardId);
            try {
                if (ListenerMode.record.equals((Object)KclMessageDrivenChannelAdapter.this.listenerMode)) {
                    for (KinesisClientRecord record : records) {
                        this.processSingleRecord(record, checkpointer);
                        this.checkpointIfRecordMode(checkpointer, record);
                        this.checkpointIfPeriodicMode(checkpointer, record);
                    }
                } else if (ListenerMode.batch.equals((Object)KclMessageDrivenChannelAdapter.this.listenerMode)) {
                    this.processMultipleRecords(records, checkpointer);
                    this.checkpointIfPeriodicMode(checkpointer, null);
                }
                this.checkpointIfBatchMode(checkpointer);
            }
            finally {
                attributesHolder.remove();
            }
        }

        private void processSingleRecord(KinesisClientRecord record, RecordProcessorCheckpointer checkpointer) {
            this.performSend(this.prepareMessageForRecord(record), record, checkpointer);
        }

        private void processMultipleRecords(List<KinesisClientRecord> records, RecordProcessorCheckpointer checkpointer) {
            AbstractIntegrationMessageBuilder messageBuilder = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(records);
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                List<Message> payload = records.stream().map(this::prepareMessageForRecord).map(AbstractIntegrationMessageBuilder::build).toList();
                messageBuilder = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload);
            } else if (KclMessageDrivenChannelAdapter.this.converter != null) {
                ArrayList partitionKeys = new ArrayList();
                ArrayList sequenceNumbers = new ArrayList();
                List<Object> payload = records.stream().map(r -> {
                    partitionKeys.add(r.partitionKey());
                    sequenceNumbers.add(r.sequenceNumber());
                    return KclMessageDrivenChannelAdapter.this.converter.convert((Object)r.data().array());
                }).toList();
                messageBuilder = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).setHeader("aws_receivedPartitionKey", partitionKeys).setHeader("aws_receivedSequenceNumber", sequenceNumbers);
            }
            this.performSend(messageBuilder, records, checkpointer);
        }

        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(KinesisClientRecord record) {
            Object payload = BinaryUtils.copyAllBytesFrom((ByteBuffer)record.data());
            Message messageToUse = null;
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    messageToUse = KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage(payload);
                    if (messageToUse == null) {
                        throw new IllegalStateException("The 'embeddedHeadersMapper' returned null for payload: " + Arrays.toString(payload));
                    }
                    payload = messageToUse.getPayload();
                }
                catch (Exception ex) {
                    KclMessageDrivenChannelAdapter.this.logger.warn((Throwable)ex, (CharSequence)"Could not parse embedded headers. Remain payload untouched.");
                }
            }
            if (payload instanceof byte[] && KclMessageDrivenChannelAdapter.this.converter != null) {
                payload = KclMessageDrivenChannelAdapter.this.converter.convert(payload);
            }
            AbstractIntegrationMessageBuilder messageBuilder = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).setHeader("aws_receivedPartitionKey", (Object)record.partitionKey()).setHeader("aws_receivedSequenceNumber", (Object)record.sequenceNumber());
            if (KclMessageDrivenChannelAdapter.this.bindSourceRecord) {
                messageBuilder.setHeader("sourceData", (Object)record);
            }
            if (messageToUse != null) {
                messageBuilder.copyHeadersIfAbsent((Map)messageToUse.getHeaders());
            }
            return messageBuilder;
        }

        private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord, RecordProcessorCheckpointer checkpointer) {
            messageBuilder.setHeader("aws_receivedStream", (Object)this.stream).setHeader("aws_shard", (Object)this.shardId);
            if (CheckpointMode.manual.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                messageBuilder.setHeader("aws_checkpointer", (Object)checkpointer);
            }
            Message messageToSend = messageBuilder.build();
            this.setAttributesIfNecessary(rawRecord, messageToSend);
            try {
                KclMessageDrivenChannelAdapter.this.sendMessage(messageToSend);
            }
            catch (Exception ex) {
                KclMessageDrivenChannelAdapter.this.logger.error((Throwable)ex, () -> "Got an exception during sending a '" + messageToSend + "'\nfor the '" + rawRecord + "'.\nConsider to use 'errorChannel' flow for the compensation logic.");
            }
        }

        private void setAttributesIfNecessary(Object record, Message<?> message) {
            if (KclMessageDrivenChannelAdapter.this.getErrorChannel() != null) {
                AttributeAccessor attributes = ErrorMessageUtils.getAttributeAccessor(message, null);
                attributesHolder.set(attributes);
                attributes.setAttribute("aws_rawRecord", record);
            }
        }

        private void checkpoint(RecordProcessorCheckpointer checkpointer, @Nullable KinesisClientRecord record) {
            KclMessageDrivenChannelAdapter.this.logger.info(() -> "Checkpointing shard " + this.shardId);
            try {
                if (record == null) {
                    checkpointer.checkpoint();
                } else {
                    checkpointer.checkpoint(record.sequenceNumber());
                }
            }
            catch (ShutdownException se) {
                KclMessageDrivenChannelAdapter.this.logger.info((Throwable)se, (CharSequence)"Caught shutdown exception, skipping checkpoint.");
            }
            catch (ThrottlingException ex) {
                KclMessageDrivenChannelAdapter.this.logger.info((Throwable)ex, (CharSequence)"Transient issue when checkpointing");
            }
            catch (InvalidStateException ex) {
                KclMessageDrivenChannelAdapter.this.logger.error((Throwable)ex, (CharSequence)"Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client.");
            }
        }

        private void checkpointIfBatchMode(RecordProcessorCheckpointer checkpointer) {
            if (CheckpointMode.batch.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                this.checkpoint(checkpointer, null);
            }
        }

        private void checkpointIfRecordMode(RecordProcessorCheckpointer checkpointer, KinesisClientRecord record) {
            if (CheckpointMode.record.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                this.checkpoint(checkpointer, record);
            }
        }

        private void checkpointIfPeriodicMode(RecordProcessorCheckpointer checkpointer, @Nullable KinesisClientRecord record) {
            if (CheckpointMode.periodic.equals((Object)KclMessageDrivenChannelAdapter.this.checkpointMode) && System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
                this.checkpoint(checkpointer, record);
                this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KclMessageDrivenChannelAdapter.this.checkpointsInterval;
            }
        }
    }
}

