/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.agent.task;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeTaskBuilder;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeDataNodeTaskAgent
extends PipeTaskAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
    protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final AtomicLong LAST_FORCED_RESTART_TIME = new AtomicLong(System.currentTimeMillis());
    private static final Map<String, AtomicLong> PIPE_NAME_TO_LAST_RESTART_TIME_MAP = new ConcurrentHashMap<String, AtomicLong>();

    protected boolean isShutdown() {
        return PipeDataNodeAgent.runtime().isShutdown();
    }

    protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta pipeMetaFromConfigNode) throws IllegalPathException {
        return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
    }

    protected void startPipe(String pipeName, long creationTime) {
        PipeMeta existedPipeMeta = this.pipeMetaKeeper.getPipeMeta(pipeName);
        PipeStatus status = (PipeStatus)existedPipeMeta.getRuntimeMeta().getStatus().get();
        if (PipeStatus.STOPPED.equals((Object)status) || status == null) {
            this.restartPipeToReloadResourceIfNeeded(existedPipeMeta);
        }
        super.startPipe(pipeName, creationTime);
    }

    private void restartPipeToReloadResourceIfNeeded(PipeMeta pipeMeta) {
        if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime() < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
            return;
        }
        AtomicLong lastRestartTime = PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
        if (lastRestartTime != null && System.currentTimeMillis() - lastRestartTime.get() < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
            LOGGER.info("Skipping reload resource for stopped pipe {} before starting it because reloading resource is too frequent.", (Object)pipeMeta.getStaticMeta().getPipeName());
            return;
        }
        if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
            LOGGER.info("Flushing storage engine before restarting pipe {}.", (Object)pipeMeta.getStaticMeta().getPipeName());
            long currentTime = System.currentTimeMillis();
            StorageEngine.getInstance().syncCloseAllProcessor();
            WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
            LOGGER.info("Finished flushing storage engine, time cost: {} ms.", (Object)(System.currentTimeMillis() - currentTime));
        }
        this.restartStuckPipe(pipeMeta);
        LOGGER.info("Reloaded resource for stopped pipe {} before starting it.", (Object)pipeMeta.getStaticMeta().getPipeName());
    }

    protected void createPipeTask(int consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta) throws IllegalPathException {
        if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
            boolean needConstructSchemaRegionTask;
            PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters();
            DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
            boolean needConstructDataRegionTask = StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId) && DataRegionListeningFilter.shouldDataRegionBeListened(extractorParameters, dataRegionId);
            boolean bl = needConstructSchemaRegionTask = SchemaEngine.getInstance().getAllSchemaRegionIds().contains(new SchemaRegionId(consensusGroupId)) && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters).isEmpty();
            if (needConstructDataRegionTask || needConstructSchemaRegionTask) {
                PipeDataNodeTask pipeTask = new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
                pipeTask.create();
                this.pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, (PipeTask)pipeTask);
            }
        }
        this.pipeMetaKeeper.getPipeMeta(pipeStaticMeta.getPipeName()).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().put(consensusGroupId, pipeTaskMeta);
    }

    public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(List<PipeMeta> pipeMetaListFromCoordinator) {
        if (this.isShutdown()) {
            return Collections.emptyList();
        }
        List exceptionMessages = super.handlePipeMetaChangesInternal(pipeMetaListFromCoordinator);
        try {
            Set<Integer> validSchemaRegionIds = this.clearSchemaRegionListeningQueueIfNecessary(pipeMetaListFromCoordinator);
            this.closeSchemaRegionListeningQueueIfNecessary(validSchemaRegionIds, exceptionMessages);
        }
        catch (Exception e) {
            LOGGER.warn("Failed to clear/close the schema region listening queue, because {}. Will wait until success or the region's state machine is stopped.", (Object)e.getMessage());
            exceptionMessages.add(new TPushPipeMetaRespExceptionMessage("", e.getMessage(), System.currentTimeMillis()));
        }
        return exceptionMessages;
    }

    private Set<Integer> clearSchemaRegionListeningQueueIfNecessary(List<PipeMeta> pipeMetaListFromCoordinator) throws IllegalPathException {
        HashMap<Integer, Long> schemaRegionId2ListeningQueueNewFirstIndex = new HashMap<Integer, Long>();
        for (PipeMeta pipeMetaFromCoordinator : pipeMetaListFromCoordinator) {
            if (SchemaRegionListeningFilter.parseListeningPlanTypeSet(pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters()).isEmpty()) continue;
            ConcurrentMap groupId2TaskMetaMap = pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
            for (SchemaRegionId regionId : SchemaEngine.getInstance().getAllSchemaRegionIds()) {
                int id = regionId.getId();
                PipeTaskMeta pipeTaskMeta = (PipeTaskMeta)groupId2TaskMetaMap.get(id);
                if (pipeTaskMeta == null) continue;
                ProgressIndex progressIndex = pipeTaskMeta.getProgressIndex();
                if (progressIndex instanceof MetaProgressIndex) {
                    if (((MetaProgressIndex)progressIndex).getIndex() + 1L >= schemaRegionId2ListeningQueueNewFirstIndex.getOrDefault(id, Long.MAX_VALUE)) continue;
                    schemaRegionId2ListeningQueueNewFirstIndex.put(id, ((MetaProgressIndex)progressIndex).getIndex() + 1L);
                    continue;
                }
                schemaRegionId2ListeningQueueNewFirstIndex.put(id, 0L);
            }
        }
        schemaRegionId2ListeningQueueNewFirstIndex.forEach((schemaRegionId, listeningQueueNewFirstIndex) -> PipeDataNodeAgent.runtime().schemaListener(new SchemaRegionId(schemaRegionId.intValue())).removeBefore((long)listeningQueueNewFirstIndex));
        return schemaRegionId2ListeningQueueNewFirstIndex.keySet();
    }

    private void closeSchemaRegionListeningQueueIfNecessary(Set<Integer> validSchemaRegionIds, List<TPushPipeMetaRespExceptionMessage> exceptionMessages) {
        if (!exceptionMessages.isEmpty()) {
            return;
        }
        PipeDataNodeAgent.runtime().listeningSchemaRegionIds().stream().filter(schemaRegionId -> !validSchemaRegionIds.contains(schemaRegionId.getId()) && PipeDataNodeAgent.runtime().isSchemaLeaderReady((SchemaRegionId)schemaRegionId)).forEach(schemaRegionId -> {
            try {
                SchemaRegionConsensusImpl.getInstance().write((ConsensusGroupId)schemaRegionId, (IConsensusRequest)new PipeOperateSchemaQueueNode(new PlanNodeId(""), false));
            }
            catch (ConsensusException e) {
                throw new PipeException("Failed to close listening queue for SchemaRegion " + schemaRegionId + ", because " + e.getMessage(), (Throwable)e);
            }
        });
    }

    protected void thawRate(String pipeName, long creationTime) {
        PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
    }

    protected void freezeRate(String pipeName, long creationTime) {
        PipeDataNodeRemainingEventAndTimeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
    }

    protected boolean dropPipe(String pipeName, long creationTime) {
        if (!super.dropPipe(pipeName, creationTime)) {
            return false;
        }
        String taskId = pipeName + "_" + creationTime;
        PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
        PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
        return true;
    }

    protected boolean dropPipe(String pipeName) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(pipeName);
        if (!super.dropPipe(pipeName)) {
            return false;
        }
        if (Objects.nonNull(pipeMeta)) {
            long creationTime = pipeMeta.getStaticMeta().getCreationTime();
            String taskId = pipeName + "_" + creationTime;
            PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
            PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
        }
        return true;
    }

    public void stopAllPipesWithCriticalException() {
        super.stopAllPipesWithCriticalException(CONFIG.getDataNodeId());
    }

    public void collectPipeMetaList(TDataNodeHeartbeatResp resp) throws TException {
        if (!this.tryReadLockWithTimeOut(10L)) {
            return;
        }
        try {
            this.collectPipeMetaListInternal(resp);
        }
        finally {
            this.releaseReadLock();
        }
    }

    private void collectPipeMetaListInternal(TDataNodeHeartbeatResp resp) throws TException {
        if (PipeDataNodeAgent.runtime().isShutdown()) {
            return;
        }
        Set dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds().stream().map(ConsensusGroupId::getId).collect(Collectors.toSet());
        ArrayList<ByteBuffer> pipeMetaBinaryList = new ArrayList<ByteBuffer>();
        ArrayList<Boolean> pipeCompletedList = new ArrayList<Boolean>();
        ArrayList<Long> pipeRemainingEventCountList = new ArrayList<Long>();
        ArrayList<Double> pipeRemainingTimeList = new ArrayList<Double>();
        try {
            Optional logger = PipeDataNodeResourceManager.log().schedule(PipeDataNodeTaskAgent.class, PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), this.pipeMetaKeeper.getPipeMetaCount());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                pipeMetaBinaryList.add(pipeMeta.serialize());
                PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
                Map pipeTaskMap = this.pipeTaskManager.getPipeTasks(staticMeta);
                boolean isAllDataRegionCompleted = pipeTaskMap == null || pipeTaskMap.entrySet().stream().filter(entry -> dataRegionIds.contains(entry.getKey())).allMatch(entry -> ((PipeDataNodeTask)entry.getValue()).isCompleted());
                String extractorModeValue = pipeMeta.getStaticMeta().getExtractorParameters().getStringOrDefault(Arrays.asList("extractor.mode", "source.mode"), "live");
                boolean includeDataAndNeedDrop = (Boolean)DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeMeta.getStaticMeta().getExtractorParameters()).getLeft() != false && (extractorModeValue.equalsIgnoreCase("query") || extractorModeValue.equalsIgnoreCase("snapshot"));
                boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
                Pair<Long, Double> remainingEventAndTime = PipeDataNodeRemainingEventAndTimeMetrics.getInstance().getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
                pipeCompletedList.add(isCompleted);
                pipeRemainingEventCountList.add((Long)remainingEventAndTime.getLeft());
                pipeRemainingTimeList.add((Double)remainingEventAndTime.getRight());
                logger.ifPresent(l -> l.info("Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}", new Object[]{pipeMeta.coreReportMessage(), isCompleted, remainingEventAndTime.getLeft(), remainingEventAndTime.getRight()}));
            }
            LOGGER.info("Reported {} pipe metas.", (Object)pipeMetaBinaryList.size());
        }
        catch (IOException | IllegalPathException e) {
            throw new TException(e);
        }
        resp.setPipeMetaList(pipeMetaBinaryList);
        resp.setPipeCompletedList(pipeCompletedList);
        resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
        resp.setPipeRemainingTimeList(pipeRemainingTimeList);
        PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
    }

    protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp) throws TException {
        if (PipeDataNodeAgent.runtime().isShutdown()) {
            return;
        }
        LOGGER.info("Received pipe heartbeat request {} from config node.", (Object)req.heartbeatId);
        Set dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds().stream().map(ConsensusGroupId::getId).collect(Collectors.toSet());
        ArrayList<ByteBuffer> pipeMetaBinaryList = new ArrayList<ByteBuffer>();
        ArrayList<Boolean> pipeCompletedList = new ArrayList<Boolean>();
        ArrayList<Long> pipeRemainingEventCountList = new ArrayList<Long>();
        ArrayList<Double> pipeRemainingTimeList = new ArrayList<Double>();
        try {
            Optional logger = PipeDataNodeResourceManager.log().schedule(PipeDataNodeTaskAgent.class, PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), this.pipeMetaKeeper.getPipeMetaCount());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                pipeMetaBinaryList.add(pipeMeta.serialize());
                PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
                Map pipeTaskMap = this.pipeTaskManager.getPipeTasks(staticMeta);
                boolean isAllDataRegionCompleted = pipeTaskMap == null || pipeTaskMap.entrySet().stream().filter(entry -> dataRegionIds.contains(entry.getKey())).allMatch(entry -> ((PipeDataNodeTask)entry.getValue()).isCompleted());
                String extractorModeValue = pipeMeta.getStaticMeta().getExtractorParameters().getStringOrDefault(Arrays.asList("extractor.mode", "source.mode"), "live");
                boolean includeDataAndNeedDrop = (Boolean)DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeMeta.getStaticMeta().getExtractorParameters()).getLeft() != false && (extractorModeValue.equalsIgnoreCase("query") || extractorModeValue.equalsIgnoreCase("snapshot"));
                boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
                Pair<Long, Double> remainingEventAndTime = PipeDataNodeRemainingEventAndTimeMetrics.getInstance().getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
                pipeCompletedList.add(isCompleted);
                pipeRemainingEventCountList.add((Long)remainingEventAndTime.getLeft());
                pipeRemainingTimeList.add((Double)remainingEventAndTime.getRight());
                logger.ifPresent(l -> l.info("Reporting pipe meta: {}, isCompleted: {}, remainingEventCount: {}, estimatedRemainingTime: {}", new Object[]{pipeMeta.coreReportMessage(), isCompleted, remainingEventAndTime.getLeft(), remainingEventAndTime.getRight()}));
            }
            LOGGER.info("Reported {} pipe metas.", (Object)pipeMetaBinaryList.size());
        }
        catch (IOException | IllegalPathException e) {
            throw new TException(e);
        }
        resp.setPipeMetaList(pipeMetaBinaryList);
        resp.setPipeCompletedList(pipeCompletedList);
        resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
        resp.setPipeRemainingTimeList(pipeRemainingTimeList);
        PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restartAllStuckPipes() {
        Set<PipeMeta> stuckPipes;
        List<String> removedPipeName = this.removeOutdatedPipeInfoFromLastRestartTimeMap();
        if (!removedPipeName.isEmpty()) {
            long currentTime = System.currentTimeMillis();
            LOGGER.info("Pipes {} now can dynamically adjust their extraction strategies. Start to flush storage engine to trigger the adjustment.", removedPipeName);
            StorageEngine.getInstance().syncCloseAllProcessor();
            WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
            LOGGER.info("Finished flushing storage engine, time cost: {} ms.", (Object)(System.currentTimeMillis() - currentTime));
            LOGGER.info("Skipping restarting pipes this round because of the dynamic flushing.");
            return;
        }
        if (!this.tryWriteLockWithTimeOut(5L)) {
            return;
        }
        try {
            stuckPipes = this.findAllStuckPipes();
        }
        finally {
            this.releaseWriteLock();
        }
        stuckPipes.removeIf(pipeMeta -> {
            AtomicLong lastRestartTime = PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
            return lastRestartTime != null && System.currentTimeMillis() - lastRestartTime.get() < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs();
        });
        stuckPipes.forEach(this::restartStuckPipe);
    }

    private List<String> removeOutdatedPipeInfoFromLastRestartTimeMap() {
        ArrayList<String> removedPipeName = new ArrayList<String>();
        PIPE_NAME_TO_LAST_RESTART_TIME_MAP.entrySet().removeIf(entry -> {
            boolean shouldRemove;
            AtomicLong lastRestartTime = (AtomicLong)entry.getValue();
            boolean bl = shouldRemove = lastRestartTime == null || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs() <= System.currentTimeMillis() - lastRestartTime.get();
            if (shouldRemove) {
                removedPipeName.add((String)entry.getKey());
            }
            return shouldRemove;
        });
        return removedPipeName;
    }

    private Set<PipeMeta> findAllStuckPipes() {
        long totalFloatingMemorySizeInBytes;
        HashSet<PipeMeta> stuckPipes = new HashSet<PipeMeta>();
        if (System.currentTimeMillis() - LAST_FORCED_RESTART_TIME.get() > PipeConfig.getInstance().getPipeSubtaskExecutorForcedRestartIntervalMs()) {
            LAST_FORCED_RESTART_TIME.set(System.currentTimeMillis());
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                stuckPipes.add(pipeMeta);
            }
            if (!stuckPipes.isEmpty()) {
                LOGGER.warn("All {} pipe(s) will be restarted because of forced restart policy.", (Object)stuckPipes.size());
            }
            return stuckPipes;
        }
        long totalLinkedButDeletedTsFileResourceRamSize = PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsFileResourceRamSize();
        long totalInsertNodeFloatingMemoryUsageInBytes = this.getAllFloatingMemoryUsageInByte();
        if (totalInsertNodeFloatingMemoryUsageInBytes + totalLinkedButDeletedTsFileResourceRamSize >= (totalFloatingMemorySizeInBytes = PipeMemoryManager.getTotalFloatingMemorySizeInBytes())) {
            for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
                stuckPipes.add(pipeMeta);
            }
            if (!stuckPipes.isEmpty()) {
                LOGGER.warn("All {} pipe(s) will be restarted because linked but deleted tsFiles' resource size {} and all insertNode's size {} exceeds limit {}.", new Object[]{stuckPipes.size(), totalLinkedButDeletedTsFileResourceRamSize, totalInsertNodeFloatingMemoryUsageInBytes, totalFloatingMemorySizeInBytes});
            }
            return stuckPipes;
        }
        Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap = PipeDataRegionExtractorMetrics.getInstance().getExtractorMap();
        for (PipeMeta pipeMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            String pipeName = pipeMeta.getStaticMeta().getPipeName();
            List extractors = taskId2ExtractorMap.values().stream().filter(e -> e.getPipeName().equals(pipeName) && e.shouldExtractInsertion()).collect(Collectors.toList());
            if (extractors.isEmpty()) continue;
            if ((CONFIG.isEnableSeqSpaceCompaction() || CONFIG.isEnableUnseqSpaceCompaction() || CONFIG.isEnableCrossSpaceCompaction()) && this.mayDeletedTsFileSizeReachDangerousThreshold()) {
                LOGGER.warn("Pipe {} needs to restart because too many TsFiles are out-of-date.", (Object)pipeMeta.getStaticMeta());
                stuckPipes.add(pipeMeta);
                continue;
            }
            if (!((IoTDBDataRegionExtractor)((Object)extractors.get(0))).isStreamMode() || !extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles) || !this.mayMemTablePinnedCountReachDangerousThreshold() && !this.mayWalSizeReachThrottleThreshold()) continue;
            LOGGER.warn("Pipe {} needs to restart because too many memtables are pinned. mayMemTablePinnedCountReachDangerousThreshold: {}, mayWalSizeReachThrottleThreshold: {}", new Object[]{pipeMeta.getStaticMeta(), this.mayMemTablePinnedCountReachDangerousThreshold(), this.mayWalSizeReachThrottleThreshold()});
            stuckPipes.add(pipeMeta);
        }
        return stuckPipes;
    }

    private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
        try {
            long linkedButDeletedTsFileSize = PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
            double totalDisk = MetricService.getInstance().getAutoGauge(SystemMetric.SYS_DISK_TOTAL_SPACE.toString(), MetricLevel.CORE, new String[]{Tag.NAME.toString(), "system"}).getValue();
            return linkedButDeletedTsFileSize > 0L && totalDisk > 0.0 && (double)linkedButDeletedTsFileSize > (double)PipeConfig.getInstance().getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage() * totalDisk;
        }
        catch (Exception e) {
            LOGGER.warn("Failed to judge if deleted TsFile size reaches dangerous threshold.", (Throwable)e);
            return false;
        }
    }

    private boolean mayMemTablePinnedCountReachDangerousThreshold() {
        return PipeDataNodeResourceManager.wal().getPinnedWalCount() >= 5 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount() * StorageEngine.getInstance().getDataRegionNumber();
    }

    private boolean mayWalSizeReachThrottleThreshold() {
        return 3L * WALManager.getInstance().getTotalDiskUsage() > 2L * CONFIG.getThrottleThreshold();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restartStuckPipe(PipeMeta pipeMeta) {
        LOGGER.warn("Pipe {} will be restarted because it is stuck or has encountered issues such as data backlog or being stopped for too long.", (Object)pipeMeta.getStaticMeta());
        this.acquireWriteLock();
        try {
            long startTime = System.currentTimeMillis();
            PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
            this.handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
            long restartTime = System.currentTimeMillis();
            PIPE_NAME_TO_LAST_RESTART_TIME_MAP.computeIfAbsent(pipeMeta.getStaticMeta().getPipeName(), k -> new AtomicLong(restartTime)).set(restartTime);
            this.handleSinglePipeMetaChanges(originalPipeMeta);
            LOGGER.warn("Pipe {} was restarted because of stuck or data backlog, time cost: {} ms.", (Object)originalPipeMeta.getStaticMeta(), (Object)(System.currentTimeMillis() - startTime));
        }
        catch (Exception e) {
            LOGGER.warn("Failed to restart stuck pipe {}.", (Object)pipeMeta.getStaticMeta(), (Object)e);
        }
        finally {
            this.releaseWriteLock();
        }
    }

    public boolean isPipeTaskCurrentlyRestarted(String pipeName) {
        return PIPE_NAME_TO_LAST_RESTART_TIME_MAP.containsKey(pipeName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void markCompleted(String pipeName, int regionId) {
        this.acquireWriteLock();
        try {
            PipeDataNodeTask pipeDataNodeTask;
            if (this.pipeMetaKeeper.containsPipeMeta(pipeName) && Objects.nonNull(pipeDataNodeTask = (PipeDataNodeTask)this.pipeTaskManager.getPipeTask(this.pipeMetaKeeper.getPipeMeta(pipeName).getStaticMeta(), regionId))) {
                pipeDataNodeTask.markCompleted();
            }
        }
        finally {
            this.releaseWriteLock();
        }
    }

    public Set<Integer> getPipeTaskRegionIdSet(String pipeName, long creationTime) {
        PipeMeta pipeMeta = this.pipeMetaKeeper.getPipeMeta(pipeName);
        return pipeMeta == null || pipeMeta.getStaticMeta().getCreationTime() != creationTime ? Collections.emptySet() : pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
    }

    public boolean hasPipeReleaseRegionRelatedResource(int consensusGroupId) {
        if (!this.tryReadLockWithTimeOut(10L)) {
            LOGGER.warn("Failed to check if pipe has release region related resource with consensus group id: {}.", (Object)consensusGroupId);
            return false;
        }
        try {
            boolean bl = !this.pipeTaskManager.hasPipeTaskInConsensusGroup(consensusGroupId);
            return bl;
        }
        finally {
            this.releaseReadLock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ProgressIndex getPipeTaskProgressIndex(String pipeName, int consensusGroupId) {
        if (!this.tryReadLockWithTimeOut(10L)) {
            throw new PipeException(String.format("Failed to get pipe task progress index with pipe name: %s, consensus group id %s.", pipeName, consensusGroupId));
        }
        try {
            if (!this.pipeMetaKeeper.containsPipeMeta(pipeName)) {
                throw new PipeException("Pipe meta not found: " + pipeName);
            }
            ProgressIndex progressIndex = ((PipeTaskMeta)this.pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getConsensusGroupId2TaskMetaMap().get(consensusGroupId)).getProgressIndex();
            return progressIndex;
        }
        finally {
            this.releaseReadLock();
        }
    }

    public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
        if (!this.tryReadLockWithTimeOut(10L)) {
            throw new PipeException("Failed to get all consensus pipe.");
        }
        try {
            Map map = (Map)StreamSupport.stream(this.pipeMetaKeeper.getPipeMetaList().spliterator(), false).filter(pipeMeta -> PipeType.CONSENSUS.equals((Object)pipeMeta.getStaticMeta().getPipeType())).collect(ImmutableMap.toImmutableMap(pipeMeta -> new ConsensusPipeName(pipeMeta.getStaticMeta().getPipeName()), pipeMeta -> (PipeStatus)pipeMeta.getRuntimeMeta().getStatus().get()));
            return map;
        }
        finally {
            this.releaseReadLock();
        }
    }
}

