package com.aliyun.openservices.ots.internal.streamclient.core;

import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.core.task.BlockOnParentShardTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ITask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.InitializeTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ProcessTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.RetryingTaskDecorator;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ShutdownTask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskResult;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskType;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLease;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.model.ICheckpointTracker;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessor;
import com.aliyun.openservices.ots.internal.streamclient.model.IRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.model.IShutdownMarker;
import com.aliyun.openservices.ots.internal.streamclient.model.ShardInfo;
import com.aliyun.openservices.ots.internal.streamclient.model.ShutdownReason;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/ots/internal/streamclient/core/ShardConsumer.class */
class ShardConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
    private final ShardInfo shardInfo;
    private final StreamConfig streamConfig;
    private final IRecordProcessor recordProcessor;
    private final ILeaseManager<ShardLease> leaseManager;
    private final long parentShardPollIntervalMillis;
    private final ExecutorService executorService;
    private ICheckpointTracker checkpointTracker;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final DataFetcher dataFetcher;
    private final ShardSyncer shardSyncer;
    private final IRetryStrategy taskRetryStrategy;
    private ITask currentTask;
    private long currentTaskSubmitTime;
    private Future<TaskResult> future;
    private boolean beginShutdown;
    private ShutdownReason shutdownReason;
    private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
    private IShutdownMarker shutdownMarker = new IShutdownMarker() { // from class: com.aliyun.openservices.ots.internal.streamclient.core.ShardConsumer.1
        @Override // com.aliyun.openservices.ots.internal.streamclient.model.IShutdownMarker
        public void markForProcessDone() {
            ShardConsumer.this.markForShutdown(ShutdownReason.PROCESS_DONE);
        }

        @Override // com.aliyun.openservices.ots.internal.streamclient.model.IShutdownMarker
        public void markForProcessRestart() {
            ShardConsumer.this.markForShutdown(ShutdownReason.PROCESS_RESTART);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/aliyun/openservices/ots/internal/streamclient/core/ShardConsumer$ShardConsumerState.class */
    public enum ShardConsumerState {
        WAITING_ON_PARENT_SHARDS,
        INITIALIZING,
        PROCESSING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE
    }

    public ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpointTracker iCheckpointTracker, IRecordProcessor iRecordProcessor, ILeaseManager<ShardLease> iLeaseManager, long j, ExecutorService executorService, ShardSyncer shardSyncer, IRetryStrategy iRetryStrategy) {
        this.shardInfo = shardInfo;
        this.streamConfig = streamConfig;
        this.checkpointTracker = iCheckpointTracker;
        this.recordProcessor = iRecordProcessor;
        this.leaseManager = iLeaseManager;
        this.parentShardPollIntervalMillis = j;
        this.executorService = executorService;
        this.shardSyncer = shardSyncer;
        this.taskRetryStrategy = iRetryStrategy;
        this.dataFetcher = new DataFetcher(streamConfig.getOTSClient(), shardInfo);
        this.recordProcessorCheckpointer = new RecordProcessorCheckpointer(shardInfo, iCheckpointTracker, this.dataFetcher);
    }

    public synchronized boolean consumeShard() throws StreamClientException, DependencyException {
        return checkAndSubmitNextTask();
    }

    synchronized boolean checkAndSubmitNextTask() throws StreamClientException, DependencyException {
        if (this.future != null && !this.future.isCancelled() && !this.future.isDone()) {
            return false;
        }
        boolean z = false;
        if (this.future != null && this.future.isDone()) {
            try {
                TaskResult taskResult = this.future.get();
                if (taskResult.getException() != null) {
                    LOG.error("ShardId: {}, Task: {}, Exception: {}.", new Object[]{this.shardInfo.getShardId(), this.currentTask.getTaskType(), taskResult.getException()});
                    throw taskResult.getException();
                }
                z = taskResult.isPhaseCompleted();
                LOG.debug("PreviousTaskDone, ShardId: {}, Task: {}, IsPhaseCompleted: {}.", new Object[]{this.shardInfo.getShardId(), this.currentTask.getTaskType(), Boolean.valueOf(z)});
            } catch (DependencyException e) {
                throw e;
            } catch (StreamClientException e2) {
                throw e2;
            } catch (Exception e3) {
                throw new StreamClientException(e3.getMessage(), e3);
            }
        }
        boolean z2 = false;
        updateState(z);
        ITask nextTask = getNextTask();
        if (nextTask != null) {
            this.currentTask = nextTask;
            this.future = this.executorService.submit(this.currentTask);
            this.currentTaskSubmitTime = System.currentTimeMillis();
            z2 = true;
            LOG.debug("SubmitNewTask, ShardId: {}, Task: {}.", this.shardInfo.getShardId(), this.currentTask.getTaskType());
        }
        return z2;
    }

    ITask getNextTask() {
        RetryingTaskDecorator retryingTaskDecorator = null;
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS:
                retryingTaskDecorator = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_BLOCK_ON_PARENT_SHARD, this.taskRetryStrategy, new BlockOnParentShardTask(this.shardInfo, this.leaseManager, this.checkpointTracker, this.parentShardPollIntervalMillis));
                break;
            case INITIALIZING:
                retryingTaskDecorator = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_INITIALIZE, this.taskRetryStrategy, new InitializeTask(this.shardInfo, this.recordProcessor, this.checkpointTracker, this.recordProcessorCheckpointer, this.dataFetcher, this.shutdownMarker));
                break;
            case PROCESSING:
                retryingTaskDecorator = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_PROCESS, this.taskRetryStrategy, new ProcessTask(this.shardInfo, this.recordProcessor, this.recordProcessorCheckpointer, this.dataFetcher, this.streamConfig, this.shutdownMarker));
                break;
            case SHUTTING_DOWN:
                retryingTaskDecorator = new RetryingTaskDecorator(IRetryStrategy.RetryableAction.TASK_SHUTDOWN, this.taskRetryStrategy, new ShutdownTask(this.shardInfo, this.recordProcessor, this.recordProcessorCheckpointer, this.shutdownReason, this.shardSyncer));
                break;
        }
        return retryingTaskDecorator;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean beginShutdown(ShutdownReason shutdownReason) throws StreamClientException, DependencyException {
        if (!isShutdown()) {
            markForShutdown(shutdownReason);
            checkAndSubmitNextTask();
        }
        return isShutdown();
    }

    synchronized void markForShutdown(ShutdownReason shutdownReason) {
        this.beginShutdown = true;
        this.shutdownReason = shutdownReason;
    }

    public boolean isShutdown() {
        return this.currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
    }

    public ShutdownReason getShutdownReason() {
        return this.shutdownReason;
    }

    void updateState(boolean z) {
        switch (this.currentState) {
            case WAITING_ON_PARENT_SHARDS:
                if (z && TaskType.BLOCK_ON_PARENT_SHARDS.equals(this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                        return;
                    } else {
                        this.currentState = ShardConsumerState.INITIALIZING;
                        return;
                    }
                }
                if (this.currentTask == null && this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                    return;
                }
                return;
            case INITIALIZING:
                if (z && TaskType.INITIALIZE.equals(this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTTING_DOWN;
                        return;
                    } else {
                        this.currentState = ShardConsumerState.PROCESSING;
                        return;
                    }
                }
                if (this.currentTask == null && this.beginShutdown) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                    return;
                }
                return;
            case PROCESSING:
                if (TaskType.PROCESS.equals(this.currentTask.getTaskType())) {
                    if (this.beginShutdown) {
                        this.currentState = ShardConsumerState.SHUTTING_DOWN;
                        return;
                    } else {
                        if (z) {
                            markForShutdown(ShutdownReason.TERMINATE);
                            this.currentState = ShardConsumerState.SHUTTING_DOWN;
                            return;
                        }
                        return;
                    }
                }
                return;
            case SHUTTING_DOWN:
                if (this.currentTask == null || (z && TaskType.SHUTDOWN.equals(this.currentTask.getTaskType()))) {
                    this.currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
                    return;
                }
                return;
            case SHUTDOWN_COMPLETE:
                return;
            default:
                LOG.error("Unexpected state: " + this.currentState);
                return;
        }
    }

    ShardConsumerState getCurrentState() {
        return this.currentState;
    }
}
