/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.connectors;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * An {@link IRecordProcessor} decorator that decouples receiving records from processing them.
 *
 * <p>Records handed to {@link #processRecords(List, IRecordProcessorCheckpointer)} are placed on a
 * bounded {@link BlockingQueue}. A single background thread (the queue consumer) takes them off the
 * queue in batches and forwards them to the wrapped record processor.
 *
 * <p>Because the wrapped processor runs on the consumer thread, it is given a restricted
 * checkpointer (see {@link #protectCheckpointer(IRecordProcessorCheckpointer)}) that only allows
 * checkpointing at an explicit sequence number.
 */
public class KinesisClientLibraryPipelinedRecordProcessor
implements IRecordProcessor {
    /** Default time, in milliseconds, the queue consumer waits for a record before treating the queue as idle. */
    public static final long DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS = 5000L;
    /** Default time, in milliseconds, {@link #shutdown} waits for the queue consumer to terminate. */
    public static final long DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS = 60000L;
    private static final Log LOG = LogFactory.getLog(KinesisClientLibraryPipelinedRecordProcessor.class);
    private final long maxQueueWaitTimeMs;
    private final long maxProcessRecordsWaitTimeMs;
    private final BlockingQueue<Record> recordQueue;
    private final IRecordProcessor recordProcessor;
    private final ExecutorService queueConsumerExecutor = Executors.newSingleThreadExecutor();
    private QueueConsumer queueConsumer;
    private String shardId;

    /**
     * Creates a pipelined processor using the default wait times.
     *
     * @param recordProcessor the processor that records are eventually forwarded to
     * @param maxQueueSize capacity of the internal record queue
     */
    public KinesisClientLibraryPipelinedRecordProcessor(IRecordProcessor recordProcessor, int maxQueueSize) {
        this(recordProcessor, maxQueueSize, DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS, DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS);
    }

    /**
     * Creates a pipelined processor.
     *
     * @param recordProcessor the processor that records are eventually forwarded to
     * @param maxQueueSize capacity of the internal record queue
     * @param maxQueueWaitTimeMs how long the queue consumer blocks waiting for a record, in
     *        milliseconds; {@code null} selects {@link #DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS}
     * @param maxProcessRecordsWaitTimeMs how long {@link #shutdown} waits for the queue consumer to
     *        finish, in milliseconds; {@code null} selects
     *        {@link #DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS}
     */
    public KinesisClientLibraryPipelinedRecordProcessor(IRecordProcessor recordProcessor, int maxQueueSize, Long maxQueueWaitTimeMs, Long maxProcessRecordsWaitTimeMs) {
        this.recordProcessor = recordProcessor;
        this.recordQueue = new LinkedBlockingQueue<Record>(maxQueueSize);
        this.maxQueueWaitTimeMs = maxQueueWaitTimeMs == null ? DEFAULT_MAXIMUM_QUEUE_WAIT_TIME_MS : maxQueueWaitTimeMs;
        this.maxProcessRecordsWaitTimeMs = maxProcessRecordsWaitTimeMs == null ? DEFAULT_MAXIMUM_PROCESS_RECORDS_WAIT_TIME_MS : maxProcessRecordsWaitTimeMs;
    }

    /**
     * Initializes the wrapped processor and starts the queue consumer thread.
     *
     * @param shardId the shard this processor is responsible for
     * @throws IllegalArgumentException if {@code shardId} is {@code null}
     */
    public void initialize(String shardId) {
        if (shardId == null) {
            throw new IllegalArgumentException("ShardId cannot be null");
        }
        this.shardId = shardId;
        this.recordProcessor.initialize(shardId);
        this.queueConsumer = new QueueConsumer();
        this.queueConsumerExecutor.submit(this.queueConsumer);
        // No further tasks are accepted; the executor terminates once the consumer's run() returns,
        // which is what shutdown() waits for via awaitTermination.
        this.queueConsumerExecutor.shutdown();
        LOG.info("Initialized pipelined record processor for shard: " + shardId);
    }

    /**
     * Enqueues the given records for the queue consumer and publishes the checkpointer to it.
     * Blocks while the queue is full.
     *
     * @param records the records to enqueue
     * @param checkpointer the checkpointer the consumer should use (it is wrapped before use)
     */
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        this.queueConsumer.setCheckpointer(checkpointer);
        for (Record record : records) {
            try {
                this.recordQueue.put(record);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while adding a record to the queue", e);
                Thread.currentThread().interrupt();
                // With the interrupt flag restored every further put() would throw immediately,
                // so stop here instead of logging one error per remaining record.
                return;
            }
        }
    }

    /**
     * Signals the queue consumer to stop, waits for it to terminate, then processes whatever is
     * left in the queue on the calling thread and shuts down the wrapped processor.
     *
     * <p>If the consumer does not terminate within the configured wait time, a warning is logged
     * and the wrapped processor is neither given the remaining records nor shut down.
     *
     * @param checkpointer the checkpointer passed unwrapped to the wrapped processor
     * @param reason the reason for the shutdown
     */
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        LOG.info("Shutting down pipelined processor for shard: " + this.shardId + " with reason:" + reason);
        this.queueConsumer.shutdown = true;
        try {
            if (this.queueConsumerExecutor.awaitTermination(this.maxProcessRecordsWaitTimeMs, TimeUnit.MILLISECONDS)) {
                // The consumer thread is gone, so the remaining records are processed synchronously
                // here with the caller's own checkpointer.
                List<Record> records = new ArrayList<Record>();
                this.recordQueue.drainTo(records);
                this.recordProcessor.processRecords(records, checkpointer);
                this.recordProcessor.shutdown(checkpointer, reason);
            } else {
                LOG.warn("Queue consumer took longer than " + this.maxProcessRecordsWaitTimeMs + " ms to complete. Shutdown task failed.");
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted while draining queue", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Wraps a checkpointer so that only {@code checkpoint(String sequenceNumber)} is forwarded;
     * every other overload throws {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): presumably the other overloads are blocked because the records already
     * handed to this class may be ahead of what the consumer thread has processed - confirm.
     *
     * @param checkpointer the checkpointer to delegate to
     * @return the restricted view of {@code checkpointer}
     */
    IRecordProcessorCheckpointer protectCheckpointer(final IRecordProcessorCheckpointer checkpointer) {
        return new IRecordProcessorCheckpointer(){
            private final IRecordProcessorCheckpointer internalCheckpointer = checkpointer;

            public void checkpoint(String sequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
                this.internalCheckpointer.checkpoint(sequenceNumber);
            }

            public void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
                throw new UnsupportedOperationException();
            }

            public void checkpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
                throw new UnsupportedOperationException();
            }

            public void checkpoint(String sequenceNumber, long subSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Background task that repeatedly takes batches off the record queue and forwards them to the
     * wrapped record processor until asked to shut down.
     */
    private class QueueConsumer
    implements Runnable {
        /** Set to {@code true} by the enclosing class to make {@link #run()} return. */
        volatile boolean shutdown = false;
        /** Most recently supplied checkpointer, already wrapped; {@code null} until first set. */
        private volatile IRecordProcessorCheckpointer checkpointer = null;

        private QueueConsumer() {
        }

        /**
         * Stores a protected wrapper of the given checkpointer for use on the consumer thread.
         *
         * @param checkpointer the raw checkpointer received by the enclosing processor
         */
        public void setCheckpointer(IRecordProcessorCheckpointer checkpointer) {
            this.checkpointer = KinesisClientLibraryPipelinedRecordProcessor.this.protectCheckpointer(checkpointer);
        }

        @Override
        public void run() {
            LOG.info("Starting queue consumer for shard: " + KinesisClientLibraryPipelinedRecordProcessor.this.shardId);
            try {
                // Also stop on interruption: a pending interrupt makes every poll() throw at once,
                // which would otherwise turn this loop into a busy spin.
                while (!this.shutdown && !Thread.currentThread().isInterrupted()) {
                    this.consumeQueue();
                }
            }
            catch (RuntimeException e) {
                // The Future returned by submit() is discarded, so without this log a failure in
                // the wrapped processor would end the consumer silently.
                LOG.error("Queue consumer failed for shard: " + KinesisClientLibraryPipelinedRecordProcessor.this.shardId, e);
                throw e;
            }
            LOG.info("Queue consumer terminated for shard: " + KinesisClientLibraryPipelinedRecordProcessor.this.shardId);
        }

        /**
         * Waits up to the configured queue wait time for a record, then forwards that record plus
         * everything else currently queued to the wrapped processor as one batch. On timeout the
         * wrapped processor receives an empty batch instead.
         */
        private void consumeQueue() {
            Record polled;
            try {
                polled = KinesisClientLibraryPipelinedRecordProcessor.this.recordQueue.poll(KinesisClientLibraryPipelinedRecordProcessor.this.maxQueueWaitTimeMs, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for records", e);
                Thread.currentThread().interrupt();
                // An interrupt is not an idle timeout; let run() observe the flag and stop.
                return;
            }
            List<Record> records = new ArrayList<Record>();
            // Read the volatile field once so the whole batch uses the same checkpointer.
            IRecordProcessorCheckpointer currentCheckpointer = this.checkpointer;
            if (polled == null) {
                // Idle timeout: hand the wrapped processor an empty batch. This must not go through
                // the enclosing processRecords(), which would only re-wrap the already protected
                // checkpointer on every timeout. Skipped until a checkpointer has been supplied.
                if (currentCheckpointer != null) {
                    KinesisClientLibraryPipelinedRecordProcessor.this.recordProcessor.processRecords(records, currentCheckpointer);
                }
                return;
            }
            records.add(polled);
            // Drain BEFORE processing so every record removed from the queue is part of the batch.
            int drained = 1 + KinesisClientLibraryPipelinedRecordProcessor.this.recordQueue.drainTo(records);
            KinesisClientLibraryPipelinedRecordProcessor.this.recordProcessor.processRecords(records, currentCheckpointer);
            LOG.info("Consumed " + drained + " records");
        }
    }
}

