/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

class ThreadInfoSampleService
implements Closeable {
    private final ScheduledExecutorService scheduledExecutor;

    ThreadInfoSampleService(ScheduledExecutorService scheduledExecutor) {
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor must not be null");
    }

    public CompletableFuture<Map<ExecutionAttemptID, Collection<ThreadInfoSample>>> requestThreadInfoSamples(Map<Long, ExecutionAttemptID> threads, ThreadInfoSamplesRequest requestParams) {
        Preconditions.checkNotNull(threads, "threads must not be null");
        Preconditions.checkNotNull(requestParams, "requestParams must not be null");
        CompletableFuture<Map<ExecutionAttemptID, Collection<ThreadInfoSample>>> resultFuture = new CompletableFuture<Map<ExecutionAttemptID, Collection<ThreadInfoSample>>>();
        this.scheduledExecutor.execute(() -> this.requestThreadInfoSamples(threads, requestParams.getNumSamples(), requestParams.getDelayBetweenSamples(), requestParams.getMaxStackTraceDepth(), CollectionUtil.newHashMapWithExpectedSize(threads.size()), resultFuture));
        return resultFuture;
    }

    private void requestThreadInfoSamples(Map<Long, ExecutionAttemptID> threads, int numSamples, Duration delayBetweenSamples, int maxStackTraceDepth, Map<ExecutionAttemptID, Collection<ThreadInfoSample>> currentTraces, CompletableFuture<Map<ExecutionAttemptID, Collection<ThreadInfoSample>>> resultFuture) {
        Map<Long, ThreadInfoSample> threadInfoSample = JvmUtils.createThreadInfoSample(threads.keySet(), maxStackTraceDepth);
        if (!threadInfoSample.isEmpty()) {
            for (Map.Entry<Long, ThreadInfoSample> entry : threadInfoSample.entrySet()) {
                ExecutionAttemptID executionAttemptID = threads.get(entry.getKey());
                Collection threadInfoSamples = currentTraces.computeIfAbsent(executionAttemptID, key -> new ArrayList());
                threadInfoSamples.add(entry.getValue());
            }
            if (numSamples > 1) {
                this.scheduledExecutor.schedule(() -> this.requestThreadInfoSamples(threads, numSamples - 1, delayBetweenSamples, maxStackTraceDepth, currentTraces, resultFuture), delayBetweenSamples.toMillis(), TimeUnit.MILLISECONDS);
            } else {
                resultFuture.complete(currentTraces);
            }
        } else if (!currentTraces.isEmpty()) {
            resultFuture.complete(currentTraces);
        } else {
            String ids = threads.values().stream().map(e -> e == null ? "unknown" : e.toString()).collect(Collectors.joining(", ", "[", "]"));
            resultFuture.completeExceptionally(new IllegalStateException(String.format("Cannot sample tasks %s. The tasks are not running.", ids)));
        }
    }

    @Override
    public void close() throws IOException {
        this.scheduledExecutor.shutdownNow();
    }
}

