/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.EncryptionType;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class FanOutRecordPublisher
implements RecordPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(FanOutRecordPublisher.class);
    private final FullJitterBackoff backoff;
    private final String consumerArn;
    private final KinesisProxyV2Interface kinesisProxy;
    private final StreamShardHandle subscribedShard;
    private final FanOutRecordPublisherConfiguration configuration;
    private int attempt = 0;
    private StartingPosition nextStartingPosition;

    public FanOutRecordPublisher(StartingPosition startingPosition, String consumerArn, StreamShardHandle subscribedShard, KinesisProxyV2Interface kinesisProxy, FanOutRecordPublisherConfiguration configuration, FullJitterBackoff backoff) {
        this.nextStartingPosition = (StartingPosition)Preconditions.checkNotNull((Object)startingPosition);
        this.consumerArn = (String)Preconditions.checkNotNull((Object)consumerArn);
        this.subscribedShard = (StreamShardHandle)Preconditions.checkNotNull((Object)subscribedShard);
        this.kinesisProxy = (KinesisProxyV2Interface)Preconditions.checkNotNull((Object)kinesisProxy);
        this.configuration = (FanOutRecordPublisherConfiguration)Preconditions.checkNotNull((Object)configuration);
        this.backoff = (FullJitterBackoff)Preconditions.checkNotNull((Object)backoff);
    }

    @Override
    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer recordConsumer) throws InterruptedException {
        LOG.info("Running fan out record publisher on {}::{} from {} - {}", new Object[]{this.subscribedShard.getStreamName(), this.subscribedShard.getShard().getShardId(), this.nextStartingPosition.getShardIteratorType(), this.nextStartingPosition.getStartingMarker()});
        Consumer<SubscribeToShardEvent> eventConsumer = event -> {
            RecordBatch recordBatch = new RecordBatch(this.toSdkV1Records(event.records()), this.subscribedShard, event.millisBehindLatest());
            SequenceNumber sequenceNumber = recordConsumer.accept(recordBatch);
            this.nextStartingPosition = this.getNextStartingPosition(sequenceNumber);
        };
        RecordPublisher.RecordPublisherRunResult result = this.runWithBackoff(eventConsumer);
        LOG.info("Subscription expired {}::{}, with status {}", new Object[]{this.subscribedShard.getStreamName(), this.subscribedShard.getShard().getShardId(), result});
        return result;
    }

    private StartingPosition getNextStartingPosition(SequenceNumber latestSequenceNumber) {
        if (SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(latestSequenceNumber)) {
            Preconditions.checkState((this.nextStartingPosition.getShardIteratorType() == ShardIteratorType.AT_TIMESTAMP ? 1 : 0) != 0);
            return this.nextStartingPosition;
        }
        return StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
    }

    private RecordPublisher.RecordPublisherRunResult runWithBackoff(Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
        boolean complete;
        FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber(this.consumerArn, this.subscribedShard.getShard().getShardId(), this.kinesisProxy, this.configuration.getSubscribeToShardTimeout());
        try {
            complete = fanOutShardSubscriber.subscribeToShardAndConsumeRecords(this.toSdkV2StartingPosition(this.nextStartingPosition), eventConsumer);
            this.attempt = 0;
        }
        catch (FanOutShardSubscriber.FanOutSubscriberInterruptedException ex) {
            LOG.info("Thread interrupted, closing record publisher for shard {}.", (Object)this.subscribedShard.getShard().getShardId(), (Object)ex);
            return RecordPublisher.RecordPublisherRunResult.CANCELLED;
        }
        catch (FanOutShardSubscriber.RecoverableFanOutSubscriberException ex) {
            this.backoff(ex);
            return RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
        }
        catch (FanOutShardSubscriber.FanOutSubscriberException ex) {
            if (ex.getCause() instanceof ResourceNotFoundException) {
                LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered.Marking this shard as complete {} ({})", (Object)this.subscribedShard.getShard().getShardId(), (Object)this.consumerArn);
                return RecordPublisher.RecordPublisherRunResult.COMPLETE;
            }
            if (this.attempt == this.configuration.getSubscribeToShardMaxRetries()) {
                String errorMessage = "Maximum retries exceeded for SubscribeToShard. Failed " + this.configuration.getSubscribeToShardMaxRetries() + " times.";
                LOG.error(errorMessage, ex.getCause());
                throw new RuntimeException(errorMessage, ex.getCause());
            }
            ++this.attempt;
            this.backoff(ex);
            return RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
        }
        return complete ? RecordPublisher.RecordPublisherRunResult.COMPLETE : RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
    }

    private void backoff(Throwable ex) throws InterruptedException {
        long backoffMillis = this.backoff.calculateFullJitterBackoff(this.configuration.getSubscribeToShardBaseBackoffMillis(), this.configuration.getSubscribeToShardMaxBackoffMillis(), this.configuration.getSubscribeToShardExpConstant(), this.attempt);
        LOG.warn("Encountered recoverable error {}. Backing off for {} millis {} ({})", new Object[]{ex.getCause().getClass().getSimpleName(), backoffMillis, this.subscribedShard.getShard().getShardId(), this.consumerArn, ex});
        this.backoff.sleep(backoffMillis);
    }

    private List<org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record> toSdkV1Records(List<Record> records) {
        ArrayList<org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record> sdkV1Records = new ArrayList<org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record>();
        for (Record record : records) {
            sdkV1Records.add(this.toSdkV1Record(record));
        }
        return sdkV1Records;
    }

    private org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record toSdkV1Record(@Nonnull Record record) {
        org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record recordV1 = new org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record().withData(record.data().asByteBuffer()).withSequenceNumber(record.sequenceNumber()).withPartitionKey(record.partitionKey()).withApproximateArrivalTimestamp(new Date(record.approximateArrivalTimestamp().toEpochMilli()));
        EncryptionType encryptionType = record.encryptionType();
        if (encryptionType != null) {
            recordV1.withEncryptionType(encryptionType.name());
        }
        return recordV1;
    }

    private org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition toSdkV2StartingPosition(StartingPosition startingPosition) {
        StartingPosition.Builder builder = org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().type(startingPosition.getShardIteratorType().toString());
        Object marker = startingPosition.getStartingMarker();
        switch (startingPosition.getShardIteratorType()) {
            case AT_TIMESTAMP: {
                Preconditions.checkNotNull((Object)marker, (String)"StartingPosition AT_TIMESTAMP date marker is null.");
                builder.timestamp(((Date)marker).toInstant());
                break;
            }
            case AT_SEQUENCE_NUMBER: 
            case AFTER_SEQUENCE_NUMBER: {
                Preconditions.checkNotNull((Object)marker, (String)"StartingPosition *_SEQUENCE_NUMBER position is null.");
                builder.sequenceNumber(marker.toString());
            }
        }
        return (org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition)builder.build();
    }
}

