/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;

import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskQueue;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactionTaskManager
implements IService {
    private static final Logger logger = LoggerFactory.getLogger((String)"COMPACTION");
    private static final long MAX_WAITING_TIME = 120000L;
    private static final CompactionTaskManager INSTANCE = new CompactionTaskManager();
    private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    private WrappedThreadPoolExecutor taskExecutionPool;
    private volatile boolean stopAllCompactionWorker = false;
    private WrappedThreadPoolExecutor subCompactionTaskExecutionPool;
    private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue = new CompactionTaskQueue(this.config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
    private final Map<String, Map<AbstractCompactionTask, Future<CompactionTaskSummary>>> storageGroupTasks = new ConcurrentHashMap<String, Map<AbstractCompactionTask, Future<CompactionTaskSummary>>>();
    private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
    private final RateLimiter mergeWriteRateLimiter = RateLimiter.create((double)(this.config.getCompactionWriteThroughputMbPerSec() <= 0 ? Double.MAX_VALUE : (double)this.config.getCompactionWriteThroughputMbPerSec() * 1024.0 * 1024.0));
    private final RateLimiter compactionReadOperationRateLimiter = RateLimiter.create((double)(this.config.getCompactionReadOperationPerSec() <= 0 ? Double.MAX_VALUE : (double)this.config.getCompactionReadOperationPerSec()));
    private final RateLimiter compactionReadThroughputRateLimiter = RateLimiter.create((double)(this.config.getCompactionReadThroughputMbPerSec() <= 0 ? Double.MAX_VALUE : (double)this.config.getCompactionReadThroughputMbPerSec() * 1024.0 * 1024.0));
    private volatile boolean init = false;
    private AtomicLong compactionConfigVersion = new AtomicLong(0L);

    public static CompactionTaskManager getInstance() {
        return INSTANCE;
    }

    public boolean isStopAllCompactionWorker() {
        return this.stopAllCompactionWorker;
    }

    public long getCurrentCompactionConfigVersion() {
        return this.compactionConfigVersion.get();
    }

    public void incrCompactionConfigVersion() {
        this.compactionConfigVersion.incrementAndGet();
    }

    public synchronized void start() {
        if (!this.init) {
            this.initThreadPool();
            this.candidateCompactionTaskQueue.regsitPollLastHook(AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
            this.candidateCompactionTaskQueue.regsitPollLastHook(AbstractCompactionTask::handleTaskCleanup);
            this.init = true;
        }
        logger.info("Compaction task manager started.");
    }

    public boolean isInit() {
        return this.init;
    }

    private void initThreadPool() {
        int compactionThreadNum = IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount();
        this.taskExecutionPool = (WrappedThreadPoolExecutor)IoTDBThreadPoolFactory.newFixedThreadPool((int)compactionThreadNum, (String)ThreadName.COMPACTION_WORKER.getName());
        this.taskExecutionPool.disableErrorLog();
        this.subCompactionTaskExecutionPool = (WrappedThreadPoolExecutor)IoTDBThreadPoolFactory.newFixedThreadPool((int)(compactionThreadNum * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum()), (String)ThreadName.COMPACTION_SUB_TASK.getName());
        this.subCompactionTaskExecutionPool.disableErrorLog();
        for (int i = 0; i < compactionThreadNum; ++i) {
            this.taskExecutionPool.submit((Runnable)new CompactionWorker(i, this.candidateCompactionTaskQueue));
        }
    }

    public void stop() {
        this.stopAllCompactionWorker = true;
        if (this.taskExecutionPool != null) {
            this.subCompactionTaskExecutionPool.shutdownNow();
            this.taskExecutionPool.shutdownNow();
            logger.info("Waiting for task taskExecutionPool to shut down");
            this.waitTermination();
            this.storageGroupTasks.clear();
            this.candidateCompactionTaskQueue.clear();
        }
    }

    public void waitAndStop(long milliseconds) {
        this.stopAllCompactionWorker = true;
        if (this.taskExecutionPool != null) {
            this.awaitTermination((ExecutorService)this.subCompactionTaskExecutionPool, milliseconds);
            this.awaitTermination((ExecutorService)this.taskExecutionPool, milliseconds);
            logger.info("Waiting for task taskExecutionPool to shut down in {} ms", (Object)milliseconds);
            this.waitTermination();
            this.storageGroupTasks.clear();
        }
    }

    public void waitAllCompactionFinish() {
        if (this.taskExecutionPool != null) {
            WrappedThreadPoolExecutor tmpThreadPool = this.taskExecutionPool;
            this.taskExecutionPool = null;
            this.candidateCompactionTaskQueue.clear();
            while (true) {
                int totalSize = 0;
                for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> taskMap : this.storageGroupTasks.values()) {
                    totalSize += taskMap.size();
                }
                if (totalSize <= 0) break;
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    logger.error("Interrupted when waiting all task finish", (Throwable)e);
                    break;
                }
            }
            this.storageGroupTasks.clear();
            this.taskExecutionPool = tmpThreadPool;
            logger.info("All compaction task finish");
        }
    }

    private void waitTermination() {
        long startTime = System.currentTimeMillis();
        int timeMillis = 0;
        while (!this.subCompactionTaskExecutionPool.isTerminated() || !this.taskExecutionPool.isTerminated()) {
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            long time = System.currentTimeMillis() - startTime;
            if ((timeMillis += 200) % 60000 != 0) continue;
            logger.info("CompactionManager has wait for {} seconds to stop", (Object)(time / 1000L));
        }
        this.taskExecutionPool = null;
        this.subCompactionTaskExecutionPool = null;
        this.init = false;
        this.storageGroupTasks.clear();
        logger.info("CompactionManager stopped");
    }

    private void awaitTermination(ExecutorService service, long milliseconds) {
        try {
            service.shutdownNow();
            service.awaitTermination(milliseconds, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            logger.warn("CompactionThreadPool can not be closed in {} ms", (Object)milliseconds);
            Thread.currentThread().interrupt();
        }
        service.shutdownNow();
    }

    public ServiceType getID() {
        return ServiceType.COMPACTION_SERVICE;
    }

    public boolean shouldSelectCrossSpaceCompactionTask() {
        int waitingQueueRestSize = this.candidateCompactionTaskQueue.getMaxSize() - this.candidateCompactionTaskQueue.size();
        return 5 * waitingQueueRestSize >= this.candidateCompactionTaskQueue.size();
    }

    public boolean isWaitingQueueFull() {
        return this.candidateCompactionTaskQueue.size() == this.candidateCompactionTaskQueue.getMaxSize();
    }

    public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compactionTask) throws InterruptedException {
        if (this.init && !this.candidateCompactionTaskQueue.contains(compactionTask) && !this.isTaskRunning(compactionTask) && compactionTask.setSourceFilesToCompactionCandidate() && compactionTask.getCompactionConfigVersion() >= this.getCurrentCompactionConfigVersion()) {
            this.candidateCompactionTaskQueue.put(compactionTask);
            return true;
        }
        return false;
    }

    private boolean isTaskRunning(AbstractCompactionTask task) {
        String regionWithSG = CompactionTaskManager.getSgWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
        return this.storageGroupTasks.computeIfAbsent(regionWithSG, x -> new ConcurrentHashMap()).containsKey(task);
    }

    public RateLimiter getMergeWriteRateLimiter() {
        return this.mergeWriteRateLimiter;
    }

    public RateLimiter getCompactionReadRateLimiter() {
        return this.compactionReadThroughputRateLimiter;
    }

    public RateLimiter getCompactionReadOperationRateLimiter() {
        return this.compactionReadOperationRateLimiter;
    }

    public void setWriteMergeRate(double throughputMbPerSec) {
        this.setRate(this.mergeWriteRateLimiter, throughputMbPerSec * 1024.0 * 1024.0);
    }

    public void setCompactionReadOperationRate(double readOperationPerSec) {
        this.setRate(this.compactionReadOperationRateLimiter, readOperationPerSec);
    }

    public void setCompactionReadThroughputRate(double throughputMbPerSec) {
        this.setRate(this.compactionReadThroughputRateLimiter, throughputMbPerSec * 1024.0 * 1024.0);
    }

    private void setRate(RateLimiter rateLimiter, double rate) {
        if (rate <= 0.0) {
            rate = Double.MAX_VALUE;
        }
        if (Math.abs(rateLimiter.getRate() - rate) > 1.0E-4) {
            rateLimiter.setRate(rate);
        }
    }

    public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) {
        String regionWithSG = CompactionTaskManager.getSgWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
        if (this.storageGroupTasks.containsKey(regionWithSG)) {
            this.storageGroupTasks.get(regionWithSG).remove(task);
        }
        this.finishedTaskNum.incrementAndGet();
    }

    public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) throws InterruptedException {
        if (this.subCompactionTaskExecutionPool != null && !this.subCompactionTaskExecutionPool.isShutdown()) {
            return this.subCompactionTaskExecutionPool.submit(subCompactionTask);
        }
        throw new InterruptedException();
    }

    public synchronized List<AbstractCompactionTask> abortCompaction(String storageGroupName) {
        ArrayList<AbstractCompactionTask> compactionTaskOfCurSG = new ArrayList<AbstractCompactionTask>();
        if (this.storageGroupTasks.containsKey(storageGroupName)) {
            for (Map.Entry<AbstractCompactionTask, Future<CompactionTaskSummary>> compactionTaskEntry : this.storageGroupTasks.get(storageGroupName).entrySet()) {
                compactionTaskEntry.getValue().cancel(true);
                compactionTaskOfCurSG.add(compactionTaskEntry.getKey());
            }
        }
        this.storageGroupTasks.remove(storageGroupName);
        this.candidateCompactionTaskQueue.clear();
        return compactionTaskOfCurSG;
    }

    public boolean isAnyTaskInListStillRunning(List<AbstractCompactionTask> compactionTasks) {
        boolean anyTaskRunning = false;
        for (AbstractCompactionTask task : compactionTasks) {
            anyTaskRunning = anyTaskRunning || task.isTaskRan() && !task.isTaskFinished();
        }
        return anyTaskRunning;
    }

    public int getExecutingTaskCount() {
        int runningTaskCnt = 0;
        for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> runningTaskMap : this.storageGroupTasks.values()) {
            runningTaskCnt += runningTaskMap.size();
        }
        return runningTaskCnt;
    }

    public int getTotalTaskCount() {
        return this.getExecutingTaskCount() + this.candidateCompactionTaskQueue.size();
    }

    public int getCompactionCandidateTaskCount() {
        return this.candidateCompactionTaskQueue.size();
    }

    public synchronized List<AbstractCompactionTask> getRunningCompactionTaskList() {
        ArrayList<AbstractCompactionTask> tasks = new ArrayList<AbstractCompactionTask>();
        for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> runningTaskMap : this.storageGroupTasks.values()) {
            tasks.addAll(runningTaskMap.keySet());
        }
        return tasks;
    }

    public long getFinishedTaskNum() {
        return this.finishedTaskNum.get();
    }

    public void recordTask(AbstractCompactionTask task, Future<CompactionTaskSummary> summary) {
        this.storageGroupTasks.computeIfAbsent(CompactionTaskManager.getSgWithRegionId(task.getStorageGroupName(), task.getDataRegionId()), x -> new ConcurrentHashMap()).put(task, summary);
    }

    private void getWaitingTaskStatus(Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
        List<AbstractCompactionTask> waitingTaskList = this.candidateCompactionTaskQueue.getAllElementAsList();
        for (AbstractCompactionTask task : waitingTaskList) {
            statistic.computeIfAbsent(task.getCompactionTaskType(), x -> new EnumMap(CompactionTaskStatus.class)).compute(CompactionTaskStatus.WAITING, (k, v) -> v == null ? 1 : v + 1);
        }
    }

    private void getRunningTaskStatus(Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic) {
        List<AbstractCompactionTask> runningTaskList = this.getRunningCompactionTaskList();
        for (AbstractCompactionTask task : runningTaskList) {
            statistic.computeIfAbsent(task.getCompactionTaskType(), x -> new EnumMap(CompactionTaskStatus.class)).compute(CompactionTaskStatus.RUNNING, (k, v) -> v == null ? 1 : v + 1);
        }
    }

    public Map<CompactionTaskType, Map<CompactionTaskStatus, Integer>> getCompactionTaskStatistic() {
        EnumMap<CompactionTaskType, Map<CompactionTaskStatus, Integer>> statistic = new EnumMap<CompactionTaskType, Map<CompactionTaskStatus, Integer>>(CompactionTaskType.class);
        this.getWaitingTaskStatus(statistic);
        this.getRunningTaskStatus(statistic);
        return statistic;
    }

    public static String getSgWithRegionId(String storageGroupName, String dataRegionId) {
        return storageGroupName + "-" + dataRegionId;
    }

    public void restart() throws InterruptedException {
        this.stopAllCompactionWorker = true;
        if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0) {
            if (this.subCompactionTaskExecutionPool != null) {
                this.subCompactionTaskExecutionPool.shutdownNow();
                if (!this.subCompactionTaskExecutionPool.awaitTermination(120000L, TimeUnit.MILLISECONDS)) {
                    throw new InterruptedException("Has been waiting over 120 seconds for all sub compaction tasks to finish.");
                }
            }
            if (this.taskExecutionPool != null) {
                this.taskExecutionPool.shutdownNow();
                if (!this.taskExecutionPool.awaitTermination(120000L, TimeUnit.MILLISECONDS)) {
                    throw new InterruptedException("Has been waiting over 120 seconds for all compaction tasks to finish.");
                }
            }
            this.initThreadPool();
            this.finishedTaskNum.set(0);
            this.candidateCompactionTaskQueue.clear();
            this.init = true;
        }
        this.init = true;
        this.stopAllCompactionWorker = false;
        logger.info("Compaction task manager started.");
    }

    @TestOnly
    public void clearCandidateQueue() {
        this.candidateCompactionTaskQueue.clear();
    }

    @TestOnly
    public Future<CompactionTaskSummary> getCompactionTaskFutureMayBlock(AbstractCompactionTask task) throws InterruptedException, TimeoutException {
        String regionWithSG = CompactionTaskManager.getSgWithRegionId(task.getStorageGroupName(), task.getDataRegionId());
        long startTime = System.currentTimeMillis();
        while (!this.storageGroupTasks.containsKey(regionWithSG) || !this.storageGroupTasks.get(regionWithSG).containsKey(task)) {
            Thread.sleep(10L);
            if (System.currentTimeMillis() - startTime <= 20000L) continue;
            throw new TimeoutException("Timeout when waiting for task future");
        }
        return this.storageGroupTasks.get(regionWithSG).get(task);
    }
}

