/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.consensus.iot;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.iot.IoTConsensusServerMetrics;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelReq;
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelRes;
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotReq;
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotRes;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelReq;
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelRes;
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
import org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceRes;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IoTConsensusServerImpl {
    /** Prefix of every snapshot directory name created under {@code storageDir}. */
    public static final String SNAPSHOT_DIR_NAME = "snapshot";
    // Regex with a look-ahead for a trailing run of digits. Not referenced in the visible part of this file.
    private static final Pattern SNAPSHOT_INDEX_PATTEN = Pattern.compile(".*[^\\d](?=(\\d+))");
    // Shared metrics singleton; write() reports the state machine write cost to it.
    private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance();
    private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
    // The peer this server instance represents.
    private final Peer thisNode;
    // State machine that local writes, reads and snapshot operations are delegated to.
    private final IStateMachine stateMachine;
    // Created empty in the constructor; not otherwise used in the visible part of this file.
    private final ConcurrentHashMap<Integer, SyncLogCacheQueue> cacheQueueMap;
    // Held for the whole of write() and while signal() wakes up waiters.
    private final Lock stateMachineLock = new ReentrantLock();
    // Awaited by write() when needBlockWrite() is true; signalled by signal().
    private final Condition stateMachineCondition = this.stateMachineLock.newCondition();
    // Base directory under which snapshot directories are created and resolved.
    private final String storageDir;
    // Current group members; mutated by buildSyncLogChannel() and removeSyncLogChannel().
    private final TreeSet<Peer> configuration;
    // Seeded from consensusReqReader.getCurrentSearchIndex(); incremented after each successful local write.
    private final AtomicLong searchIndex;
    // Receives every successfully written request and owns the per-peer dispatcher threads.
    private final LogDispatcher logDispatcher;
    private IoTConsensusConfig config;
    // Obtained from the state machine via GetConsensusReqReaderPlan; its total size drives write throttling.
    private final ConsensusReqReader consensusReqReader;
    // Exposed through isActive()/setActive(); starts as true.
    private volatile boolean active = true;
    // Name of the directory produced by the most recent takeSnapshot() call; null until then.
    private String newSnapshotDirName;
    // Source of synchronous RPC clients used to talk to other peers.
    private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
    // Metric set registered in start() and removed in stop().
    private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
    // String form of thisNode.getGroupId(), computed once in the constructor.
    private final String consensusGroupId;
    private final ScheduledExecutorService backgroundTaskService;
    // Shared limiter consulted before each snapshot fragment is sent.
    private final IoTConsensusRateLimiter ioTConsensusRateLimiter = IoTConsensusRateLimiter.getInstance();

    public IoTConsensusServerImpl(String storageDir, Peer thisNode, TreeSet<Peer> configuration, IStateMachine stateMachine, ScheduledExecutorService backgroundTaskService, IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager, IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager, IoTConsensusConfig config) {
        this.storageDir = storageDir;
        this.thisNode = thisNode;
        this.stateMachine = stateMachine;
        this.cacheQueueMap = new ConcurrentHashMap();
        this.syncClientManager = syncClientManager;
        this.configuration = configuration;
        this.backgroundTaskService = backgroundTaskService;
        this.config = config;
        this.consensusGroupId = thisNode.getGroupId().toString();
        this.consensusReqReader = (ConsensusReqReader)((Object)stateMachine.read(new GetConsensusReqReaderPlan()));
        this.searchIndex = new AtomicLong(this.consensusReqReader.getCurrentSearchIndex());
        this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
        this.logDispatcher = new LogDispatcher(this, clientManager);
    }

    /**
     * @return the state machine this server applies requests to
     */
    public IStateMachine getStateMachine() {
        return stateMachine;
    }

    /**
     * Starts this server: runs {@code checkAndUpdateIndex()}, registers the server's metric set,
     * then starts the state machine and finally the log dispatcher.
     */
    public void start() {
        checkAndUpdateIndex();
        IMetricSet metricSet = ioTConsensusServerMetrics;
        MetricService.getInstance().addMetricSet(metricSet);
        stateMachine.start();
        logDispatcher.start();
    }

    /**
     * Stops this server in the reverse order of {@link #start()}: log dispatcher first, then the
     * state machine, then the metric set is unregistered.
     */
    public void stop() {
        logDispatcher.stop();
        stateMachine.stop();
        IMetricSet metricSet = ioTConsensusServerMetrics;
        MetricService.getInstance().removeMetricSet(metricSet);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Applies a local write request to the state machine and, on success, hands it to the log
     * dispatcher for replication.
     *
     * <p>The whole operation runs under {@code stateMachineLock}. If {@link #needBlockWrite()}
     * reports that the WAL size is over the throttle threshold, the caller waits on
     * {@code stateMachineCondition} for at most the configured throttle timeout; when that wait
     * times out the write is rejected. If the wait is interrupted, the interrupt flag is restored
     * and the write still proceeds.
     *
     * @param request the request to write
     * @return the status returned by the state machine, or a {@code WRITE_PROCESS_REJECT} status
     *     when the throttle wait timed out
     */
    public TSStatus write(IConsensusRequest request) {
        long consensusWriteStartTime = System.nanoTime();
        stateMachineLock.lock();
        try {
            long getStateMachineLockTime = System.nanoTime();
            ioTConsensusServerMetrics.recordGetStateMachineLockTime(getStateMachineLockTime - consensusWriteStartTime);
            if (needBlockWrite()) {
                logger.info("[Throttle Down] index:{}, safeIndex:{}", (Object)getSearchIndex(), (Object)getMinSyncIndex());
                try {
                    boolean timeout = !stateMachineCondition.await(config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS);
                    if (timeout) {
                        return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, String.format("The write is rejected because the wal directory size has reached the threshold %d bytes. You may need to adjust the flush policy of the storage storageengine or the IoTConsensus synchronization parameter", config.getReplication().getWalThrottleThreshold()));
                    }
                }
                catch (InterruptedException e) {
                    logger.error("Failed to throttle down because ", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
            }
            long writeToStateMachineStartTime = System.nanoTime();
            ioTConsensusServerMetrics.recordCheckingBeforeWriteTime(writeToStateMachineStartTime - getStateMachineLockTime);
            IndexedConsensusRequest indexedConsensusRequest = buildIndexedConsensusRequestForLocalRequest(request);
            if (indexedConsensusRequest.getSearchIndex() % 100000L == 0L) {
                logger.info("DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}", new Object[]{thisNode.getGroupId(), getMinSyncIndex(), indexedConsensusRequest.getSearchIndex()});
            }
            IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
            long startWriteTime = System.nanoTime();
            TSStatus result = stateMachine.write(planNode);
            PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(System.nanoTime() - startWriteTime);
            long writeToStateMachineEndTime = System.nanoTime();
            ioTConsensusServerMetrics.recordWriteStateMachineTime(writeToStateMachineEndTime - writeToStateMachineStartTime);
            if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                // Offer and increment happen under one monitor so the index never advances
                // without the matching request having been offered to the dispatcher.
                synchronized (searchIndex) {
                    logDispatcher.offer(indexedConsensusRequest);
                    searchIndex.incrementAndGet();
                }
                ioTConsensusServerMetrics.recordOfferRequestToQueueTime(System.nanoTime() - writeToStateMachineEndTime);
            } else {
                logger.debug("{}: write operation failed. searchIndex: {}. Code: {}", new Object[]{thisNode.getGroupId(), indexedConsensusRequest.getSearchIndex(), result.getCode()});
            }
            ioTConsensusServerMetrics.recordConsensusWriteTime(System.nanoTime() - consensusWriteStartTime);
            // Return the status directly; the decompiled code routed it through an AtomicLong-typed temporary.
            return result;
        }
        finally {
            stateMachineLock.unlock();
        }
    }

    /**
     * Delegates a read request straight to the state machine.
     *
     * @param request the read request
     * @return whatever the state machine returns for {@code request}
     */
    public DataSet read(IConsensusRequest request) {
        DataSet answer = stateMachine.read(request);
        return answer;
    }

    /**
     * Takes a fresh snapshot of the state machine into a new, uniquely named directory under
     * {@code storageDir} and then removes all other snapshot directories.
     *
     * <p>The directory name is {@code snapshot_<groupId>_<random UUID>} and is remembered in
     * {@code newSnapshotDirName} before the directory is created.
     *
     * @throws ConsensusGroupModifyPeerException if the directory cannot be created, the state
     *     machine reports failure, or an {@link IOException} occurs
     */
    public void takeSnapshot() throws ConsensusGroupModifyPeerException {
        try {
            String dirName = String.format("%s_%s_%s", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), UUID.randomUUID());
            newSnapshotDirName = dirName;
            File snapshotDir = new File(storageDir, dirName);

            // Start from an empty directory even if one with this name is already there.
            if (snapshotDir.exists()) {
                org.apache.commons.io.FileUtils.deleteDirectory(snapshotDir);
            }
            boolean created = snapshotDir.mkdirs();
            if (!created) {
                throw new ConsensusGroupModifyPeerException(String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
            }

            boolean taken = stateMachine.takeSnapshot(snapshotDir);
            if (!taken) {
                throw new ConsensusGroupModifyPeerException("unknown error when taking snapshot");
            }
            clearOldSnapshot();
        }
        catch (IOException ioFailure) {
            throw new ConsensusGroupModifyPeerException("error when taking snapshot", ioFailure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends every file of the most recently taken snapshot to {@code targetPeer}, fragment by
     * fragment, logging overall progress after each file.
     *
     * <p>Each fragment first acquires its chunk length from the rate limiter and is then sent
     * through a synchronous client borrowed for the target's endpoint.
     *
     * @param targetPeer the peer that receives the snapshot
     * @throws ConsensusGroupModifyPeerException if any fragment is rejected or any exception is
     *     raised while sending
     */
    public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
        File snapshotDir = new File(storageDir, newSnapshotDirName);
        List<File> snapshotFiles = stateMachine.getSnapshotFiles(snapshotDir);

        // Sum up the sizes and build a human-readable listing for the log.
        long totalBytes = 0L;
        StringBuilder fileListing = new StringBuilder();
        for (File snapshotFile : snapshotFiles) {
            long bytes = snapshotFile.length();
            totalBytes += bytes;
            fileListing.append("\n").append(snapshotFile.getName()).append(" ").append(FileUtils.humanReadableByteCountSI(bytes));
        }

        long sentBytes = 0L;
        long sentFiles = 0L;
        long startTime = System.nanoTime();
        logger.info("[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total size {}) from dir {}", snapshotFiles.size(), FileUtils.humanReadableByteCountSI(totalBytes), snapshotDir);
        logger.info("[SNAPSHOT TRANSMISSION] All the files below shell be transmitted: {}", (Object)fileListing);

        try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(targetPeer.getEndpoint())) {
            for (File snapshotFile : snapshotFiles) {
                try (SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, snapshotFile.toPath())) {
                    while (reader.hasNext()) {
                        TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
                        req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
                        ioTConsensusRateLimiter.acquireTransitDataSizeWithRateLimiter(req.getChunkLength());
                        TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
                        if (!isSuccess(res.getStatus())) {
                            throw new ConsensusGroupModifyPeerException(String.format("[SNAPSHOT TRANSMISSION] Error when transmitting snapshot fragment to %s", targetPeer));
                        }
                    }
                    // File finished: update the running totals, then report them.
                    sentFiles++;
                    sentBytes += reader.getTotalReadSize();
                    String elapsed = CommonDateTimeUtils.convertMillisecondToDurationStr((System.nanoTime() - startTime) / 1000000L);
                    logger.info("[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files {}/{} done, size {}/{} done, time {} passed. File {} done.", newSnapshotDirName, sentFiles, snapshotFiles.size(), FileUtils.humanReadableByteCountSI(sentBytes), FileUtils.humanReadableByteCountSI(totalBytes), elapsed, snapshotFile);
                }
            }
        }
        catch (Exception failure) {
            throw new ConsensusGroupModifyPeerException(String.format("[SNAPSHOT TRANSMISSION] Error when send snapshot file to %s", targetPeer), failure);
        }

        String totalElapsed = CommonDateTimeUtils.convertMillisecondToDurationStr((System.nanoTime() - startTime) / 1000000L);
        logger.info("[SNAPSHOT TRANSMISSION] After {}, successfully transmit all snapshots from dir {}", totalElapsed, snapshotDir);
    }

    /**
     * Writes one received snapshot chunk into the local copy of the snapshot file.
     *
     * <p>The target path is the part of {@code originalFilePath} starting at {@code snapshotId},
     * resolved against {@code storageDir}. Missing parent directories are created. The file is
     * opened in append mode and the chunk is written through its channel at {@code fileOffset}.
     *
     * @param snapshotId identifier of the snapshot; must occur in {@code originalFilePath}
     * @param originalFilePath path of the file as reported by the sender
     * @param fileChunk the bytes of this fragment
     * @param fileOffset offset passed to the positional channel write
     * @throws ConsensusGroupModifyPeerException if the path does not contain the snapshot id or
     *     an {@link IOException} occurs
     */
    public void receiveSnapshotFragment(String snapshotId, String originalFilePath, ByteBuffer fileChunk, long fileOffset) throws ConsensusGroupModifyPeerException {
        try {
            String relativePath = calculateSnapshotPath(snapshotId, originalFilePath);
            File targetFile = new File(storageDir, relativePath);

            Path parentDir = Paths.get(targetFile.getParent());
            boolean parentPresent = Files.exists(parentDir);
            if (!parentPresent) {
                Files.createDirectories(parentDir);
            }

            try (FileOutputStream out = new FileOutputStream(targetFile.getAbsolutePath(), true);
                 FileChannel outChannel = out.getChannel()) {
                outChannel.write(fileChunk.slice(), fileOffset);
            }
        }
        catch (IOException ioFailure) {
            throw new ConsensusGroupModifyPeerException(String.format("error when receiving snapshot %s", snapshotId), ioFailure);
        }
    }

    /**
     * Returns the tail of {@code originalFilePath} that begins at the first occurrence of
     * {@code snapshotId}.
     *
     * @param snapshotId the snapshot identifier to look for
     * @param originalFilePath the path reported by the sender
     * @return the suffix of {@code originalFilePath} starting with {@code snapshotId}
     * @throws ConsensusGroupModifyPeerException if {@code snapshotId} does not occur in the path
     */
    private String calculateSnapshotPath(String snapshotId, String originalFilePath) throws ConsensusGroupModifyPeerException {
        int idStart = originalFilePath.indexOf(snapshotId);
        if (idStart >= 0) {
            return originalFilePath.substring(idStart);
        }
        throw new ConsensusGroupModifyPeerException(String.format("invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
    }

    /**
     * Deletes every directory entry under {@code storageDir} whose name starts with
     * {@link #SNAPSHOT_DIR_NAME}, except the one named {@code newSnapshotDirName}.
     *
     * <p>A failed deletion is logged and does not stop the remaining deletions.
     */
    private void clearOldSnapshot() {
        File[] snapshotDirs = new File(storageDir).listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
        boolean nothingFound = snapshotDirs == null || snapshotDirs.length == 0;
        if (nothingFound) {
            logger.error("Can not find any snapshot dir after build a new snapshot for group {}", (Object)thisNode.getGroupId());
            return;
        }
        for (File candidate : snapshotDirs) {
            boolean isCurrent = candidate.getName().equals(newSnapshotDirName);
            if (!isCurrent) {
                try {
                    org.apache.commons.io.FileUtils.deleteDirectory(candidate);
                }
                catch (IOException deleteFailure) {
                    logger.error("Delete old snapshot dir {} failed", (Object)candidate.getAbsolutePath(), (Object)deleteFailure);
                }
            }
        }
    }

    /**
     * Asks the state machine to load the snapshot stored in {@code storageDir/snapshotId}.
     *
     * @param snapshotId name of the snapshot directory relative to {@code storageDir}
     */
    public void loadSnapshot(String snapshotId) {
        File snapshotDir = new File(storageDir, snapshotId);
        stateMachine.loadSnapshot(snapshotDir);
    }

    /**
     * Asks {@code peer} over RPC to become inactive, trying at most twice.
     *
     * <p>Each attempt borrows a fresh synchronous client, which is always returned when the
     * attempt ends. Any failure of an attempt (client could not be borrowed, RPC threw, or the
     * peer answered with a non-success status) is remembered and the next attempt is made; the
     * failure of the last attempt is thrown.
     *
     * @param peer the peer to inactivate
     * @param forDeletionPurpose forwarded to the peer in the request
     * @throws ConsensusGroupModifyPeerException if no attempt succeeded
     */
    public void inactivatePeer(Peer peer, boolean forDeletionPurpose) throws ConsensusGroupModifyPeerException {
        final int maxAttempts = 2;
        ConsensusGroupModifyPeerException lastException = null;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            // try-with-resources closes the client on every path, including an RPC failure.
            try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
                TInactivatePeerRes res = client.inactivatePeer(new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()).setForDeletionPurpose(forDeletionPurpose));
                if (isSuccess(res.status)) {
                    return;
                }
                lastException = new ConsensusGroupModifyPeerException(String.format("error when inactivating %s. %s", peer, res.getStatus()));
            }
            catch (ClientManagerException e) {
                lastException = new ConsensusGroupModifyPeerException(e);
            }
            catch (Exception e) {
                // RPC-level failure: record it and let the loop retry.
                lastException = new ConsensusGroupModifyPeerException(String.format("error when inactivating %s", peer), e);
            }
        }
        throw lastException;
    }

    /**
     * Asks {@code peer} over RPC to load the snapshot named {@code newSnapshotDirName}.
     *
     * @param peer the peer that should load the snapshot
     * @throws ConsensusGroupModifyPeerException if the peer answers with a non-success status or
     *     any exception is raised while performing the call
     */
    public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
        try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
            TTriggerSnapshotLoadRes res = client.triggerSnapshotLoad(new TTriggerSnapshotLoadReq(thisNode.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName));
            if (!isSuccess(res.status)) {
                throw new ConsensusGroupModifyPeerException(String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
            }
        }
        catch (Exception e) {
            // Name the operation that actually failed; this message previously said "activating".
            throw new ConsensusGroupModifyPeerException(String.format("error when triggering snapshot load %s", peer), e);
        }
    }

    /**
     * Asks {@code peer} over RPC to become active.
     *
     * @param peer the peer to activate
     * @throws ConsensusGroupModifyPeerException if the peer answers with a non-success status or
     *     any exception is raised while performing the call
     */
    public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
        try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
            TActivatePeerReq req = new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId());
            TActivatePeerRes res = client.activatePeer(req);
            if (isSuccess(res.status)) {
                return;
            }
            throw new ConsensusGroupModifyPeerException(String.format("error when activating %s. %s", peer, res.getStatus()));
        }
        catch (Exception failure) {
            throw new ConsensusGroupModifyPeerException(String.format("error when activating %s", peer), failure);
        }
    }

    /**
     * Makes every current member of the group build a sync log channel to {@code targetPeer}.
     *
     * <p>For this node the channel is built locally and started immediately; every other member
     * is told over RPC. A member that cannot be notified, or that answers with a non-success
     * status, is logged at error level and skipped, so the loop always visits all members.
     *
     * @param targetPeer the peer the channels should point to
     * @throws ConsensusGroupModifyPeerException declared for callers; failures of remote members
     *     are logged here rather than propagated
     */
    public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
        // Iterate over a copy: the local buildSyncLogChannel call below mutates `configuration`.
        List<Peer> currentMembers = new ArrayList<>(configuration);
        logger.info("[IoTConsensus] notify current peers to build sync log. group member: {}, target: {}", currentMembers, (Object)targetPeer);
        for (Peer peer : currentMembers) {
            logger.info("[IoTConsensus] build sync log channel from {}", (Object)peer);
            if (peer.equals(thisNode)) {
                buildSyncLogChannel(targetPeer, true);
                continue;
            }
            // try-with-resources returns the client without a `continue` inside `finally`,
            // so a pending exception always reaches the catch below and gets logged.
            try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
                TBuildSyncLogChannelRes res = client.buildSyncLogChannel(new TBuildSyncLogChannelReq(targetPeer.getGroupId().convertToTConsensusGroupId(), targetPeer.getEndpoint(), targetPeer.getNodeId()));
                if (!isSuccess(res.status)) {
                    throw new ConsensusGroupModifyPeerException(String.format("build sync log channel failed from %s to %s", peer, targetPeer));
                }
            }
            catch (Exception e) {
                logger.error("cannot notify {} to build sync log channel. Please check the status of this node manually", (Object)peer, (Object)e);
            }
        }
    }

    /**
     * Removes the sync log channel to {@code targetPeer} on this node and asks every other
     * member, except the target itself, to do the same over RPC.
     *
     * <p>The member list is snapshotted before the local removal, which changes
     * {@code configuration}. Remote failures are logged at warn level and never propagated.
     *
     * @param targetPeer the peer whose sync log channels are to be removed
     */
    public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer) {
        // Typed snapshot: a raw ImmutableList cannot be iterated as Peer.
        ImmutableList<Peer> currentMembers = ImmutableList.copyOf(configuration);
        removeSyncLogChannel(targetPeer);
        for (Peer peer : currentMembers) {
            if (peer.equals(targetPeer) || peer.equals(thisNode)) {
                continue;
            }
            // try-with-resources returns the client without a `continue` inside `finally`,
            // so a pending exception always reaches the catch below and gets logged.
            try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
                TRemoveSyncLogChannelRes res = client.removeSyncLogChannel(new TRemoveSyncLogChannelReq(targetPeer.getGroupId().convertToTConsensusGroupId(), targetPeer.getEndpoint(), targetPeer.getNodeId()));
                if (!isSuccess(res.status)) {
                    logger.warn("removing sync log channel failed from {} to {}", (Object)peer, (Object)targetPeer);
                }
            }
            catch (Exception e) {
                logger.warn("Exception happened during removing sync log channel from {} to {}", new Object[]{peer, targetPeer, e});
            }
        }
    }

    /**
     * Polls {@code targetPeer} until it reports that log synchronisation is complete, sleeping
     * ten seconds between polls. There is no upper bound on the number of polls.
     *
     * @param targetPeer the peer to poll
     * @throws ConsensusGroupModifyPeerException if a client cannot be borrowed, the RPC fails, or
     *     the waiting thread is interrupted (the interrupt flag is restored first)
     */
    public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer) throws ConsensusGroupModifyPeerException {
        final long checkIntervalInMs = 10000L;
        try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(targetPeer.getEndpoint())) {
            TWaitSyncLogCompleteRes res = client.waitSyncLogComplete(new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
            while (!res.complete) {
                logger.info("[WAIT LOG SYNC] {} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}", targetPeer, res.searchIndex, res.safeIndex);
                Thread.sleep(checkIntervalInMs);
                res = client.waitSyncLogComplete(new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
            }
            logger.info("[WAIT LOG SYNC] {} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}", targetPeer, res.searchIndex, res.safeIndex);
        }
        catch (ClientManagerException | TException rpcFailure) {
            throw new ConsensusGroupModifyPeerException(String.format("error when waiting %s to complete SyncLog. %s", targetPeer, rpcFailure.getMessage()), rpcFailure);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ConsensusGroupModifyPeerException(String.format("thread interrupted when waiting %s to complete SyncLog. %s", targetPeer, interrupted.getMessage()), interrupted);
        }
    }

    /**
     * Forwards the question to the state machine.
     *
     * @param groupId the consensus group to ask about
     * @return the state machine's answer for {@code groupId}
     */
    public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
        boolean released = stateMachine.hasReleaseAllRegionRelatedResource(groupId);
        return released;
    }

    /**
     * Polls {@code targetPeer} until it reports that all region related resources have been
     * released, sleeping ten seconds between polls. There is no upper bound on the number of
     * polls.
     *
     * @param targetPeer the peer to poll
     * @throws ConsensusGroupModifyPeerException if a client cannot be borrowed, the RPC fails, or
     *     the waiting thread is interrupted (the interrupt flag is restored first)
     */
    public void waitReleaseAllRegionRelatedResource(Peer targetPeer) throws ConsensusGroupModifyPeerException {
        final long checkIntervalInMs = 10000L;
        try (SyncIoTConsensusServiceClient client = syncClientManager.borrowClient(targetPeer.getEndpoint())) {
            TWaitReleaseAllRegionRelatedResourceRes res = client.waitReleaseAllRegionRelatedResource(new TWaitReleaseAllRegionRelatedResourceReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
            while (!res.releaseAllResource) {
                logger.info("[WAIT RELEASE] {} is still releasing all region related resource", (Object)targetPeer);
                Thread.sleep(checkIntervalInMs);
                res = client.waitReleaseAllRegionRelatedResource(new TWaitReleaseAllRegionRelatedResourceReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
            }
            logger.info("[WAIT RELEASE] {} has released all region related resource", (Object)targetPeer);
        }
        catch (ClientManagerException | TException rpcFailure) {
            throw new ConsensusGroupModifyPeerException(String.format("error when waiting %s to release all region related resource. %s", targetPeer, rpcFailure.getMessage()), rpcFailure);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ConsensusGroupModifyPeerException(String.format("thread interrupted when waiting %s to release all region related resource. %s", targetPeer, interrupted.getMessage()), interrupted);
        }
    }

    /**
     * @param status the status to inspect
     * @return {@code true} when the status code equals {@code SUCCESS_STATUS}
     */
    private boolean isSuccess(TSStatus status) {
        int code = status.getCode();
        return code == TSStatusCode.SUCCESS_STATUS.getStatusCode();
    }

    /**
     * Builds a sync log channel to {@code targetPeer}, using the current minimum sync index as
     * the initial sync index.
     *
     * @param targetPeer the peer to replicate to
     * @param startNow forwarded to the three-argument overload
     */
    public void buildSyncLogChannel(Peer targetPeer, boolean startNow) {
        long initialSyncIndex = getMinSyncIndex();
        buildSyncLogChannel(targetPeer, initialSyncIndex, startNow);
    }

    /**
     * Adds {@code targetPeer} to the configuration and, unless it is this node, registers a log
     * dispatcher thread for it.
     *
     * @param targetPeer the peer to replicate to
     * @param initialSyncIndex index handed to the new dispatcher thread
     * @param startNow handed to the new dispatcher thread; also selects the wording of the log line
     */
    public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex, boolean startNow) {
        KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
        configuration.add(targetPeer);
        // No dispatcher is needed towards ourselves.
        if (!Objects.equals(targetPeer, thisNode)) {
            logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex, startNow);
            String startState;
            if (startNow) {
                startState = "Sync log channel has started.";
            } else {
                startState = "Sync log channel maybe start later.";
            }
            logger.info("[IoTConsensus] Successfully build sync log channel to {} with initialSyncIndex {}. {}", targetPeer, initialSyncIndex, startState);
        }
    }

    /**
     * Removes the log dispatcher thread for {@code targetPeer} and drops the peer from the
     * configuration.
     *
     * <p>The configuration is updated even when removing the dispatcher thread throws; in that
     * case a restart suggestion is appended to the final log line.
     *
     * @param targetPeer the peer to stop replicating to
     * @return {@code true} if the dispatcher thread was removed without an exception
     */
    public boolean removeSyncLogChannel(Peer targetPeer) {
        boolean removedCleanly;
        String suggestion;
        try {
            logDispatcher.removeLogDispatcherThread(targetPeer);
            logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", (Object)targetPeer);
            removedCleanly = true;
            suggestion = "";
        }
        catch (Exception removalFailure) {
            logger.warn("[IoTConsensus] Exception happened during removing log dispatcher thread, but configuration.dat will still be removed.", (Throwable)removalFailure);
            removedCleanly = false;
            suggestion = "It's suggested restart the DataNode to remove log dispatcher thread.";
        }
        if (removedCleanly) {
            logger.info("[IoTConsensus] Log dispatcher thread to {} has been removed and cleanup", (Object)targetPeer);
        }
        configuration.remove(targetPeer);
        checkAndUpdateSafeDeletedSearchIndex();
        logger.info("[IoTConsensus Configuration] Configuration updated to {}. {}", configuration, (Object)suggestion);
        return removedCleanly;
    }

    /**
     * Builds a file name of the form {@code <nodeId>_<suffix>}.
     *
     * @param nodeId the node id placed before the underscore
     * @param suffix the text placed after the underscore
     * @return the concatenated file name
     */
    public static String generateConfigurationDatFileName(int nodeId, String suffix) {
        StringBuilder fileName = new StringBuilder();
        fileName.append(nodeId).append("_").append(suffix);
        return fileName.toString();
    }

    /**
     * Wraps a local request in an {@link IndexedConsensusRequest} whose search index is the
     * current search index plus one.
     *
     * <p>If the request is a {@link ComparableConsensusRequest}, it is also stamped with an
     * {@link IoTProgressIndex} built from this node's id and that same index. The counter is
     * read exactly once so both values always agree. The counter itself is not advanced here.
     *
     * @param request the local request to wrap
     * @return the indexed request containing {@code request} as its only element
     */
    public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(IConsensusRequest request) {
        // Single read: two separate reads could observe different values if the counter moves.
        long nextSearchIndex = searchIndex.get() + 1L;
        if (request instanceof ComparableConsensusRequest) {
            IoTProgressIndex iotProgressIndex = new IoTProgressIndex(Integer.valueOf(thisNode.getNodeId()), Long.valueOf(nextSearchIndex));
            ((ComparableConsensusRequest)request).setProgressIndex((ProgressIndex)iotProgressIndex);
        }
        return new IndexedConsensusRequest(nextSearchIndex, Collections.singletonList(request));
    }

    /**
     * Wraps requests received from another peer in an {@link IndexedConsensusRequest}.
     *
     * @param syncIndex passed as the second constructor argument
     * @param requests the requests to wrap
     * @return the indexed request; its first constructor argument is always {@code -1}
     *     (presumably "no local search index" — confirm against IndexedConsensusRequest)
     */
    public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(long syncIndex, List<IConsensusRequest> requests) {
        final long firstArgument = -1L;
        return new IndexedConsensusRequest(firstArgument, syncIndex, requests);
    }

    /**
     * @return the minimum sync index reported by the log dispatcher, or the current search index
     *     when the dispatcher has none to report
     */
    public long getMinSyncIndex() {
        return logDispatcher.getMinSyncIndex().orElseGet(() -> searchIndex.get());
    }

    /**
     * @return the minimum flushed sync index reported by the log dispatcher, or the current
     *     search index when the dispatcher has none to report
     */
    public long getMinFlushedSyncIndex() {
        return logDispatcher.getMinFlushedSyncIndex().orElseGet(() -> searchIndex.get());
    }

    /**
     * @return the base storage directory given to the constructor
     */
    public String getStorageDir() {
        return storageDir;
    }

    /**
     * @return the peer represented by this server instance
     */
    public Peer getThisNode() {
        return thisNode;
    }

    /**
     * @return a new list holding the current members; changes to it do not affect this server
     */
    public List<Peer> getConfiguration() {
        List<Peer> membersCopy = new ArrayList<>(configuration);
        return membersCopy;
    }

    /**
     * @return the current value of the search index counter
     */
    public long getSearchIndex() {
        return searchIndex.get();
    }

    /**
     * @return the current search index minus the minimum sync index; the minimum sync index is
     *     sampled first
     */
    public long getSyncLag() {
        long safeIndex = getMinSyncIndex();
        long currentIndex = getSearchIndex();
        return currentIndex - safeIndex;
    }

    /**
     * @return the consensus configuration given to the constructor
     */
    public IoTConsensusConfig getConfig() {
        return config;
    }

    /**
     * @return the value reported by {@code LogDispatcher.getLogEntriesFromWAL()}
     */
    public long getLogEntriesFromWAL() {
        long fromWal = logDispatcher.getLogEntriesFromWAL();
        return fromWal;
    }

    /**
     * @return the value reported by {@code LogDispatcher.getLogEntriesFromQueue()}
     */
    public long getLogEntriesFromQueue() {
        long fromQueue = logDispatcher.getLogEntriesFromQueue();
        return fromQueue;
    }

    /**
     * @return {@code true} when the request reader's total size is strictly greater than the
     *     configured WAL throttle threshold
     */
    public boolean needBlockWrite() {
        boolean overThreshold = consensusReqReader.getTotalSize() > config.getReplication().getWalThrottleThreshold();
        return overThreshold;
    }

    /**
     * @return {@code true} when the request reader's total size is strictly less than the
     *     configured WAL throttle threshold
     */
    public boolean unblockWrite() {
        boolean underThreshold = consensusReqReader.getTotalSize() < config.getReplication().getWalThrottleThreshold();
        return underThreshold;
    }

    /**
     * Wakes up every thread waiting on {@code stateMachineCondition}. The state machine lock is
     * taken for the duration of the call, as the condition requires.
     */
    public void signal() {
        stateMachineLock.lock();
        try {
            stateMachineCondition.signalAll();
        } finally {
            stateMachineLock.unlock();
        }
    }

    /**
     * @return the search index counter object itself, not a copy
     */
    public AtomicLong getIndexObject() {
        return searchIndex;
    }

    /**
     * @return the scheduled executor given to the constructor
     */
    public ScheduledExecutorService getBackgroundTaskService() {
        return backgroundTaskService;
    }

    /**
     * @return the {@link LogDispatcher} used by this server
     */
    public LogDispatcher getLogDispatcher() {
        return logDispatcher;
    }

    /**
     * @return the metrics object held in {@code ioTConsensusServerMetrics}
     */
    public IoTConsensusServerMetrics getIoTConsensusServerMetrics() {
        return ioTConsensusServerMetrics;
    }

    /**
     * @return whatever the underlying state machine reports from {@code isReadOnly()}
     */
    public boolean isReadOnly() {
        return stateMachine.isReadOnly();
    }

    /**
     * @return the current value of the {@code active} flag
     */
    public boolean isActive() {
        return active;
    }

    /**
     * Logs the requested status change at INFO level, then updates the {@code active} flag.
     *
     * @param active new value for the flag
     */
    public void setActive(boolean active) {
        logger.info("set {} active status to {}", thisNode, active);
        this.active = active;
    }

    /**
     * Asks {@code targetPeer} to clean up the transferred snapshot named by this server's
     * {@code newSnapshotDirName}.
     *
     * <p>A client is borrowed from {@code syncClientManager} for the peer's endpoint and is
     * closed automatically when the call finishes.
     *
     * @param targetPeer peer to contact; its endpoint and group id are used to build the request
     * @throws ConsensusGroupModifyPeerException if the peer answers with a non-success status
     *     (the message then includes that status), or if borrowing the client or performing
     *     the call fails (the original failure is attached as the cause)
     */
    public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
        try (SyncIoTConsensusServiceClient client = (SyncIoTConsensusServiceClient)this.syncClientManager.borrowClient((Object)targetPeer.getEndpoint())) {
            TCleanupTransferredSnapshotReq req = new TCleanupTransferredSnapshotReq(targetPeer.getGroupId().convertToTConsensusGroupId(), this.newSnapshotDirName);
            TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
            if (!this.isSuccess(res.getStatus())) {
                throw new ConsensusGroupModifyPeerException(String.format("cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
            }
        } catch (ConsensusGroupModifyPeerException e) {
            // Already the right type and already carries the remote status in its message;
            // rethrow unchanged so the generic handler below does not wrap it a second time.
            throw e;
        } catch (Exception e) {
            throw new ConsensusGroupModifyPeerException(String.format("cleanup remote snapshot failed of %s", targetPeer), e);
        }
    }

    /**
     * Recursively deletes the directory {@code snapshotId} located under this server's
     * storage directory.
     *
     * <p>If no such file exists, the fact is logged at INFO level and nothing else happens.
     *
     * @param snapshotId name of the snapshot directory, resolved against the storage directory
     * @throws ConsensusGroupModifyPeerException if deleting the directory fails with an
     *     {@link IOException}
     */
    public void cleanupSnapshot(String snapshotId) throws ConsensusGroupModifyPeerException {
        final File target = new File(storageDir, snapshotId);
        if (!target.exists()) {
            logger.info("File not exist: {}", target);
            return;
        }
        try {
            org.apache.commons.io.FileUtils.deleteDirectory(target);
        } catch (IOException ioFailure) {
            throw new ConsensusGroupModifyPeerException(ioFailure);
        }
    }

    /**
     * Removes the local snapshot directory named by {@code newSnapshotDirName} and then asks
     * the state machine to clear its snapshot.
     *
     * <p>A failure to delete the directory is logged at WARN level and not propagated; in
     * that case the state machine's {@code clearSnapshot()} is not invoked.
     */
    public void cleanupLocalSnapshot() {
        try {
            cleanupSnapshot(newSnapshotDirName);
            stateMachine.clearSnapshot();
        } catch (ConsensusGroupModifyPeerException cleanupFailure) {
            logger.warn("Cleanup local snapshot fail. You may manually delete {}.", newSnapshotDirName, cleanupFailure);
        }
    }

    /**
     * Runs both index checks in order: first the safely-deleted search index, then the
     * search index itself.
     */
    void checkAndUpdateIndex() {
        checkAndUpdateSafeDeletedSearchIndex();
        checkAndUpdateSearchIndex();
    }

    /**
     * Pushes a new safely-deleted search index to the consensus request reader.
     *
     * <ul>
     *   <li>Empty configuration: logs an error and leaves the reader untouched.</li>
     *   <li>Exactly one configured peer: uses {@link Long#MAX_VALUE}.</li>
     *   <li>Otherwise: uses {@link #getMinFlushedSyncIndex()}.</li>
     * </ul>
     */
    void checkAndUpdateSafeDeletedSearchIndex() {
        if (configuration.isEmpty()) {
            logger.error("Configuration is empty, which is unexpected. Safe deleted search index won't be updated this time.");
            return;
        }
        final long safeIndex = configuration.size() == 1 ? Long.MAX_VALUE : getMinFlushedSyncIndex();
        consensusReqReader.setSafelyDeletedSearchIndex(safeIndex);
    }

    /**
     * Raises {@code searchIndex} to the minimum flushed sync index when it is found to be
     * smaller than that value.
     *
     * <p>When the adjustment is made, a warning is logged with the consensus group id and
     * both index values. If the search index is already greater than or equal to the minimum
     * flushed sync index, nothing happens.
     */
    public void checkAndUpdateSearchIndex() {
        final long current = searchIndex.get();
        final long minFlushed = getMinFlushedSyncIndex();
        if (current >= minFlushed) {
            return;
        }
        logger.warn("The searchIndex for this region({}) is smaller than the safelyDeletedSearchIndex when the node is restarted, which means that the data of the current region is not flushed by the wal, but has been synchronized to other nodes. At this point, different replicas have been inconsistent and cannot be automatically recovered. To prevent subsequent logs from marking smaller searchIndex and exacerbating the inconsistency, we manually set the searchIndex({}) to safelyDeletedSearchIndex({}) here to reduce the impact of this problem in the future", consensusGroupId, current, minFlushed);
        searchIndex.set(minFlushed);
    }

    /**
     * Hands a request coming from {@code sourcePeerId} to that peer's cache queue.
     *
     * <p>The queue is looked up in {@code cacheQueueMap} and created on first use. The request
     * is cast to {@link DeserializedBatchIndexedConsensusRequest} before being passed on.
     *
     * @param sourcePeerId id of the peer the request came from; used as the map key
     * @param request      request to process; must be a
     *                     {@link DeserializedBatchIndexedConsensusRequest}
     * @return the status returned by the queue's {@code cacheAndInsertLatestNode}
     */
    public TSStatus syncLog(int sourcePeerId, IConsensusRequest request) {
        // Resolve (or create) the queue first, then cast, matching the original evaluation order.
        SyncLogCacheQueue peerQueue = cacheQueueMap.computeIfAbsent(sourcePeerId, peerId -> new SyncLogCacheQueue((int) peerId));
        DeserializedBatchIndexedConsensusRequest batch = (DeserializedBatchIndexedConsensusRequest) request;
        return peerQueue.cacheAndInsertLatestNode(batch);
    }

    /**
     * @return the consensus group id of this server, as a string
     */
    public String getConsensusGroupId() {
        return consensusGroupId;
    }

    /**
     * Replaces the configuration object held by this server.
     *
     * <p>Only the {@code config} field is reassigned here; nothing else is refreshed or
     * notified by this method.
     *
     * @param config the new configuration to store
     */
    public void reloadConsensusConfig(IoTConsensusConfig config) {
        this.config = config;
    }

    // Synthetic accessor emitted by the decompiler: returns x0.consensusGroupId.
    // NOTE(review): presumably generated so nested classes can read the field - confirm before removing.
    static /* synthetic */ String access$100(IoTConsensusServerImpl x0) {
        final String groupId = x0.consensusGroupId;
        return groupId;
    }

    // Synthetic accessor emitted by the decompiler: returns x0.logger.
    // NOTE(review): presumably generated so nested classes can read the field - confirm before removing.
    static /* synthetic */ Logger access$200(IoTConsensusServerImpl x0) {
        final Logger serverLogger = x0.logger;
        return serverLogger;
    }

    // Synthetic accessor emitted by the decompiler: returns x0.config.
    // NOTE(review): presumably generated so nested classes can read the field - confirm before removing.
    static /* synthetic */ IoTConsensusConfig access$300(IoTConsensusServerImpl x0) {
        final IoTConsensusConfig serverConfig = x0.config;
        return serverConfig;
    }

    // Synthetic accessor emitted by the decompiler: returns x0.ioTConsensusServerMetrics.
    // NOTE(review): presumably generated so nested classes can read the field - confirm before removing.
    static /* synthetic */ IoTConsensusServerMetrics access$400(IoTConsensusServerImpl x0) {
        final IoTConsensusServerMetrics serverMetrics = x0.ioTConsensusServerMetrics;
        return serverMetrics;
    }

    // Synthetic accessor emitted by the decompiler: returns x0.stateMachine.
    // NOTE(review): presumably generated so nested classes can read the field - confirm before removing.
    static /* synthetic */ IStateMachine access$500(IoTConsensusServerImpl x0) {
        final IStateMachine machine = x0.stateMachine;
        return machine;
    }

    /**
     * Per-source-peer holder for incoming {@link DeserializedBatchIndexedConsensusRequest}s.
     *
     * <p>One instance is created per source peer id by {@code syncLog}. The state kept here is
     * a priority queue of pending requests, a lock with an associated condition, and a
     * {@code nextSyncIndex} counter that starts at {@code -1}.
     *
     * <p>NOTE(review): the only behavioural method, {@code cacheAndInsertLatestNode}, could not
     * be decompiled and is currently a stub that always throws. How the fields above are
     * actually used cannot be determined from this file; restore the method from the original
     * sources before relying on this class.
     */
    private class SyncLogCacheQueue {
        // Id of the peer this queue was created for; only assigned in the constructor.
        private final int sourcePeerId;
        // Lock from which queueSortCondition is derived.
        private final Lock queueLock = new ReentrantLock();
        // Condition bound to queueLock; its use is inside the method that failed to decompile.
        private final Condition queueSortCondition = this.queueLock.newCondition();
        // Pending requests; built without a comparator, so elements are ordered by their natural ordering.
        private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
        // Starts at -1. NOTE(review): presumably "not yet initialised" - confirm against the original source.
        private long nextSyncIndex = -1L;

        /**
         * Creates an empty queue for the given source peer.
         *
         * @param sourcePeerId id of the peer whose requests this queue holds
         */
        public SyncLogCacheQueue(int sourcePeerId) {
            this.sourcePeerId = sourcePeerId;
            this.requestCache = new PriorityQueue();
        }

        /*
         * Exception decompiling
         */
        /**
         * Decompiler stub: the real implementation was lost and this version unconditionally
         * throws {@link IllegalStateException}.
         *
         * <p>NOTE(review): every call to {@code syncLog} reaches this method, so it will fail
         * until the body is restored from the original source or class file.
         *
         * @param request the batch request handed over by {@code syncLog}
         * @return never returns normally in this decompiled form
         * @throws IllegalStateException always
         */
        private TSStatus cacheAndInsertLatestNode(DeserializedBatchIndexedConsensusRequest request) {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [5[DOLOOP]], but top level block is 1[TRYBLOCK]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }
    }

    /**
     * A one-argument function whose {@link #apply} may throw any checked {@link Exception}.
     *
     * @param <T> argument type
     * @param <R> result type
     */
    @FunctionalInterface
    public interface ThrowableFunction<T, R> {
        /**
         * Applies this function to {@code input}.
         *
         * @param input the argument
         * @return the result
         * @throws Exception if the computation fails
         */
        R apply(T input) throws Exception;
    }
}

