/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestType;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
import org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionPointCounter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeConsensusReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiver.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final long PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS = (long)IOTDB_CONFIG.getConnectionTimeoutInMS() / 6L * (long)IOTDB_CONFIG.getPipeConsensusPipelineSize();
    private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000L;
    private static final long RETRY_WAIT_TIME = 500L;
    private final RequestExecutor requestExecutor;
    private final PipeConsensus pipeConsensus;
    private final ConsensusGroupId consensusGroupId;
    private final ConsensusPipeName consensusPipeName;
    private final List<String> receiverBaseDirsName;
    private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;
    private final AtomicReference<File> receiverFileDirWithIdSuffix = new AtomicReference();
    private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
    private final FolderManager folderManager;

    public PipeConsensusReceiver(PipeConsensus pipeConsensus, ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) {
        this.pipeConsensus = pipeConsensus;
        this.consensusGroupId = consensusGroupId;
        this.pipeConsensusReceiverMetrics = new PipeConsensusReceiverMetrics(this);
        this.consensusPipeName = consensusPipeName;
        this.receiverBaseDirsName = Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs());
        try {
            this.folderManager = new FolderManager(this.receiverBaseDirsName, DirectoryStrategyType.SEQUENCE_STRATEGY);
        }
        catch (Exception e) {
            LOGGER.error("Fail to create pipeConsensus receiver file folders allocation strategy because all disks of folders are full.", (Throwable)e);
            throw new RuntimeException(e);
        }
        try {
            this.initiateTsFileBufferFolder();
            this.pipeConsensusTsFileWriterPool = new PipeConsensusTsFileWriterPool(consensusPipeName, this.receiverFileDirWithIdSuffix.get().getPath());
        }
        catch (Exception e) {
            LOGGER.error("Fail to initiate file buffer folder, Error msg: {}", (Object)e.getMessage());
            throw new RuntimeException(e);
        }
        this.requestExecutor = new RequestExecutor(this.pipeConsensusReceiverMetrics, this.pipeConsensusTsFileWriterPool);
        MetricService.getInstance().addMetricSet((IMetricSet)this.pipeConsensusReceiverMetrics);
    }

    public TPipeConsensusTransferResp receive(TPipeConsensusTransferReq req) {
        long startNanos = System.nanoTime();
        TPipeConsensusTransferResp resp = this.preCheckForReceiver(req);
        if (resp != null) {
            return resp;
        }
        short rawRequestType = req.getType();
        if (PipeConsensusRequestType.isValidatedRequestType((short)rawRequestType)) {
            switch (PipeConsensusRequestType.valueOf((short)rawRequestType)) {
                case TRANSFER_TS_FILE_PIECE: 
                case TRANSFER_TS_FILE_PIECE_WITH_MOD: {
                    this.requestExecutor.onRequest(req, true, false);
                    resp = this.loadEvent(req);
                    break;
                }
                case TRANSFER_TS_FILE_SEAL: 
                case TRANSFER_TS_FILE_SEAL_WITH_MOD: {
                    resp = this.requestExecutor.onRequest(req, false, true);
                    break;
                }
                default: {
                    resp = this.requestExecutor.onRequest(req, false, false);
                }
            }
            long durationNanos = System.nanoTime() - startNanos;
            this.pipeConsensusReceiverMetrics.recordReceiveEventTimer(durationNanos);
            return resp;
        }
        TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_TYPE_ERROR, (String)String.format("PipeConsensus Unknown PipeRequestType %s.", rawRequestType));
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn("PipeConsensus Unknown PipeRequestType, response status = {}.", (Object)status);
        }
        return new TPipeConsensusTransferResp(status);
    }

    private TPipeConsensusTransferResp preCheckForReceiver(TPipeConsensusTransferReq req) {
        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)req.getConsensusGroupId());
        PipeConsensusServerImpl impl = this.pipeConsensus.getImpl(groupId);
        if (impl == null) {
            String message = String.format("PipeConsensus-PipeName-%s: unexpected consensusGroupId %s", this.consensusPipeName, groupId);
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(message);
            }
            return new TPipeConsensusTransferResp(RpcUtils.getStatus((int)TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), (String)message));
        }
        if (impl.isReadOnly()) {
            String message = String.format("PipeConsensus-PipeName-%s: fail to receive because system is read-only.", this.consensusPipeName);
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(message);
            }
            return new TPipeConsensusTransferResp(RpcUtils.getStatus((int)TSStatusCode.SYSTEM_READ_ONLY.getStatusCode(), (String)message));
        }
        if (!impl.isActive()) {
            String message = String.format("PipeConsensus-PipeName-%s: fail to receive because peer is inactive and not ready.", this.consensusPipeName);
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn(message);
            }
            return new TPipeConsensusTransferResp(RpcUtils.getStatus((int)TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode(), (String)message));
        }
        return null;
    }

    private TPipeConsensusTransferResp loadEvent(TPipeConsensusTransferReq req) {
        try {
            short rawRequestType = req.getType();
            if (PipeConsensusRequestType.isValidatedRequestType((short)rawRequestType)) {
                switch (PipeConsensusRequestType.valueOf((short)rawRequestType)) {
                    case TRANSFER_TABLET_INSERT_NODE: {
                        return this.handleTransferTabletInsertNode(PipeConsensusTabletInsertNodeReq.fromTPipeConsensusTransferReq(req));
                    }
                    case TRANSFER_TABLET_BINARY: {
                        return this.handleTransferTabletBinary(PipeConsensusTabletBinaryReq.fromTPipeConsensusTransferReq(req));
                    }
                    case TRANSFER_TS_FILE_PIECE: {
                        return this.handleTransferFilePiece(PipeConsensusTsFilePieceReq.fromTPipeConsensusTransferReq(req), true);
                    }
                    case TRANSFER_TS_FILE_SEAL: {
                        return this.handleTransferFileSeal(PipeConsensusTsFileSealReq.fromTPipeConsensusTransferReq(req));
                    }
                    case TRANSFER_TS_FILE_PIECE_WITH_MOD: {
                        return this.handleTransferFilePiece(PipeConsensusTsFilePieceReq.fromTPipeConsensusTransferReq(req), false);
                    }
                    case TRANSFER_TS_FILE_SEAL_WITH_MOD: {
                        return this.handleTransferFileSealWithMods(PipeConsensusTsFileSealWithModReq.fromTPipeConsensusTransferReq(req));
                    }
                    case TRANSFER_TABLET_BATCH: {
                        LOGGER.info("PipeConsensus transfer batch hasn't been implemented yet.");
                    }
                }
            }
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TYPE_ERROR, (String)String.format("Unknown PipeConsensusRequestType %s.", rawRequestType));
            LOGGER.warn("PipeConsensus-PipeName-{}: Unknown PipeRequestType, response status = {}.", (Object)this.consensusPipeName, (Object)status);
            return new TPipeConsensusTransferResp(status);
        }
        catch (Exception e) {
            String error = String.format("Serialization error during pipe receiving, %s", e);
            LOGGER.warn("PipeConsensus-PipeName-{}: {}", new Object[]{this.consensusPipeName, error, e});
            return new TPipeConsensusTransferResp(RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_ERROR, (String)error));
        }
    }

    private TPipeConsensusTransferResp handleTransferTabletInsertNode(PipeConsensusTabletInsertNodeReq req) throws ConsensusGroupNotExistException {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tablet insertNode", (Object)this.consensusPipeName);
        PipeConsensusServerImpl impl = Optional.ofNullable(this.pipeConsensus.getImpl(this.consensusGroupId)).orElseThrow(() -> new ConsensusGroupNotExistException(this.consensusGroupId));
        InsertNode insertNode = req.getInsertNode();
        insertNode.markAsGeneratedByRemoteConsensusLeader();
        insertNode.setProgressIndex(ProgressIndexType.deserializeFrom((ByteBuffer)ByteBuffer.wrap(req.getProgressIndex())));
        return new TPipeConsensusTransferResp(impl.writeOnFollowerReplica((IConsensusRequest)insertNode));
    }

    private TPipeConsensusTransferResp handleTransferTabletBinary(PipeConsensusTabletBinaryReq req) throws ConsensusGroupNotExistException {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tablet binary", (Object)this.consensusPipeName);
        PipeConsensusServerImpl impl = Optional.ofNullable(this.pipeConsensus.getImpl(this.consensusGroupId)).orElseThrow(() -> new ConsensusGroupNotExistException(this.consensusGroupId));
        InsertNode insertNode = req.convertToInsertNode();
        insertNode.markAsGeneratedByRemoteConsensusLeader();
        insertNode.setProgressIndex(ProgressIndexType.deserializeFrom((ByteBuffer)ByteBuffer.wrap(req.getProgressIndex())));
        return new TPipeConsensusTransferResp(impl.writeOnFollowerReplica((IConsensusRequest)insertNode));
    }

    private TPipeConsensusTransferResp handleTransferFilePiece(PipeConsensusTransferFilePieceReq req, boolean isSingleFile) {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile pieces", (Object)this.consensusPipeName);
        long startBorrowTsFileWriterNanos = System.nanoTime();
        PipeConsensusTsFileWriter tsFileWriter = this.pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
        long startPreCheckNanos = System.nanoTime();
        this.pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(startPreCheckNanos - startBorrowTsFileWriterNanos);
        try {
            this.updateWritingFileIfNeeded(tsFileWriter, req.getFileName(), isSingleFile);
            File writingFile = tsFileWriter.getWritingFile();
            RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
            if (this.isWritingFileOffsetNonCorrect(tsFileWriter, req.getStartWritingOffset())) {
                if (!writingFile.getName().endsWith(".tsfile")) {
                    writingFileWriter.setLength(0L);
                }
                TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET, (String)String.format("Request sender to reset file reader's offset from %s to %s.", req.getStartWritingOffset(), writingFileWriter.length()));
                LOGGER.warn("PipeConsensus-PipeName-{}: File offset reset requested by receiver, response status = {}.", (Object)this.consensusPipeName, (Object)status);
                return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp((TSStatus)status, (long)writingFileWriter.length());
            }
            long endPreCheckNanos = System.nanoTime();
            this.pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(endPreCheckNanos - startPreCheckNanos);
            writingFileWriter.write(req.getFilePiece());
            this.pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(System.nanoTime() - endPreCheckNanos);
            return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp((TSStatus)RpcUtils.SUCCESS_STATUS, (long)writingFileWriter.length());
        }
        catch (Exception e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to write file piece from req {}.", new Object[]{this.consensusPipeName, req, e});
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to write file piece, because %s", e.getMessage()));
            try {
                return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp((TSStatus)status, (long)-1L);
            }
            catch (IOException ex) {
                return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp((TSStatus)status);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TPipeConsensusTransferResp handleTransferFileSeal(PipeConsensusTsFileSealReq req) {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal", (Object)this.consensusPipeName);
        long startBorrowTsFileWriterNanos = System.nanoTime();
        PipeConsensusTsFileWriter tsFileWriter = this.pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
        long startPreCheckNanos = System.nanoTime();
        this.pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(startPreCheckNanos - startBorrowTsFileWriterNanos);
        File writingFile = tsFileWriter.getWritingFile();
        RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        try {
            if (this.isWritingFileNonAvailable(tsFileWriter)) {
                TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file, because writing file %s is not available.", writingFile));
                LOGGER.warn(status.getMessage());
                TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(status);
                return tPipeConsensusTransferResp;
            }
            TPipeConsensusTransferResp resp = this.checkFinalFileSeal(tsFileWriter, req.getFileName(), req.getFileLength());
            if (Objects.nonNull(resp)) {
                TPipeConsensusTransferResp tPipeConsensusTransferResp = resp;
                return tPipeConsensusTransferResp;
            }
            String fileAbsolutePath = writingFile.getAbsolutePath();
            writingFileWriter.getFD().sync();
            writingFileWriter.close();
            tsFileWriter.setWritingFileWriter(null);
            tsFileWriter.setWritingFile(null);
            long endPreCheckNanos = System.nanoTime();
            this.pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(endPreCheckNanos - startPreCheckNanos);
            this.updateWritePointCountMetrics(req.getPointCount(), fileAbsolutePath);
            TSStatus status = this.loadFileToDataRegion(fileAbsolutePath, ProgressIndexType.deserializeFrom((ByteBuffer)ByteBuffer.wrap(req.getProgressIndex())));
            this.pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() - endPreCheckNanos);
            if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                tsFileWriter.returnSelf(this.consensusPipeName);
                LOGGER.info("PipeConsensus-PipeName-{}: Seal file {} successfully.", (Object)this.consensusPipeName, (Object)fileAbsolutePath);
            } else {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, because {}.", new Object[]{this.consensusPipeName, fileAbsolutePath, status.getMessage()});
            }
            TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(status);
            return tPipeConsensusTransferResp;
        }
        catch (IOException e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.", new Object[]{this.consensusPipeName, writingFile, req, e});
            TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
            return tPipeConsensusTransferResp;
        }
        catch (LoadFileException e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to load file {} from req {}.", new Object[]{this.consensusPipeName, writingFile, req, e});
            TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(RpcUtils.getStatus((TSStatusCode)TSStatusCode.LOAD_FILE_ERROR, (String)String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
            return tPipeConsensusTransferResp;
        }
        finally {
            this.closeCurrentWritingFileWriter(tsFileWriter, false);
            this.deleteCurrentWritingFile(tsFileWriter);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TPipeConsensusTransferResp handleTransferFileSealWithMods(PipeConsensusTsFileSealWithModReq req) {
        LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal with mods", (Object)this.consensusPipeName);
        long startBorrowTsFileWriterNanos = System.nanoTime();
        PipeConsensusTsFileWriter tsFileWriter = this.pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
        long startPreCheckNanos = System.nanoTime();
        this.pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(startPreCheckNanos - startBorrowTsFileWriterNanos);
        File writingFile = tsFileWriter.getWritingFile();
        RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        List files = req.getFileNames().stream().map(fileName -> new File(this.receiverFileDirWithIdSuffix.get(), (String)fileName)).collect(Collectors.toList());
        try {
            if (this.isWritingFileNonAvailable(tsFileWriter)) {
                TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s, because writing file %s is not available.", req.getFileNames(), writingFile));
                LOGGER.warn(status.getMessage());
                TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(status);
                return tPipeConsensusTransferResp;
            }
            for (int i = 0; i < req.getFileNames().size(); ++i) {
                TPipeConsensusTransferResp resp;
                TPipeConsensusTransferResp tPipeConsensusTransferResp = resp = i == req.getFileNames().size() - 1 ? this.checkFinalFileSeal(tsFileWriter, (String)req.getFileNames().get(i), (Long)req.getFileLengths().get(i)) : this.checkNonFinalFileSeal(tsFileWriter, (File)files.get(i), (String)req.getFileNames().get(i), (Long)req.getFileLengths().get(i));
                if (!Objects.nonNull(resp)) continue;
                TPipeConsensusTransferResp tPipeConsensusTransferResp2 = resp;
                return tPipeConsensusTransferResp2;
            }
            writingFileWriter.getFD().sync();
            writingFileWriter.close();
            tsFileWriter.setWritingFileWriter(null);
            tsFileWriter.setWritingFile(null);
            List fileAbsolutePaths = files.stream().map(File::getAbsolutePath).collect(Collectors.toList());
            long endPreCheckNanos = System.nanoTime();
            this.pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(endPreCheckNanos - startPreCheckNanos);
            String tsFileAbsolutePath = (String)fileAbsolutePaths.get(1);
            this.updateWritePointCountMetrics((Long)req.getPointCounts().get(1), tsFileAbsolutePath);
            TSStatus status = this.loadFileToDataRegion(tsFileAbsolutePath, ProgressIndexType.deserializeFrom((ByteBuffer)ByteBuffer.wrap(req.getProgressIndex())));
            this.pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() - endPreCheckNanos);
            if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                tsFileWriter.returnSelf(this.consensusPipeName);
                LOGGER.info("PipeConsensus-PipeName-{}: Seal file with mods {} successfully.", (Object)this.consensusPipeName, fileAbsolutePaths);
            } else {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, status is {}.", new Object[]{this.consensusPipeName, fileAbsolutePaths, status});
            }
            TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(status);
            return tPipeConsensusTransferResp;
        }
        catch (Exception e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.", new Object[]{this.consensusPipeName, files, req, e});
            TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
            return tPipeConsensusTransferResp;
        }
        finally {
            this.closeCurrentWritingFileWriter(tsFileWriter, false);
            IoTDBReceiverAgent.cleanPipeReceiverDir((File)this.receiverFileDirWithIdSuffix.get());
        }
    }

    private TPipeConsensusTransferResp checkNonFinalFileSeal(PipeConsensusTsFileWriter tsFileWriter, File file, String fileName, long fileLength) throws IOException {
        RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        if (!file.exists()) {
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s, the file does not exist.", fileName));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, because the file does not exist.", (Object)this.consensusPipeName, (Object)fileName);
            return new TPipeConsensusTransferResp(status);
        }
        if (fileLength != file.length()) {
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s, because the length of file is not correct. The original file has length %s, but receiver file has length %s.", fileName, fileLength, writingFileWriter.length()));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} when check non final seal, because the length of file is not correct. The original file has length {}, but receiver file has length {}.", new Object[]{this.consensusPipeName, fileName, fileLength, writingFileWriter.length()});
            return new TPipeConsensusTransferResp(status);
        }
        return null;
    }

    private TSStatus loadFileToDataRegion(String filePath, ProgressIndex progressIndex) throws IOException, LoadFileException {
        StorageEngine.getInstance().getDataRegion((DataRegionId)this.consensusGroupId).loadNewTsFile(this.generateTsFileResource(filePath, progressIndex), true, false);
        return RpcUtils.SUCCESS_STATUS;
    }

    private void updateWritePointCountMetrics(long writePointCountGivenByReq, String tsFileAbsolutePath) {
        if (writePointCountGivenByReq >= 0L) {
            this.updateWritePointCountMetrics(writePointCountGivenByReq);
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-PipeName-{}: The point count of TsFile {} is not given by sender, will read actual point count from TsFile.", (Object)this.consensusPipeName, (Object)tsFileAbsolutePath);
        }
        try (TsFileInsertionPointCounter counter = new TsFileInsertionPointCounter(new File(tsFileAbsolutePath), null);){
            this.updateWritePointCountMetrics(counter.count());
        }
        catch (IOException e) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to read TsFile when counting points: {}.", new Object[]{this.consensusPipeName, tsFileAbsolutePath, e});
        }
    }

    private void updateWritePointCountMetrics(long writePointCount) {
        DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId)this.consensusGroupId);
        dataRegion.getNonSystemDatabaseName().ifPresent(databaseName -> LoadTsFileManager.updateWritePointCountMetrics(dataRegion, databaseName, writePointCount, true));
    }

    private TsFileResource generateTsFileResource(String filePath, ProgressIndex progressIndex) throws IOException {
        File tsFile = new File(filePath);
        TsFileResource tsFileResource = new TsFileResource(tsFile);
        try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath());){
            TsFileResourceUtils.updateTsFileResource(reader, tsFileResource);
        }
        tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
        tsFileResource.setProgressIndex(progressIndex);
        tsFileResource.setGeneratedByPipeConsensus(true);
        tsFileResource.serialize();
        return tsFileResource;
    }

    private boolean isWritingFileNonAvailable(PipeConsensusTsFileWriter tsFileWriter) {
        boolean isWritingFileAvailable;
        File writingFile = tsFileWriter.getWritingFile();
        RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        boolean bl = isWritingFileAvailable = writingFile != null && writingFile.exists() && writingFileWriter != null;
        if (!isWritingFileAvailable) {
            LOGGER.info("PipeConsensus-PipeName-{}: Writing file {} is not available. Writing file is null: {}, writing file exists: {}, writing file writer is null: {}.", new Object[]{this.consensusPipeName, writingFile, writingFile == null, writingFile != null && writingFile.exists(), writingFileWriter == null});
        }
        return !isWritingFileAvailable;
    }

    private TPipeConsensusTransferResp checkFinalFileSeal(PipeConsensusTsFileWriter tsFileWriter, String fileName, long fileLength) throws IOException {
        File writingFile = tsFileWriter.getWritingFile();
        RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        if (!this.isFileExistedAndNameCorrect(tsFileWriter, fileName)) {
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s, because writing file is %s.", fileName, writingFile));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {}, because writing file is {}.", new Object[]{this.consensusPipeName, fileName, writingFile});
            return new TPipeConsensusTransferResp(status);
        }
        if (this.isWritingFileOffsetNonCorrect(tsFileWriter, fileLength)) {
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR, (String)String.format("Failed to seal file %s, because the length of file is not correct. The original file has length %s, but receiver file has length %s.", fileName, fileLength, writingFileWriter.length()));
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to seal file {} when check final seal file, because the length of file is not correct. The original file has length {}, but receiver file has length {}.", new Object[]{this.consensusPipeName, fileName, fileLength, writingFileWriter.length()});
            return new TPipeConsensusTransferResp(status);
        }
        return null;
    }

    private boolean isFileExistedAndNameCorrect(PipeConsensusTsFileWriter tsFileWriter, String fileName) {
        File writingFile = tsFileWriter.getWritingFile();
        return writingFile != null && writingFile.getName().equals(fileName);
    }

    private boolean isWritingFileOffsetNonCorrect(PipeConsensusTsFileWriter tsFileWriter, long offset) throws IOException {
        boolean offsetCorrect;
        File writingFile = tsFileWriter.getWritingFile();
        RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
        boolean bl = offsetCorrect = writingFileWriter.length() == offset;
        if (!offsetCorrect) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Writing file {}'s offset is {}, but request sender's offset is {}.", new Object[]{this.consensusPipeName, writingFile.getPath(), writingFileWriter.length(), offset});
        }
        return !offsetCorrect;
    }

    private void closeCurrentWritingFileWriter(PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) {
        if (tsFileWriter.getWritingFileWriter() != null) {
            try {
                tsFileWriter.getWritingFileWriter().close();
                if (fsyncAfterClose) {
                    tsFileWriter.getWritingFileWriter().getFD().sync();
                }
                LOGGER.info("PipeConsensus-PipeName-{}: Current writing file writer {} was closed.", (Object)this.consensusPipeName, (Object)(tsFileWriter.getWritingFile() == null ? "null" : tsFileWriter.getWritingFile().getPath()));
            }
            catch (IOException e) {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to close current writing file writer {}, because {}.", new Object[]{this.consensusPipeName, tsFileWriter.getWritingFile() == null ? "null" : tsFileWriter.getWritingFile().getPath(), e.getMessage(), e});
            }
            tsFileWriter.setWritingFileWriter(null);
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-PipeName-{}: Current writing file writer is null. No need to close.", (Object)this.consensusPipeName.toString());
        }
    }

    private void deleteFile(File file) {
        if (file.exists()) {
            try {
                RetryUtils.retryOnException(() -> FileUtils.delete((File)file));
                LOGGER.info("PipeConsensus-PipeName-{}: Original writing file {} was deleted.", (Object)this.consensusPipeName, (Object)file.getPath());
            }
            catch (IOException e) {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to delete original writing file {}, because {}.", new Object[]{this.consensusPipeName, file.getPath(), e.getMessage(), e});
            }
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-PipeName-{}: Original file {} is not existed. No need to delete.", (Object)this.consensusPipeName, (Object)file.getPath());
        }
    }

    private void deleteCurrentWritingFile(PipeConsensusTsFileWriter tsFileWriter) {
        if (tsFileWriter.getWritingFile() != null) {
            this.deleteFile(tsFileWriter.getWritingFile());
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-PipeName-{}: Current writing file is null. No need to delete.", (Object)this.consensusPipeName.toString());
        }
    }

    private void updateWritingFileIfNeeded(PipeConsensusTsFileWriter tsFileWriter, String fileName, boolean isSingleFile) throws IOException {
        if (this.isFileExistedAndNameCorrect(tsFileWriter, fileName)) {
            return;
        }
        LOGGER.info("PipeConsensus-PipeName-{}: Writing file {} is not existed or name is not correct, try to create it. Current writing file is {}.", new Object[]{this.consensusPipeName, fileName, tsFileWriter.getWritingFile() == null ? "null" : tsFileWriter.getWritingFile().getPath()});
        this.closeCurrentWritingFileWriter(tsFileWriter, !isSingleFile);
        if (tsFileWriter.getWritingFile() != null && isSingleFile) {
            this.deleteCurrentWritingFile(tsFileWriter);
        }
        if (!this.receiverFileDirWithIdSuffix.get().exists()) {
            if (this.receiverFileDirWithIdSuffix.get().mkdirs()) {
                LOGGER.info("PipeConsensus-PipeName-{}: Receiver file dir {} was created.", (Object)this.consensusPipeName, (Object)this.receiverFileDirWithIdSuffix.get().getPath());
            } else {
                LOGGER.error("PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.", (Object)this.consensusPipeName, (Object)this.receiverFileDirWithIdSuffix.get().getPath());
            }
        }
        tsFileWriter.setWritingFile(new File(tsFileWriter.getLocalWritingDirPath(), fileName));
        tsFileWriter.setWritingFileWriter(new RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
        LOGGER.info("PipeConsensus-PipeName-{}: Writing file {} was created. Ready to write file pieces.", (Object)this.consensusPipeName, (Object)tsFileWriter.getWritingFile().getPath());
    }

    private String getReceiverFileBaseDir() throws DiskSpaceInsufficientException {
        return Objects.isNull(this.folderManager) ? null : this.folderManager.getNextFolder();
    }

    private void initiateTsFileBufferFolder() throws DiskSpaceInsufficientException, IOException {
        String receiverFileBaseDir;
        if (this.receiverFileDirWithIdSuffix.get() != null) {
            if (this.receiverFileDirWithIdSuffix.get().exists()) {
                try {
                    RetryUtils.retryOnException(() -> {
                        FileUtils.deleteDirectory((File)this.receiverFileDirWithIdSuffix.get());
                        return null;
                    });
                    LOGGER.info("PipeConsensus-PipeName-{}: Original receiver file dir {} was deleted successfully.", (Object)this.consensusPipeName, (Object)this.receiverFileDirWithIdSuffix.get().getPath());
                }
                catch (IOException e) {
                    LOGGER.warn("PipeConsensus-PipeName-{}: Failed to delete original receiver file dir {}, because {}.", new Object[]{this.consensusPipeName, this.receiverFileDirWithIdSuffix.get().getPath(), e.getMessage(), e});
                }
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: Original receiver file dir {} is not existed. No need to delete.", (Object)this.consensusPipeName, (Object)this.receiverFileDirWithIdSuffix.get().getPath());
            }
            this.receiverFileDirWithIdSuffix.set(null);
        } else {
            LOGGER.debug("PipeConsensus-PipeName-{}: Current receiver file dir is null. No need to delete.", (Object)this.consensusPipeName.toString());
        }
        try {
            receiverFileBaseDir = this.getReceiverFileBaseDir();
        }
        catch (Exception e) {
            LOGGER.warn("Failed to init pipeConsensus receiver file folder manager because all disks of folders are full.", (Throwable)e);
            throw e;
        }
        if (Objects.isNull(receiverFileBaseDir)) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver file base directory, because your folderManager is null. May because the disk is full.", (Object)this.consensusPipeName.toString());
            throw new DiskSpaceInsufficientException(this.receiverBaseDirsName);
        }
        File newReceiverDir = new File(receiverFileBaseDir, this.consensusPipeName.toString());
        File systemDir = new File(IoTDBDescriptor.getInstance().getConfig().getSystemDir());
        if (!systemDir.exists()) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. Because parent system dir have been deleted due to system concurrently exit.", (Object)this.consensusPipeName, (Object)newReceiverDir.getPath());
            throw new IOException(String.format("PipeConsensus-PipeName-%s: Failed to create receiver file dir %s. Because parent system dir have been deleted due to system concurrently exit.", this.consensusPipeName, newReceiverDir.getPath()));
        }
        if (newReceiverDir.exists()) {
            RetryUtils.retryOnException(() -> {
                FileUtils.deleteDirectory((File)newReceiverDir);
                return null;
            });
            LOGGER.info("PipeConsensus-PipeName-{}: Origin receiver file dir {} was deleted.", (Object)this.consensusPipeName, (Object)newReceiverDir.getPath());
        }
        if (!newReceiverDir.mkdirs()) {
            LOGGER.warn("PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. May because authority or dir already exists etc.", (Object)this.consensusPipeName, (Object)newReceiverDir.getPath());
            throw new IOException(String.format("PipeConsensus-PipeName-%s: Failed to create receiver file dir %s. May because authority or dir already exists etc.", this.consensusPipeName, newReceiverDir.getPath()));
        }
        this.receiverFileDirWithIdSuffix.set(newReceiverDir);
    }

    public PipeConsensusRequestVersion getVersion() {
        return PipeConsensusRequestVersion.VERSION_1;
    }

    public synchronized void handleExit() {
        this.pipeConsensusTsFileWriterPool.handleExit(this.consensusPipeName);
        if (this.receiverFileDirWithIdSuffix.get() != null) {
            if (this.receiverFileDirWithIdSuffix.get().exists()) {
                try {
                    RetryUtils.retryOnException(() -> {
                        FileUtils.deleteDirectory((File)this.receiverFileDirWithIdSuffix.get());
                        return null;
                    });
                    LOGGER.info("PipeConsensus-PipeName-{}: Receiver exit: Original receiver file dir {} was deleted.", (Object)this.consensusPipeName, (Object)this.receiverFileDirWithIdSuffix.get().getPath());
                }
                catch (IOException e) {
                    LOGGER.warn("PipeConsensus-PipeName-{}: Receiver exit: Delete original receiver file dir {} error.", new Object[]{this.consensusPipeName, this.receiverFileDirWithIdSuffix.get().getPath(), e});
                }
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: Receiver exit: Original receiver file dir {} does not exist. No need to delete.", (Object)this.consensusPipeName, (Object)this.receiverFileDirWithIdSuffix.get().getPath());
            }
            this.receiverFileDirWithIdSuffix.set(null);
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("PipeConsensus-PipeName-{}: Receiver exit: Original receiver file dir is null. No need to delete.", (Object)this.consensusPipeName.toString());
        }
        MetricService.getInstance().removeMetricSet((IMetricSet)this.pipeConsensusReceiverMetrics);
        LOGGER.info("PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.", (Object)this.consensusPipeName.toString());
    }

    public int getReceiveBufferSize() {
        return this.requestExecutor.reqExecutionOrderBuffer.size();
    }

    public int getWALEventCount() {
        return this.requestExecutor.WALEventCount.get();
    }

    public int getTsFileEventCount() {
        return this.requestExecutor.tsFileEventCount.get();
    }

    public String getConsensusGroupIdStr() {
        return this.consensusGroupId.toString();
    }

    private static class PipeConsensusTsFileWriterPool {
        private final Lock lock = new ReentrantLock();
        private final List<PipeConsensusTsFileWriter> pipeConsensusTsFileWriterPool = new ArrayList<PipeConsensusTsFileWriter>();
        private final ConsensusPipeName consensusPipeName;

        public PipeConsensusTsFileWriterPool(ConsensusPipeName consensusPipeName, String receiverBasePath) throws IOException {
            this.consensusPipeName = consensusPipeName;
            for (int i = 0; i < IOTDB_CONFIG.getPipeConsensusPipelineSize(); ++i) {
                PipeConsensusTsFileWriter tsFileWriter = new PipeConsensusTsFileWriter(i, consensusPipeName);
                tsFileWriter.setFilePath(receiverBasePath);
                this.pipeConsensusTsFileWriterPool.add(tsFileWriter);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public PipeConsensusTsFileWriter borrowCorrespondingWriter(TCommitId commitId) {
            Optional<PipeConsensusTsFileWriter> tsFileWriter = this.pipeConsensusTsFileWriterPool.stream().filter(item -> Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent())).findFirst();
            if (!tsFileWriter.isPresent()) {
                try {
                    this.lock.lock();
                    while (!tsFileWriter.isPresent()) {
                        tsFileWriter = this.pipeConsensusTsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
                        Thread.sleep(500L);
                    }
                    tsFileWriter.get().setUsed(true);
                    tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOGGER.warn("PipeConsensus: receiver thread get interrupted when waiting for borrowing tsFileWriter.");
                }
                finally {
                    this.lock.unlock();
                }
            }
            return tsFileWriter.get();
        }

        public void handleExit(ConsensusPipeName consensusPipeName) {
            this.pipeConsensusTsFileWriterPool.forEach(tsFileWriter -> {
                long currentTime = System.currentTimeMillis();
                while (System.currentTimeMillis() - currentTime < 5000L && tsFileWriter.isUsed()) {
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LOGGER.warn("PipeConsensus-PipeName-{}: receiver thread get interrupted when exiting.", (Object)consensusPipeName.toString());
                        break;
                    }
                }
                tsFileWriter.closeSelf(consensusPipeName);
                tsFileWriter.returnSelf(consensusPipeName);
            });
        }
    }

    private class RequestExecutor {
        private final TreeSet<RequestMeta> reqExecutionOrderBuffer;
        private final Lock lock;
        private final Condition condition;
        private final PipeConsensusReceiverMetrics metric;
        private final PipeConsensusTsFileWriterPool tsFileWriterPool;
        private long onSyncedCommitIndex = 0L;
        private int connectorRebootTimes = 0;
        private AtomicInteger WALEventCount = new AtomicInteger(0);
        private AtomicInteger tsFileEventCount = new AtomicInteger(0);

        public RequestExecutor(PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool tsFileWriterPool) {
            this.reqExecutionOrderBuffer = new TreeSet<RequestMeta>(Comparator.comparingInt(RequestMeta::getRebootTimes).thenComparingLong(RequestMeta::getCommitIndex));
            this.lock = new ReentrantLock();
            this.condition = this.lock.newCondition();
            this.metric = metric;
            this.tsFileWriterPool = tsFileWriterPool;
        }

        private void onSuccess(long nextSyncedCommitIndex, boolean isTransferTsFileSeal) {
            LOGGER.info("PipeConsensus-PipeName-{}: process no.{} event successfully!", (Object)PipeConsensusReceiver.this.consensusPipeName, (Object)nextSyncedCommitIndex);
            RequestMeta curMeta = this.reqExecutionOrderBuffer.pollFirst();
            this.onSyncedCommitIndex = nextSyncedCommitIndex;
            if (isTransferTsFileSeal) {
                this.tsFileEventCount.decrementAndGet();
                this.metric.recordReceiveTsFileTimer(System.nanoTime() - curMeta.getStartApplyNanos());
            } else {
                this.WALEventCount.decrementAndGet();
                this.metric.recordReceiveWALTimer(System.nanoTime() - curMeta.getStartApplyNanos());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private TPipeConsensusTransferResp onRequest(TPipeConsensusTransferReq req, boolean isTransferTsFilePiece, boolean isTransferTsFileSeal) {
            long startAcquireLockNanos = System.nanoTime();
            this.lock.lock();
            try {
                long startDispatchNanos = System.nanoTime();
                this.metric.recordAcquireExecutorLockTimer(startDispatchNanos - startAcquireLockNanos);
                TCommitId tCommitId = req.getCommitId();
                RequestMeta requestMeta = new RequestMeta(tCommitId);
                LOGGER.info("PipeConsensus-PipeName-{}: start to receive no.{} event", (Object)PipeConsensusReceiver.this.consensusPipeName, (Object)tCommitId.getCommitIndex());
                if (tCommitId.getRebootTimes() < this.connectorRebootTimes) {
                    TSStatus status = new TSStatus(RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST, (String)"PipeConsensus receiver received a deprecated request, which may be sent before the connector restart. Consider to discard it"));
                    LOGGER.info("PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before the connector restart. Consider to discard it", (Object)PipeConsensusReceiver.this.consensusPipeName);
                    TPipeConsensusTransferResp tPipeConsensusTransferResp = new TPipeConsensusTransferResp(status);
                    return tPipeConsensusTransferResp;
                }
                if (tCommitId.getRebootTimes() > this.connectorRebootTimes) {
                    this.resetWithNewestRebootTime(tCommitId.getRebootTimes());
                }
                if (isTransferTsFilePiece && !this.reqExecutionOrderBuffer.contains(requestMeta)) {
                    this.tsFileEventCount.incrementAndGet();
                }
                if (!isTransferTsFileSeal && !isTransferTsFilePiece) {
                    this.WALEventCount.incrementAndGet();
                }
                this.reqExecutionOrderBuffer.add(requestMeta);
                if (isTransferTsFilePiece) {
                    long startApplyNanos = System.nanoTime();
                    this.metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
                    requestMeta.setStartApplyNanos(startApplyNanos);
                    TPipeConsensusTransferResp tPipeConsensusTransferResp = null;
                    return tPipeConsensusTransferResp;
                }
                if (this.reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() && !this.reqExecutionOrderBuffer.first().equals(requestMeta)) {
                    this.condition.signalAll();
                }
                while (true) {
                    boolean timeout;
                    if (this.reqExecutionOrderBuffer.first().equals(requestMeta) && tCommitId.getCommitIndex() == this.onSyncedCommitIndex + 1L) {
                        long startApplyNanos = System.nanoTime();
                        this.metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
                        requestMeta.setStartApplyNanos(startApplyNanos);
                        TPipeConsensusTransferResp resp = PipeConsensusReceiver.this.loadEvent(req);
                        if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                            this.onSuccess(this.onSyncedCommitIndex + 1L, isTransferTsFileSeal);
                        }
                        TPipeConsensusTransferResp tPipeConsensusTransferResp = resp;
                        return tPipeConsensusTransferResp;
                    }
                    if (this.reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() && this.reqExecutionOrderBuffer.first().equals(requestMeta)) {
                        long startApplyNanos = System.nanoTime();
                        this.metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
                        requestMeta.setStartApplyNanos(startApplyNanos);
                        TPipeConsensusTransferResp resp = PipeConsensusReceiver.this.loadEvent(req);
                        if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                            this.onSuccess(tCommitId.getCommitIndex(), isTransferTsFileSeal);
                            this.condition.signalAll();
                        }
                        TPipeConsensusTransferResp tPipeConsensusTransferResp = resp;
                        return tPipeConsensusTransferResp;
                    }
                    boolean bl = timeout = !this.condition.await(PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);
                    if (!timeout || this.reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() || this.reqExecutionOrderBuffer.first() == null || !this.reqExecutionOrderBuffer.first().equals(requestMeta)) continue;
                    long startApplyNanos = System.nanoTime();
                    this.metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos);
                    requestMeta.setStartApplyNanos(startApplyNanos);
                    TPipeConsensusTransferResp resp = PipeConsensusReceiver.this.loadEvent(req);
                    if (resp != null && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        this.onSuccess(tCommitId.getCommitIndex(), isTransferTsFileSeal);
                        this.condition.signalAll();
                    }
                    TPipeConsensusTransferResp tPipeConsensusTransferResp = resp;
                    return tPipeConsensusTransferResp;
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        private void resetWithNewestRebootTime(int connectorRebootTimes) {
            LOGGER.info("PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, which indicates the leader has rebooted. receiver will reset all its data.", (Object)PipeConsensusReceiver.this.consensusPipeName);
            this.reqExecutionOrderBuffer.clear();
            this.onSyncedCommitIndex = 0L;
            this.connectorRebootTimes = connectorRebootTimes;
            this.tsFileWriterPool.handleExit(PipeConsensusReceiver.this.consensusPipeName);
        }
    }

    private static class PipeConsensusTsFileWriter {
        private final ConsensusPipeName consensusPipeName;
        private final int index;
        private String localWritingDirPath;
        private boolean isUsed = false;
        private TCommitId commitIdOfCorrespondingHolderEvent;
        private File writingFile;
        private RandomAccessFile writingFileWriter;

        public PipeConsensusTsFileWriter(int index, ConsensusPipeName consensusPipeName) {
            this.index = index;
            this.consensusPipeName = consensusPipeName;
        }

        public void setFilePath(String receiverBasePath) throws IOException {
            this.localWritingDirPath = receiverBasePath + File.separator + this.index;
            File tsFileWriterDirectory = new File(this.localWritingDirPath);
            if (tsFileWriterDirectory.exists()) {
                RetryUtils.retryOnException(() -> {
                    FileUtils.deleteDirectory((File)tsFileWriterDirectory);
                    return null;
                });
                LOGGER.info("PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file dir {} was deleted.", new Object[]{this.consensusPipeName, this.index, tsFileWriterDirectory.getPath()});
            }
            if (!tsFileWriterDirectory.mkdirs()) {
                LOGGER.warn("PipeConsensus-PipeName-{}: Failed to create receiver tsFileWriter-{} file dir {}. May because authority or dir already exists etc.", new Object[]{this.consensusPipeName, this.index, tsFileWriterDirectory.getPath()});
                throw new IOException(String.format("PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d receiver file dir %s. May because authority or dir already exists etc.", this.consensusPipeName, this.index, tsFileWriterDirectory.getPath()));
            }
        }

        public String getLocalWritingDirPath() {
            return this.localWritingDirPath;
        }

        public File getWritingFile() {
            return this.writingFile;
        }

        public void setWritingFile(File writingFile) {
            this.writingFile = writingFile;
        }

        public RandomAccessFile getWritingFileWriter() {
            return this.writingFileWriter;
        }

        public void setWritingFileWriter(RandomAccessFile writingFileWriter) {
            this.writingFileWriter = writingFileWriter;
        }

        public TCommitId getCommitIdOfCorrespondingHolderEvent() {
            return this.commitIdOfCorrespondingHolderEvent;
        }

        public void setCommitIdOfCorrespondingHolderEvent(TCommitId commitIdOfCorrespondingHolderEvent) {
            this.commitIdOfCorrespondingHolderEvent = commitIdOfCorrespondingHolderEvent;
        }

        public boolean isUsed() {
            return this.isUsed;
        }

        public void setUsed(boolean used) {
            this.isUsed = used;
        }

        public void returnSelf(ConsensusPipeName consensusPipeName) {
            this.isUsed = false;
            this.commitIdOfCorrespondingHolderEvent = null;
            LOGGER.info("PipeConsensus-PipeName-{}: tsFileWriter-{} returned self", (Object)consensusPipeName.toString(), (Object)this.index);
        }

        public void closeSelf(ConsensusPipeName consensusPipeName) {
            if (this.writingFileWriter != null) {
                try {
                    this.writingFileWriter.close();
                    LOGGER.info("PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file writer was closed.", (Object)consensusPipeName.toString(), (Object)this.index);
                }
                catch (Exception e) {
                    LOGGER.warn("PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing file writer error.", new Object[]{consensusPipeName, this.index, e});
                }
                this.setWritingFileWriter(null);
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file writer is null. No need to close.", (Object)consensusPipeName.toString(), (Object)this.index);
            }
            if (this.writingFile != null) {
                try {
                    RetryUtils.retryOnException(() -> FileUtils.delete((File)this.writingFile));
                    LOGGER.info("PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {} was deleted.", (Object)consensusPipeName, (Object)this.writingFile.getPath());
                }
                catch (Exception e) {
                    LOGGER.warn("PipeConsensus-PipeName-{}: TsFileWriter exit: Delete writing file {} error.", new Object[]{consensusPipeName, this.writingFile.getPath(), e});
                }
                this.setWritingFile(null);
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file is null. No need to delete.", (Object)consensusPipeName.toString());
            }
        }
    }

    private static class RequestMeta {
        private final TCommitId commitId;
        private long startApplyNanos = 0L;

        public RequestMeta(TCommitId commitId) {
            this.commitId = commitId;
        }

        public int getRebootTimes() {
            return this.commitId.getRebootTimes();
        }

        public long getCommitIndex() {
            return this.commitId.getCommitIndex();
        }

        public void setStartApplyNanos(long startApplyNanos) {
            if (this.startApplyNanos == 0L) {
                this.startApplyNanos = startApplyNanos;
            }
        }

        public long getStartApplyNanos() {
            if (this.startApplyNanos == 0L) {
                return System.nanoTime();
            }
            return this.startApplyNanos;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            RequestMeta that = (RequestMeta)o;
            return this.commitId.equals(that.commitId);
        }

        public int hashCode() {
            return Objects.hash(this.commitId);
        }
    }
}

