/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.kinesis.stream.record;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public abstract class AbstractKinesisRecordProcessor
implements ShardRecordProcessor {
    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    public static final String AWS_KINESIS_PARTITION_KEY = "aws.kinesis.partition.key";
    public static final String AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP = "aws.kinesis.approximate.arrival.timestamp";
    public static final String KINESIS_RECORD_SCHEMA_KEY = "kinesis.name";
    static final Base64.Encoder BASE_64_ENCODER = Base64.getEncoder();
    private final ProcessSessionFactory sessionFactory;
    private final ComponentLog log;
    private final String streamName;
    private final String transitUriPrefix;
    private final long checkpointIntervalMillis;
    private final long retryWaitMillis;
    private final int numRetries;
    private final DateTimeFormatter dateTimeFormatter;
    private final RecordProcessorBlocker recordProcessorBlocker;
    private String kinesisShardId;
    private long nextCheckpointTimeInMillis;
    private boolean processingRecords = false;

    AbstractKinesisRecordProcessor(ProcessSessionFactory sessionFactory, ComponentLog log, String streamName, String endpointPrefix, String kinesisEndpoint, long checkpointIntervalMillis, long retryWaitMillis, int numRetries, DateTimeFormatter dateTimeFormatter, RecordProcessorBlocker recordProcessorBlocker) {
        this.sessionFactory = sessionFactory;
        this.log = log;
        this.streamName = streamName;
        this.checkpointIntervalMillis = checkpointIntervalMillis;
        this.retryWaitMillis = retryWaitMillis;
        this.numRetries = numRetries;
        this.dateTimeFormatter = dateTimeFormatter;
        this.recordProcessorBlocker = recordProcessorBlocker;
        this.transitUriPrefix = StringUtils.isBlank((String)kinesisEndpoint) ? String.format("http://%s.amazonaws.com", endpointPrefix) : kinesisEndpoint;
    }

    public void initialize(InitializationInput initializationInput) {
        if (initializationInput.pendingCheckpointSequenceNumber() != null) {
            this.log.warn("Initializing record processor for stream: {} / shard {}; from sequence number: {}; indicates previously uncheckpointed sequence number: {}", new Object[]{this.streamName, initializationInput.shardId(), initializationInput.extendedSequenceNumber(), initializationInput.pendingCheckpointSequenceNumber()});
        } else {
            this.log.debug("Initializing record processor for stream: {} / shard: {}; from sequence number: {}", new Object[]{this.streamName, initializationInput.shardId(), initializationInput.extendedSequenceNumber()});
        }
        this.kinesisShardId = initializationInput.shardId();
        this.nextCheckpointTimeInMillis = System.currentTimeMillis() + this.checkpointIntervalMillis;
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        block6: {
            try {
                this.recordProcessorBlocker.await();
            }
            catch (InterruptedException ie) {
                this.getLogger().debug("Interrupted while waiting for recordProcessorBlocker to unblock, resuming record processing", (Throwable)ie);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Processing {} records from {}; cache entry: {}; cache exit: {}; millis behind latest: {}", new Object[]{processRecordsInput.records().size(), this.kinesisShardId, processRecordsInput.cacheEntryTime() != null ? this.dateTimeFormatter.format(processRecordsInput.cacheEntryTime().atZone(ZoneId.systemDefault())) : null, processRecordsInput.cacheExitTime() != null ? this.dateTimeFormatter.format(processRecordsInput.cacheExitTime().atZone(ZoneId.systemDefault())) : null, processRecordsInput.millisBehindLatest()});
            }
            ProcessSession session = null;
            try {
                List records = processRecordsInput.records();
                if (!records.isEmpty()) {
                    ArrayList<FlowFile> flowFiles = new ArrayList<FlowFile>(records.size());
                    StopWatch stopWatch = new StopWatch(true);
                    session = this.sessionFactory.createSession();
                    this.startProcessingRecords();
                    int recordsTransformed = this.processRecordsWithRetries(records, flowFiles, session, stopWatch);
                    this.transferTo(ConsumeKinesisStream.REL_SUCCESS, session, records.size(), recordsTransformed, flowFiles);
                    session.commitAsync(() -> {
                        this.processingRecords = false;
                        this.checkpointOnceEveryCheckpointInterval(processRecordsInput.checkpointer());
                    });
                }
            }
            catch (Exception e) {
                this.log.error("Unable to fully process received Kinesis record(s) due to {}", new Object[]{e.getLocalizedMessage(), e});
                if (session == null) break block6;
                session.rollback();
            }
        }
    }

    void startProcessingRecords() {
        this.processingRecords = true;
    }

    private int processRecordsWithRetries(List<KinesisClientRecord> records, List<FlowFile> flowFiles, ProcessSession session, StopWatch stopWatch) {
        int recordsTransformed = 0;
        for (int r = 0; r < records.size(); ++r) {
            KinesisClientRecord kinesisRecord = records.get(r);
            boolean processedSuccessfully = false;
            for (int i = 0; !processedSuccessfully && i < this.numRetries; ++i) {
                processedSuccessfully = this.attemptProcessRecord(flowFiles, kinesisRecord, r == records.size() - 1, session, stopWatch);
            }
            if (processedSuccessfully) {
                ++recordsTransformed;
                continue;
            }
            this.log.error("Couldn't process Kinesis record {}, skipping.", new Object[]{kinesisRecord});
        }
        return recordsTransformed;
    }

    private boolean attemptProcessRecord(List<FlowFile> flowFiles, KinesisClientRecord kinesisRecord, boolean lastRecord, ProcessSession session, StopWatch stopWatch) {
        boolean processedSuccessfully = false;
        try {
            this.processRecord(flowFiles, kinesisRecord, lastRecord, session, stopWatch);
            processedSuccessfully = true;
        }
        catch (Exception e) {
            this.log.error("Caught Exception while processing Kinesis record {}", new Object[]{kinesisRecord, e});
            try {
                Thread.sleep(this.retryWaitMillis);
            }
            catch (InterruptedException ie) {
                this.log.debug("Interrupted sleep during record processing back-off", (Throwable)ie);
            }
        }
        return processedSuccessfully;
    }

    abstract void processRecord(List<FlowFile> var1, KinesisClientRecord var2, boolean var3, ProcessSession var4, StopWatch var5);

    void reportProvenance(ProcessSession session, FlowFile flowFile, String partitionKey, String sequenceNumber, StopWatch stopWatch) {
        String transitUri = StringUtils.isNotBlank((String)partitionKey) && StringUtils.isNotBlank((String)sequenceNumber) ? String.format("%s/%s/%s#%s", this.transitUriPrefix, this.kinesisShardId, partitionKey, sequenceNumber) : String.format("%s/%s", this.transitUriPrefix, this.kinesisShardId);
        session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    }

    Map<String, String> getDefaultAttributes(String sequenceNumber, String partitionKey, Instant approximateArrivalTimestamp) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put(AWS_KINESIS_SHARD_ID, this.kinesisShardId);
        attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, sequenceNumber);
        attributes.put(AWS_KINESIS_PARTITION_KEY, partitionKey);
        if (approximateArrivalTimestamp != null) {
            attributes.put(AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP, this.dateTimeFormatter.format(approximateArrivalTimestamp.atZone(ZoneId.systemDefault())));
        }
        return attributes;
    }

    void transferTo(Relationship relationship, ProcessSession session, int recordsProcessed, int recordsTransformed, List<FlowFile> flowFiles) {
        session.adjustCounter("Records Processed", (long)recordsProcessed, false);
        if (!flowFiles.isEmpty()) {
            session.adjustCounter("Records Transformed", (long)recordsTransformed, false);
            session.transfer(flowFiles, relationship);
        }
    }

    private void checkpointOnceEveryCheckpointInterval(RecordProcessorCheckpointer checkpointer) {
        if (System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
            this.checkpointWithRetries(checkpointer);
            this.nextCheckpointTimeInMillis = System.currentTimeMillis() + this.checkpointIntervalMillis;
        }
    }

    public void leaseLost(LeaseLostInput leaseLostInput) {
        this.log.debug("Lease lost");
    }

    public void shardEnded(ShardEndedInput shardEndedInput) {
        this.log.debug("Shutting down Record Processor for shard: {} with reason: Shard Ended", new Object[]{this.kinesisShardId});
        this.checkpointWithRetries(shardEndedInput.checkpointer());
    }

    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        this.log.debug("Shutting down Record Processor for shard: {} with reason: Shutdown Requested", new Object[]{this.kinesisShardId});
        for (int i = 0; this.processingRecords && i < this.numRetries; ++i) {
            this.log.debug("Record Processor for shard {} still processing records, waiting before shutdown", new Object[]{this.kinesisShardId});
            try {
                Thread.sleep(this.retryWaitMillis);
                continue;
            }
            catch (InterruptedException ie) {
                this.log.debug("Interrupted sleep while waiting for record processing to complete before shutdown (TERMINATE)", (Throwable)ie);
            }
        }
        if (this.processingRecords) {
            this.log.warn("Record Processor for shard {} still running, but maximum wait time elapsed, checkpoint will be attempted", new Object[]{this.kinesisShardId});
        }
        this.checkpointWithRetries(shutdownRequestedInput.checkpointer());
    }

    private void checkpointWithRetries(RecordProcessorCheckpointer checkpointer) {
        this.log.debug("Checkpointing shard {}", new Object[]{this.kinesisShardId});
        try {
            for (int i = 0; i < this.numRetries && !this.attemptCheckpoint(checkpointer, i); ++i) {
            }
        }
        catch (ShutdownException se) {
            this.log.info("Caught shutdown exception, skipping checkpoint.", (Throwable)se);
        }
        catch (InvalidStateException e) {
            this.log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", (Throwable)e);
        }
    }

    private boolean attemptCheckpoint(RecordProcessorCheckpointer checkpointer, int attempt) throws ShutdownException, InvalidStateException {
        boolean success = false;
        try {
            checkpointer.checkpoint();
            success = true;
        }
        catch (ThrottlingException e) {
            if (attempt >= this.numRetries - 1) {
                this.log.error("Checkpoint failed after {} attempts.", new Object[]{attempt + 1, e});
            }
            this.log.warn("Transient issue when checkpointing - attempt {} of {}", new Object[]{attempt + 1, this.numRetries, e});
            try {
                Thread.sleep(this.retryWaitMillis);
            }
            catch (InterruptedException ie) {
                this.log.debug("Interrupted sleep during checkpoint back-off", (Throwable)ie);
            }
        }
        return success;
    }

    ComponentLog getLogger() {
        return this.log;
    }

    String getStreamName() {
        return this.streamName;
    }

    String getKinesisShardId() {
        return this.kinesisShardId;
    }

    void setKinesisShardId(String kinesisShardId) {
        this.kinesisShardId = kinesisShardId;
    }

    long getNextCheckpointTimeInMillis() {
        return this.nextCheckpointTimeInMillis;
    }

    void setNextCheckpointTimeInMillis(long nextCheckpointTimeInMillis) {
        this.nextCheckpointTimeInMillis = nextCheckpointTimeInMillis;
    }

    void setProcessingRecords(boolean processingRecords) {
        this.processingRecords = processingRecords;
    }
}

