/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.CheckpointValueComparator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskType;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Drives the processing of a single shard by submitting one task at a time to an
 * {@link ExecutorService} and advancing a small state machine as each task finishes.
 *
 * <p>State progression, as implemented by {@link #updateState(boolean)}:
 * {@code WAITING_ON_PARENT_SHARDS -> INITIALIZING -> PROCESSING -> SHUTTING_DOWN -> SHUTDOWN_COMPLETE}.
 * A state is only left once the task belonging to that state has completed successfully;
 * a failed task leaves the state unchanged so the same kind of task is submitted again.
 * When shutdown has been requested before any task was ever submitted, the consumer
 * goes straight to {@code SHUTDOWN_COMPLETE}.
 *
 * <p>Thread-safety: {@link #consumeShard()}, {@link #beginShutdown()} and
 * {@link #markForShutdown(ShutdownReason)} are {@code synchronized} on this instance.
 * The simple accessors and {@link #updateState(boolean)} are not.
 */
class ShardConsumer {
    private static final Log LOG = LogFactory.getLog(ShardConsumer.class);

    private final StreamConfig streamConfig;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ExecutorService executorService;
    private final ShardInfo shardInfo;
    private final KinesisDataFetcher dataFetcher;
    private final IMetricsFactory metricsFactory;
    private final ILeaseManager<KinesisClientLease> leaseManager;
    private final ICheckpoint checkpoint;
    private final long parentShardPollIntervalMillis;
    private final boolean cleanupLeasesOfCompletedShards;
    private final long taskBackoffTimeMillis;

    /** The task whose outcome is represented by {@link #future}; {@code null} until the first successful submit. */
    private ITask currentTask;
    /** Wall-clock time (ms) at which {@link #currentTask} was submitted; used only for log output. */
    private long currentTaskSubmitTime;
    /** Handle of the most recently submitted task; {@code null} until the first successful submit. */
    private Future<TaskResult> future;
    private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
    /** Set once shutdown has been requested; never reset. */
    private boolean beginShutdown;
    private ShutdownReason shutdownReason;

    /**
     * Creates a consumer for one shard. The consumer starts in
     * {@link ShardConsumerState#WAITING_ON_PARENT_SHARDS}; no task is submitted until
     * {@link #consumeShard()} or {@link #beginShutdown()} is called.
     *
     * @param shardInfo shard this consumer is responsible for
     * @param streamConfig stream configuration; also supplies the stream proxy used by the
     *        data fetcher and the sequence number validator
     * @param checkpoint checkpoint store handed to the checkpointer and the initialize task
     * @param recordProcessor record processor handed to the initialize, process and shutdown tasks
     * @param leaseManager lease manager handed to the parent-shard and shutdown tasks
     * @param parentShardPollIntervalMillis value passed through to {@link BlockOnParentShardTask}
     * @param cleanupLeasesOfCompletedShards value passed through to {@link ShutdownTask}
     * @param executorService executor on which every task is run
     * @param metricsFactory metrics factory used to decorate every submitted task
     * @param backoffTimeMillis back-off value passed through to the initialize, process and shutdown tasks
     */
    ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, ILeaseManager<KinesisClientLease> leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, long backoffTimeMillis) {
        this.streamConfig = streamConfig;
        this.recordProcessor = recordProcessor;
        this.executorService = executorService;
        this.shardInfo = shardInfo;
        this.checkpoint = checkpoint;
        this.recordProcessorCheckpointer = new RecordProcessorCheckpointer(
                shardInfo,
                checkpoint,
                new SequenceNumberValidator(
                        streamConfig.getStreamProxy(),
                        shardInfo.getShardId(),
                        streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
                new CheckpointValueComparator());
        this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
        this.leaseManager = leaseManager;
        this.metricsFactory = metricsFactory;
        this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.taskBackoffTimeMillis = backoffTimeMillis;
    }

    /**
     * Submits the next task for this shard if no task is currently in flight.
     *
     * @return {@code true} if a new task was submitted by this call
     */
    synchronized boolean consumeShard() {
        return this.checkAndSubmitNextTask();
    }

    /**
     * If the previous task (if any) has finished, records its outcome, advances the state
     * machine and submits the task that belongs to the resulting state.
     *
     * @return {@code true} if a new task was submitted by this call
     */
    private synchronized boolean checkAndSubmitNextTask() {
        boolean previousTaskPending =
                this.future != null && !this.future.isCancelled() && !this.future.isDone();
        if (previousTaskPending) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Previous " + this.currentTask.getTaskType() + " task still pending for shard "
                        + this.shardInfo.getShardId() + " since "
                        + (System.currentTimeMillis() - this.currentTaskSubmitTime)
                        + " ms ago.  Not submitting new task.");
            }
            return false;
        }

        this.updateState(this.collectPreviousTaskOutcome());

        ITask nextTask = this.getNextTask();
        if (nextTask == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("No new task to submit for shard %s, currentState %s",
                        this.shardInfo.getShardId(), this.currentState.toString()));
            }
            return false;
        }

        // Submit BEFORE publishing the task: if submit() throws (for example because the
        // executor rejects the task), currentTask must keep describing the task that
        // `future` belongs to. Otherwise the next pass would pair the old, successful
        // result with the type of a task that never ran and skip that state entirely
        // (e.g. reach SHUTDOWN_COMPLETE without running the shutdown task).
        Future<TaskResult> submitted = this.executorService.submit(nextTask);
        this.currentTask = nextTask;
        this.future = submitted;
        this.currentTaskSubmitTime = System.currentTimeMillis();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitted new " + this.currentTask.getTaskType() + " task for shard "
                    + this.shardInfo.getShardId());
        }
        return true;
    }

    /**
     * Inspects the finished task held in {@link #future}, marking the consumer for
     * {@link ShutdownReason#TERMINATE} when the task reports that the shard end was reached.
     * Failures are logged at debug level only.
     *
     * @return {@code true} only if a task has run to completion and its result carries no exception
     */
    private boolean collectPreviousTaskOutcome() {
        // A cancelled future has no result (get() would throw CancellationException);
        // treat it like a failed task so the task for the current state is submitted again.
        if (this.future == null || this.future.isCancelled() || !this.future.isDone()) {
            return false;
        }
        try {
            TaskResult result = this.future.get();
            Exception taskException = result.getException();
            if (taskException == null) {
                if (result.isShardEndReached()) {
                    this.markForShutdown(ShutdownReason.TERMINATE);
                }
                return true;
            }
            if (LOG.isDebugEnabled()) {
                if (taskException instanceof BlockedOnParentShardException) {
                    LOG.debug("Shard " + this.shardInfo.getShardId() + " is blocked on completion of parent shard.");
                } else {
                    LOG.debug("Caught exception running " + this.currentTask.getTaskType() + " task: ", taskException);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns the calling thread.
            Thread.currentThread().interrupt();
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.currentTask.getTaskType() + " task was interrupted: ", e);
            }
        } catch (ExecutionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.currentTask.getTaskType() + " task encountered execution exception: ", e);
            }
        }
        return false;
    }

    /**
     * Requests shutdown with reason {@link ShutdownReason#ZOMBIE} (unless shutdown has already
     * completed) and tries to submit the next task towards it.
     *
     * @return {@code true} if the consumer has reached {@link ShardConsumerState#SHUTDOWN_COMPLETE}
     */
    synchronized boolean beginShutdown() {
        if (this.currentState != ShardConsumerState.SHUTDOWN_COMPLETE) {
            this.markForShutdown(ShutdownReason.ZOMBIE);
            this.checkAndSubmitNextTask();
        }
        return this.isShutdown();
    }

    /**
     * Flags the consumer for shutdown. The stored reason is only replaced when none is set
     * yet or when the stored one is {@link ShutdownReason#TERMINATE}; any other stored reason
     * is kept.
     *
     * @param reason reason to record for the shutdown
     */
    synchronized void markForShutdown(ShutdownReason reason) {
        this.beginShutdown = true;
        if (this.shutdownReason == null || this.shutdownReason == ShutdownReason.TERMINATE) {
            this.shutdownReason = reason;
        }
    }

    /**
     * @return {@code true} if the consumer is in {@link ShardConsumerState#SHUTDOWN_COMPLETE}
     */
    boolean isShutdown() {
        return this.currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
    }

    /**
     * @return the recorded shutdown reason, or {@code null} if shutdown was never requested
     */
    ShutdownReason getShutdownReason() {
        return this.shutdownReason;
    }

    /**
     * Builds the task that belongs to the current state, wrapped in a
     * {@link MetricsCollectingTaskDecorator}.
     *
     * @return the decorated task, or {@code null} when the current state has no task
     *         ({@link ShardConsumerState#SHUTDOWN_COMPLETE})
     */
    private ITask getNextTask() {
        ITask nextTask = null;
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS:
                nextTask = new BlockOnParentShardTask(this.shardInfo, this.leaseManager, this.parentShardPollIntervalMillis);
                break;
            case INITIALIZING:
                nextTask = new InitializeTask(this.shardInfo, this.recordProcessor, this.checkpoint, this.recordProcessorCheckpointer, this.dataFetcher, this.taskBackoffTimeMillis);
                break;
            case PROCESSING:
                nextTask = new ProcessTask(this.shardInfo, this.streamConfig, this.recordProcessor, this.recordProcessorCheckpointer, this.dataFetcher, this.taskBackoffTimeMillis);
                break;
            case SHUTTING_DOWN:
                nextTask = new ShutdownTask(this.shardInfo, this.recordProcessor, this.recordProcessorCheckpointer, this.shutdownReason, this.streamConfig.getStreamProxy(), this.streamConfig.getInitialPositionInStream(), this.cleanupLeasesOfCompletedShards, this.leaseManager, this.taskBackoffTimeMillis);
                break;
            case SHUTDOWN_COMPLETE:
                break;
        }
        if (nextTask == null) {
            return null;
        }
        return new MetricsCollectingTaskDecorator(nextTask, this.metricsFactory);
    }

    /**
     * Advances the state machine based on the outcome of the most recent task.
     *
     * <p>A state is left only when {@code taskCompletedSuccessfully} is {@code true} and the
     * current task has the type belonging to that state. If shutdown was requested, the next
     * state is {@code SHUTTING_DOWN} instead of the regular successor. While no task has ever
     * been submitted, a shutdown request moves the pre-processing states and
     * {@code SHUTTING_DOWN} directly to {@code SHUTDOWN_COMPLETE}.
     *
     * @param taskCompletedSuccessfully whether the most recent task finished without an exception
     */
    void updateState(boolean taskCompletedSuccessfully) {
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS:
                if (taskCompletedSuccessfully && this.currentTaskIs(TaskType.BLOCK_ON_PARENT_SHARDS)) {
                    this.currentState = this.beginShutdown
                            ? ShardConsumerState.SHUTTING_DOWN
                            : ShardConsumerState.INITIALIZING;
                } else if (this.currentTask == null && this.beginShutdown) {
                    // Nothing was ever started, so there is nothing to shut down.
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                }
                break;
            case INITIALIZING:
                if (taskCompletedSuccessfully && this.currentTaskIs(TaskType.INITIALIZE)) {
                    this.currentState = this.beginShutdown
                            ? ShardConsumerState.SHUTTING_DOWN
                            : ShardConsumerState.PROCESSING;
                } else if (this.currentTask == null && this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                }
                break;
            case PROCESSING:
                // Stays in PROCESSING until a process task succeeds after shutdown was requested.
                if (taskCompletedSuccessfully && this.currentTaskIs(TaskType.PROCESS) && this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTTING_DOWN;
                }
                break;
            case SHUTTING_DOWN:
                if (this.currentTask == null
                        || (taskCompletedSuccessfully && this.currentTaskIs(TaskType.SHUTDOWN))) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                }
                break;
            case SHUTDOWN_COMPLETE:
                break;
            default:
                LOG.error("Unexpected state: " + this.currentState);
        }
    }

    /** Returns whether the current task reports the given type; requires a non-null current task. */
    private boolean currentTaskIs(TaskType expectedType) {
        return expectedType.equals(this.currentTask.getTaskType());
    }

    /**
     * @return the state the consumer is currently in
     */
    ShardConsumerState getCurrentState() {
        return this.currentState;
    }

    /**
     * @return {@code true} once shutdown has been requested
     */
    boolean isBeginShutdown() {
        return this.beginShutdown;
    }

    /** Lifecycle states of a {@link ShardConsumer}, in the order they are normally traversed. */
    enum ShardConsumerState {
        WAITING_ON_PARENT_SHARDS,
        INITIALIZING,
        PROCESSING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE;
    }
}

