package com.aliyun.openservices.ots.internal.streamclient.core;

import com.alicloud.openservices.tablestore.model.StreamDetails;
import com.aliyun.openservices.ots.internal.streamclient.ClientConfig;
import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseCoordinator;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseManagerRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLease;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLeaseSerializer;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.model.ICheckpointTracker;
import com.aliyun.openservices.ots.internal.streamclient.model.IRecordProcessorFactory;
import com.aliyun.openservices.ots.internal.streamclient.model.IRetryStrategy;
import com.aliyun.openservices.ots.internal.streamclient.model.ShardInfo;
import com.aliyun.openservices.ots.internal.streamclient.model.ShutdownReason;
import com.aliyun.openservices.ots.internal.streamclient.model.WorkerStatus;
import com.aliyun.openservices.ots.internal.streamclient.utils.OTSHelper;
import com.aliyun.openservices.ots.internal.streamclient.utils.Preconditions;
import com.aliyun.openservices.ots.internal.streamclient.utils.TimeUtils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/ots/internal/streamclient/core/InnerWorker.class */
/**
 * Runnable worker that drives shard consumption for one stream.
 *
 * <p>Each pass of the loop in {@link #run()}:
 * <ol>
 *   <li>asks the lease coordinator for the leases currently held by this worker and splits them
 *       into "held" (no lease stealer recorded) and "being stolen" (a lease stealer is recorded);</li>
 *   <li>creates or reuses a {@link ShardConsumer} for every held shard and lets it consume;</li>
 *   <li>re-syncs shard and lease info once the configured sync interval has elapsed;</li>
 *   <li>begins shutdown of consumers whose shard is no longer in the held list;</li>
 *   <li>asks the lease coordinator to check the renewer/taker status.</li>
 * </ol>
 *
 * <p>An instance is single-use: {@link #run()} refuses to start when the worker is already running
 * or has been shut down.
 */
public class InnerWorker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(InnerWorker.class);

    private final String workerIdentifier;
    private final ClientConfig clientConfig;
    private final StreamConfig streamConfig;
    private final IRecordProcessorFactory recordProcessorFactory;
    private final ILeaseManager<ShardLease> leaseManager;
    private final ExecutorService executorService;
    private final LeaseCoordinator<ShardLease> leaseCoordinator;
    private final ShardSyncer shardSyncer;
    private final ICheckpointTracker checkpointTracker;
    private final IRetryStrategy taskRetryStrategy;
    private final IRetryStrategy leaseManagerRetryStrategy;

    /** Wall-clock time (ms) of the last successful shard sync; package-private, written by the run loop. */
    long lastSyncShardTimeMillis;

    /** Consumers created by this worker, keyed by the shard they serve. */
    private final ConcurrentMap<ShardInfo, ShardConsumer> shardConsumerMap = new ConcurrentHashMap<>();

    private volatile boolean running;
    private volatile boolean shutdown;

    /**
     * Failure that terminated {@link #run()}, or {@code null}. Volatile because it is written by the
     * thread executing {@link #run()} and read through the public status accessors.
     */
    private volatile Exception exception;

    private final String streamId;

    /**
     * Builds a worker and wires up its collaborators.
     *
     * <p>The constructor looks up the stream details of the configured data table through
     * {@code OTSHelper.getStreamDetails} and rejects tables whose stream is not enabled.
     *
     * @param str                     worker identifier; must be neither {@code null} nor empty
     * @param clientConfig            client configuration; must not be {@code null}
     * @param streamConfig            stream configuration; must not be {@code null} and must carry an OTS client
     * @param iRecordProcessorFactory factory used to create one record processor per shard consumer;
     *                                must not be {@code null}
     * @param executorService         executor handed to the shard syncer and the shard consumers;
     *                                must not be {@code null}
     * @param iLeaseManager           lease manager to use, or {@code null} to create a {@link LeaseManager}
     * @param leaseCoordinator        lease coordinator to use, or {@code null} to create a default one
     * @param iCheckpointTracker      checkpoint tracker to use, or {@code null} to create a
     *                                {@link CheckpointTracker}
     * @throws IllegalArgumentException if the data table does not have its stream enabled
     */
    public InnerWorker(String str, ClientConfig clientConfig, StreamConfig streamConfig, IRecordProcessorFactory iRecordProcessorFactory, ExecutorService executorService, ILeaseManager<ShardLease> iLeaseManager, LeaseCoordinator<ShardLease> leaseCoordinator, ICheckpointTracker iCheckpointTracker) {
        Preconditions.checkArgument(str != null && !str.isEmpty(), "workerIdentifier should not be null or empty");
        Preconditions.checkNotNull(clientConfig);
        Preconditions.checkNotNull(streamConfig);
        Preconditions.checkNotNull(streamConfig.getOTSClient());
        Preconditions.checkNotNull(iRecordProcessorFactory);
        Preconditions.checkNotNull(executorService);

        LOG.info("Initialize inner worker.");
        LOG.info("ClientConfig: {}", clientConfig);
        LOG.info("StreamConfig: {}", streamConfig);

        this.workerIdentifier = str;
        this.clientConfig = clientConfig;
        this.streamConfig = streamConfig;
        this.recordProcessorFactory = iRecordProcessorFactory;
        this.executorService = executorService;

        StreamDetails streamDetails = OTSHelper.getStreamDetails(this.streamConfig.getOTSClient(), this.streamConfig.getDataTableName());
        if (!streamDetails.isEnableStream()) {
            throw new IllegalArgumentException("The data table does not enable stream.");
        }
        this.streamId = streamDetails.getStreamId();

        // Retry strategies supplied through the client config win over the built-in defaults.
        IRetryStrategy configuredTaskRetry = clientConfig.getTaskRetryStrategy();
        this.taskRetryStrategy = configuredTaskRetry != null ? configuredTaskRetry : new TaskRetryStrategy();

        IRetryStrategy configuredLeaseRetry = clientConfig.getLeaseManagerRetryStrategy();
        this.leaseManagerRetryStrategy = configuredLeaseRetry != null ? configuredLeaseRetry : new LeaseManagerRetryStrategy();

        // Explicitly injected collaborators win; otherwise defaults are built from the configuration.
        if (iLeaseManager != null) {
            this.leaseManager = iLeaseManager;
        } else {
            // NOTE(review): raw type kept on purpose; LeaseManager's declaration is not visible here,
            // so its type parameters (if any) could not be confirmed.
            this.leaseManager = new LeaseManager(this.streamConfig.getOTSClient(), this.streamConfig.getStatusTableName(), new ShardLeaseSerializer(this.streamConfig.getStatusTableName(), this.streamId), this.leaseManagerRetryStrategy, this.clientConfig.getCheckTableReadyIntervalMillis());
        }

        if (leaseCoordinator != null) {
            this.leaseCoordinator = leaseCoordinator;
        } else {
            this.leaseCoordinator = new LeaseCoordinator<>(this.leaseManager, this.workerIdentifier, this.clientConfig);
        }

        if (iCheckpointTracker != null) {
            this.checkpointTracker = iCheckpointTracker;
        } else {
            this.checkpointTracker = new CheckpointTracker(this.leaseManager, this.leaseCoordinator);
        }

        this.shardSyncer = new ShardSyncer(this.streamConfig, this.leaseManager, this.executorService, this.taskRetryStrategy);
    }

    /**
     * Initializes the worker, starts the lease coordinator and loops until {@link #shutdown()} is
     * requested, sleeping for the configured worker idle time between passes.
     *
     * <p>Any {@link Throwable} raised inside the method is logged, recorded (non-{@link Exception}
     * throwables are wrapped in a {@link RuntimeException}) and followed by {@link #shutdown()}.
     * The rerun guard lives inside the same {@code try}, so a rejected second invocation is also
     * recorded as the worker's exception and also triggers {@link #shutdown()}.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            if (this.running || this.shutdown) {
                throw new StreamClientException("Can't rerun a worker.");
            }
            this.running = true;
            initialize();
            this.leaseCoordinator.start();
            this.lastSyncShardTimeMillis = System.currentTimeMillis();
            while (!this.shutdown) {
                runProcessLoop();
                TimeUtils.sleepMillis(this.clientConfig.getWorkerIdleTimeMillis());
            }
        } catch (Throwable th) {
            // The throwable is passed as the exception argument, so the message carries no placeholder.
            LOG.error("Inner worker terminated by an exception.", th);
            this.exception = (th instanceof Exception) ? (Exception) th : new RuntimeException(th);
            shutdown();
        }
    }

    /**
     * Executes one pass of the worker loop: consume held shards, sync shard/lease info when the sync
     * interval has elapsed, clean up consumers of shards no longer held, and check the lease
     * renewer/taker status.
     *
     * @throws Exception if any of the delegated steps fails
     */
    void runProcessLoop() throws Exception {
        List<ShardInfo> heldShards = new ArrayList<>();
        List<ShardInfo> stolenShards = new ArrayList<>();
        getCurrentlyHeldShards(heldShards, stolenShards);

        for (ShardInfo shardInfo : heldShards) {
            ShardConsumer consumer = createOrGetShardConsumer(shardInfo);
            if (!consumer.isShutdown()) {
                consumer.consumeShard();
            }
        }

        // The timestamp only advances when the syncer returns true, so a sync that returns false
        // is attempted again on the next pass.
        boolean syncDue = System.currentTimeMillis() - this.lastSyncShardTimeMillis > this.clientConfig.getSyncShardIntervalMillis();
        if (syncDue && this.shardSyncer.syncShardAndLeaseInfo(false)) {
            this.lastSyncShardTimeMillis = System.currentTimeMillis();
        }

        cleanupShardConsumers(heldShards, stolenShards);
        this.leaseCoordinator.checkRenewerAndTakerStatus(this.clientConfig.getMaxDurationBeforeLastSuccessfulRenewOrTakeLease());
    }

    /**
     * Flags the worker as shut down, which ends the loop in {@link #run()}, and stops the lease
     * coordinator.
     */
    public void shutdown() {
        this.running = false;
        this.shutdown = true;
        this.leaseCoordinator.stop();
    }

    /**
     * Reports the worker state. A recorded exception takes precedence over the running flag, which in
     * turn takes precedence over the shutdown flag.
     *
     * @return {@code ERROR}, {@code RUNNING}, {@code SHUTDOWN} or {@code NOT_RUNNING}
     */
    public WorkerStatus getWorkerStatus() {
        if (this.exception != null) {
            return WorkerStatus.ERROR;
        }
        if (this.running) {
            return WorkerStatus.RUNNING;
        }
        if (this.shutdown) {
            return WorkerStatus.SHUTDOWN;
        }
        return WorkerStatus.NOT_RUNNING;
    }

    /**
     * @return the exception recorded by {@link #run()}, or {@code null} if none has been recorded
     */
    public Exception getException() {
        return this.exception;
    }

    /** Initializes the lease coordinator and performs the initial shard/lease sync. */
    private void initialize() throws StreamClientException, DependencyException {
        this.leaseCoordinator.initialize();
        this.shardSyncer.syncShardAndLeaseInfo(true);
    }

    /**
     * Refills both lists from the leases currently held by the coordinator.
     *
     * @param heldShards   receives shards whose lease has an empty lease stealer
     * @param stolenShards receives shards whose lease has a non-empty lease stealer
     */
    private void getCurrentlyHeldShards(List<ShardInfo> heldShards, List<ShardInfo> stolenShards) {
        heldShards.clear();
        stolenShards.clear();
        for (ShardLease lease : this.leaseCoordinator.getCurrentlyHeldLeases()) {
            if (lease.getLeaseStealer().isEmpty()) {
                LOG.debug("Currently held shard: {}", lease);
                heldShards.add(lease.toShardInfo());
            } else {
                LOG.debug("Currently stolen shard: {}", lease);
                stolenShards.add(lease.toShardInfo());
            }
        }
    }

    /**
     * Returns the consumer registered for the shard, creating and registering a new one when none
     * exists or when the existing one is shut down with reason {@code PROCESS_RESTART}.
     */
    private ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo) {
        ShardConsumer consumer = this.shardConsumerMap.get(shardInfo);
        boolean needsRestart = consumer != null
                && consumer.isShutdown()
                && consumer.getShutdownReason() == ShutdownReason.PROCESS_RESTART;
        if (consumer == null || needsRestart) {
            consumer = new ShardConsumer(shardInfo, this.streamConfig, this.checkpointTracker, this.recordProcessorFactory.createProcessor(), this.leaseManager, this.clientConfig.getParentShardPollIntervalMillis(), this.executorService, this.shardSyncer, this.taskRetryStrategy);
            this.shardConsumerMap.put(shardInfo, consumer);
            LOG.info("CreateNewConsumer, ShardInfo: {}.", shardInfo);
        }
        return consumer;
    }

    /**
     * Begins shutdown of every registered consumer whose shard is not in {@code heldShards}.
     *
     * <p>The shutdown reason is {@code STOLEN} when the shard is in {@code stolenShards} and
     * {@code ZOMBIE} otherwise. When {@code beginShutdown} returns {@code true} the consumer is removed
     * from the map and, for a stolen shard, the lease is transferred through the lease coordinator.
     * Consumers for which it returns {@code false} stay registered and are visited again on the next
     * pass.
     */
    private void cleanupShardConsumers(List<ShardInfo> heldShards, List<ShardInfo> stolenShards) throws StreamClientException, DependencyException {
        HashSet<ShardInfo> held = new HashSet<>(heldShards);
        HashSet<ShardInfo> stolen = new HashSet<>(stolenShards);

        // Entries are removed while the key set is being iterated; shardConsumerMap is a
        // ConcurrentHashMap, whose iterators tolerate concurrent modification.
        for (ShardInfo shardInfo : this.shardConsumerMap.keySet()) {
            if (held.contains(shardInfo)) {
                continue;
            }
            boolean beingStolen = stolen.contains(shardInfo);
            ShutdownReason reason = beingStolen ? ShutdownReason.STOLEN : ShutdownReason.ZOMBIE;
            boolean shutdownDone = this.shardConsumerMap.get(shardInfo).beginShutdown(reason);
            LOG.info("CleanConsumer, ShardInfo: {}, Reason: {}, IsShutDown: {}.", new Object[]{shardInfo, reason, shutdownDone});
            if (!shutdownDone) {
                continue;
            }
            this.shardConsumerMap.remove(shardInfo);
            if (beingStolen) {
                LOG.info("Transfer lease: {}.", shardInfo);
                this.leaseCoordinator.transferLease(shardInfo.getShardId(), shardInfo.getLeaseIdentifier());
            }
        }
    }
}
