package com.aliyun.openservices.ots.internal.streamclient.core.task;

import com.alicloud.openservices.tablestore.SyncClientInterface;
import com.alicloud.openservices.tablestore.model.Stream;
import com.alicloud.openservices.tablestore.model.StreamShard;
import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLease;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.model.CheckpointPosition;
import com.aliyun.openservices.ots.internal.streamclient.utils.OTSHelper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/ots/internal/streamclient/core/task/ShardSyncTask.class */
/**
 * Task that reconciles the lease table with the shards currently reported for
 * the stream of the configured data table.
 *
 * <p>Each run lists the stream's shards and the existing leases, creates a
 * lease for every shard that has none, and deletes every lease whose key does
 * not match any listed shard.
 */
public class ShardSyncTask implements ITask {
    private static final Logger LOG = LoggerFactory.getLogger(ShardSyncTask.class);
    private final SyncClientInterface ots;
    private final String tableName;
    private final ILeaseManager<ShardLease> leaseManager;
    /** Lazily resolved by {@link #getStreamId()} and cached after the first successful lookup. */
    private String streamId;

    /**
     * Creates a shard sync task.
     *
     * @param streamConfig  configuration supplying the OTS client and the data table name
     * @param iLeaseManager lease manager used to list, create and delete shard leases
     */
    public ShardSyncTask(StreamConfig streamConfig, ILeaseManager<ShardLease> iLeaseManager) {
        this.ots = streamConfig.getOTSClient();
        this.tableName = streamConfig.getDataTableName();
        this.leaseManager = iLeaseManager;
    }

    /**
     * Runs one synchronization pass.
     *
     * @return a successful {@link TaskResult} when the pass completes, otherwise a
     *         {@link TaskResult} wrapping the exception that aborted it; exceptions
     *         are never propagated to the caller
     */
    @Override // java.util.concurrent.Callable
    public TaskResult call() {
        LOG.debug("Start shard sync task.");
        try {
            checkAndCreateLeasesForNewShards();
            LOG.debug("Shard Sync task completed.");
            return new TaskResult(true);
        } catch (Exception e) {
            // Pass the throwable as the trailing argument with no placeholder so that
            // SLF4J logs it (with stack trace) instead of leaving a literal "{}".
            LOG.warn("Exception encountered in shard sync task.", e);
            return new TaskResult(e);
        }
    }

    /**
     * @return {@link TaskType#SHARDSYNC}
     */
    @Override // com.aliyun.openservices.ots.internal.streamclient.core.task.ITask
    public TaskType getTaskType() {
        return TaskType.SHARDSYNC;
    }

    /**
     * Lists shards and leases, creates leases for shards that have none, then
     * removes leases that no longer correspond to a listed shard.
     *
     * @throws StreamClientException declared by the lease manager operations
     * @throws DependencyException   if the stream id cannot be resolved, or as
     *                               declared by the underlying calls
     */
    void checkAndCreateLeasesForNewShards() throws StreamClientException, DependencyException {
        List<StreamShard> shards = OTSHelper.listShard(this.ots, getStreamId());
        List<ShardLease> leases = this.leaseManager.listLeases();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Current leases, count: {}.", leases.size());
            for (ShardLease lease : leases) {
                LOG.debug("ShardLease: {}", lease);
            }
        }
        for (ShardLease newLease : determineNewLeasesToCreate(shards, leases)) {
            this.leaseManager.createLease(newLease);
            LOG.info("New lease created, Lease: {}.", newLease);
        }
        // Cleanup works on the lease snapshot taken above; leases created in this
        // pass belong to listed shards and therefore are not candidates anyway.
        cleanupGarbageLeases(shards, leases);
    }

    /**
     * Computes the leases to create for shards that do not yet have one.
     *
     * @param list  shards currently reported for the stream
     * @param list2 leases that already exist
     * @return new leases, ordered so that a lease whose parent is also being
     *         created appears after that parent
     * @throws DependencyException if the stream id cannot be resolved
     */
    List<ShardLease> determineNewLeasesToCreate(List<StreamShard> list, List<ShardLease> list2) throws DependencyException {
        HashSet<String> existingLeaseKeys = new HashSet<>();
        for (ShardLease lease : list2) {
            existingLeaseKeys.add(lease.getLeaseKey());
        }

        HashMap<String, ShardLease> newLeasesByKey = new HashMap<>();
        for (StreamShard shard : list) {
            if (!existingLeaseKeys.contains(shard.getShardId())) {
                ShardLease newLease = createNewShardLease(getStreamId(), shard);
                newLeasesByKey.put(newLease.getLeaseKey(), newLease);
            }
        }

        // Insertion order of this map is the creation order: parents first.
        LinkedHashMap<String, ShardLease> ordered = new LinkedHashMap<>();
        for (ShardLease lease : newLeasesByKey.values()) {
            sortLeaseInInherit(lease, newLeasesByKey, ordered);
        }
        return new ArrayList<>(ordered.values());
    }

    /**
     * Appends {@code shardLease} to {@code linkedHashMap} after first appending,
     * recursively, any of its parents that are present in {@code map} and not yet
     * appended.
     *
     * @param shardLease    lease to place
     * @param map           all leases being created, keyed by lease key
     * @param linkedHashMap accumulator whose insertion order is the resulting order
     */
    private void sortLeaseInInherit(ShardLease shardLease, Map<String, ShardLease> map, LinkedHashMap<String, ShardLease> linkedHashMap) {
        Iterator<String> parentIds = shardLease.getParentShardIds().iterator();
        while (parentIds.hasNext()) {
            ShardLease parent = map.get(parentIds.next());
            if (parent != null && !linkedHashMap.containsKey(parent.getLeaseKey())) {
                sortLeaseInInherit(parent, map, linkedHashMap);
            }
        }
        if (!linkedHashMap.containsKey(shardLease.getLeaseKey())) {
            linkedHashMap.put(shardLease.getLeaseKey(), shardLease);
        }
    }

    /**
     * Builds a fresh lease for a shard, keyed by the shard id, recording its
     * non-null parent and parent-sibling ids and starting at
     * {@link CheckpointPosition#TRIM_HORIZON}.
     *
     * @param str         stream id to record on the lease
     * @param streamShard shard the lease is created for
     * @return the new, not yet persisted lease
     */
    ShardLease createNewShardLease(String str, StreamShard streamShard) {
        ShardLease lease = new ShardLease(streamShard.getShardId());
        lease.setStreamId(str);
        HashSet<String> parentShardIds = new HashSet<>();
        if (streamShard.getParentId() != null) {
            parentShardIds.add(streamShard.getParentId());
        }
        if (streamShard.getParentSiblingId() != null) {
            parentShardIds.add(streamShard.getParentSiblingId());
        }
        lease.setParentShardIds(parentShardIds);
        lease.setCheckpoint(CheckpointPosition.TRIM_HORIZON);
        return lease;
    }

    /**
     * Returns the id of the table's single stream, looking it up on first use and
     * caching it afterwards.
     *
     * @return the stream id
     * @throws DependencyException if the table has no stream or more than one stream
     */
    String getStreamId() throws DependencyException {
        if (this.streamId != null) {
            return this.streamId;
        }
        List<Stream> streams = OTSHelper.listStream(this.ots, this.tableName);
        if (streams.isEmpty()) {
            throw new DependencyException("Can't get streamId. Please check whether to enable Stream.");
        }
        if (streams.size() == 1) {
            this.streamId = streams.get(0).getStreamId();
            return this.streamId;
        }
        LOG.error("Expect there is only one stream, tableName: {}.", this.tableName);
        for (Stream stream : streams) {
            LOG.error("Stream: {}", stream);
        }
        throw new DependencyException("Expect there is only one stream.");
    }

    /**
     * Deletes every lease whose key does not match the id of any listed shard.
     *
     * @param list  shards currently reported for the stream
     * @param list2 leases that existed when the pass started
     * @throws StreamClientException declared by the lease manager's delete operation
     * @throws DependencyException   declared by the lease manager's delete operation
     */
    void cleanupGarbageLeases(List<StreamShard> list, List<ShardLease> list2) throws StreamClientException, DependencyException {
        HashSet<String> orphanedLeaseKeys = new HashSet<>();
        for (ShardLease lease : list2) {
            orphanedLeaseKeys.add(lease.getLeaseKey());
        }
        for (StreamShard shard : list) {
            // remove() is a no-op for absent keys, so no contains() check is needed.
            orphanedLeaseKeys.remove(shard.getShardId());
        }
        for (String leaseKey : orphanedLeaseKeys) {
            this.leaseManager.deleteLease(leaseKey);
            LOG.info("Delete expired lease, LeaseKey: {}.", leaseKey);
        }
    }
}
