/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskType;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.Record;
import java.math.BigInteger;
import java.util.List;
import java.util.ListIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Task that fetches one batch of records for a shard and passes it to the
 * application's {@link IRecordProcessor}.
 *
 * <p>Each invocation of {@link #call()} checks whether the data fetcher has reached
 * the end of the shard, fetches up to {@code streamConfig.getMaxRecords()} records,
 * idles when nothing was returned, and then invokes {@code processRecords()} on the
 * record processor.
 */
class ProcessTask
implements ITask {
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
    private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
    private static final Log LOG = LogFactory.getLog(ProcessTask.class);
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final KinesisDataFetcher dataFetcher;
    private final TaskType taskType = TaskType.PROCESS;
    private final StreamConfig streamConfig;
    private final long backoffTimeMillis;

    /**
     * @param shardInfo shard this task reads from; its shard id is used in logs and metrics
     * @param streamConfig supplies max records per fetch, idle time, and the
     *        "call processRecords for empty list" switch
     * @param recordProcessor application callback that receives each fetched batch
     * @param recordProcessorCheckpointer checkpointer handed to the record processor; its largest
     *        permitted checkpoint value is updated before each non-empty batch is delivered
     * @param dataFetcher source of records for the shard
     * @param backoffTimeMillis time to sleep after this task catches an exception
     */
    public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.dataFetcher = dataFetcher;
        this.streamConfig = streamConfig;
        this.backoffTimeMillis = backoffTimeMillis;
    }

    /**
     * Fetches one batch of records and delivers it to the record processor.
     *
     * @return a result flagged as shard-end when the fetcher reports the end of the shard;
     *         otherwise a result carrying the exception caught while fetching/dispatching,
     *         or {@code null} when none was caught. Exceptions thrown by the application's
     *         {@code processRecords()} are logged and not reported in the result.
     */
    @Override
    public TaskResult call() {
        long startTimeMillis = System.currentTimeMillis();
        IMetricsScope scope = MetricsHelper.getMetricsScope();
        scope.addDimension("ShardId", this.shardInfo.getShardId());
        // Emit zero-valued data points up front so both metrics are present even for empty batches.
        scope.addData(RECORDS_PROCESSED_METRIC, 0.0, StandardUnit.Count);
        scope.addData(DATA_BYTES_PROCESSED_METRIC, 0.0, StandardUnit.Bytes);
        Exception exception = null;
        try {
            if (this.dataFetcher.isShardEndReached()) {
                LOG.info("Reached end of shard " + this.shardInfo.getShardId());
                // Second argument marks the result as "shard end reached".
                return new TaskResult(null, true);
            }
            List<Record> records = this.getRecords();
            if (records.isEmpty()) {
                this.handleNoRecords(startTimeMillis);
            }
            if (!records.isEmpty() || this.streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
                this.dispatchToRecordProcessor(scope, records);
            }
        }
        catch (KinesisClientLibException | RuntimeException e) {
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Caught exception: ", e);
            exception = e;
            // Back off before returning the failure to the caller.
            try {
                Thread.sleep(this.backoffTimeMillis);
            }
            catch (InterruptedException ie) {
                LOG.debug(this.shardInfo.getShardId() + ": Sleep was interrupted", ie);
                // Preserve the interrupt so code further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
        }
        return new TaskResult(exception);
    }

    /**
     * Sleeps for whatever is left of the configured idle time after subtracting the time
     * already spent in this invocation. Does nothing if the idle time has already elapsed.
     *
     * @param startTimeMillis wall-clock time (ms) at which the current invocation started
     */
    private void handleNoRecords(long startTimeMillis) {
        LOG.debug("Kinesis didn't return any records for shard " + this.shardInfo.getShardId());
        long idleTimeMillis = this.streamConfig.getIdleTimeInMilliseconds();
        long sleepTimeMillis = idleTimeMillis - (System.currentTimeMillis() - startTimeMillis);
        if (sleepTimeMillis <= 0L) {
            return;
        }
        // Cap at the idle time: the remainder can only exceed it if the wall clock moved
        // backwards. (Using max here would discard the elapsed-time adjustment entirely.)
        sleepTimeMillis = Math.min(sleepTimeMillis, idleTimeMillis);
        try {
            LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard " + this.shardInfo.getShardId());
            Thread.sleep(sleepTimeMillis);
        }
        catch (InterruptedException e) {
            LOG.debug("ShardId " + this.shardInfo.getShardId() + ": Sleep was interrupted");
            // Preserve the interrupt so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Updates the largest permitted checkpoint value (for non-empty batches) and invokes the
     * application's {@code processRecords()}. Any exception thrown by the application is
     * logged together with the skipped records and is not rethrown.
     *
     * @param scope metrics scope that receives the record and byte counts
     * @param records batch to deliver; may be empty
     */
    private void dispatchToRecordProcessor(IMetricsScope scope, List<Record> records) {
        if (!records.isEmpty()) {
            String maxSequenceNumber = this.getMaxSequenceNumber(scope, records);
            this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(maxSequenceNumber);
        }
        try {
            LOG.debug("Calling application processRecords() with " + records.size() + " records from " + this.shardInfo.getShardId());
            this.recordProcessor.processRecords(records, this.recordProcessorCheckpointer);
        }
        catch (Exception e) {
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Application processRecords() threw an exception when processing shard ", e);
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Skipping over the following data records: " + records);
        }
    }

    /**
     * Records the batch size and per-record data size metrics and returns the largest
     * sequence number in the batch.
     *
     * @param scope metrics scope that receives the record and byte counts
     * @param records batch to scan
     * @return decimal string of the numerically largest sequence number, or {@code "0"}
     *         if no record has a sequence number greater than zero
     */
    private String getMaxSequenceNumber(IMetricsScope scope, List<Record> records) {
        scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count);
        BigInteger maxSequenceNumber = BigInteger.ZERO;
        for (Record record : records) {
            // Sequence numbers are compared numerically, not lexicographically.
            BigInteger sequenceNumber = new BigInteger(record.getSequenceNumber());
            if (maxSequenceNumber.compareTo(sequenceNumber) < 0) {
                maxSequenceNumber = sequenceNumber;
            }
            scope.addData(DATA_BYTES_PROCESSED_METRIC, record.getData().limit(), StandardUnit.Bytes);
        }
        return maxSequenceNumber.toString();
    }

    /**
     * Fetches the next batch of records. If the iterator has expired, emits the
     * expired-iterator metric, repositions the fetcher after the largest permitted
     * checkpoint value and retries once.
     *
     * @return the records returned by the data fetcher
     * @throws KinesisClientLibException as declared; not thrown directly by the code in this method
     * @throws ExpiredIteratorException if the retry with the repositioned iterator also reports expiry
     */
    private List<Record> getRecords() throws KinesisClientLibException {
        int maxRecords = this.streamConfig.getMaxRecords();
        try {
            return this.dataFetcher.getRecords(maxRecords);
        }
        catch (ExpiredIteratorException e) {
            LOG.info("ShardId " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum " + "passed to customer", e);
            MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1.0, StandardUnit.Count);
            this.dataFetcher.advanceIteratorAfter(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
            try {
                return this.dataFetcher.getRecords(maxRecords);
            }
            catch (ExpiredIteratorException ex) {
                String msg = "Shard " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException with a fresh iterator.";
                LOG.error(msg, ex);
                throw ex;
            }
        }
    }

    /**
     * @return the type of this task, always {@code TaskType.PROCESS}
     */
    @Override
    public TaskType getTaskType() {
        return this.taskType;
    }
}

