/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSample;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks back pressure statistics per {@link ExecutionJobVertex}.
 *
 * <p>Statistics are computed from stack trace samples requested through a
 * {@link StackTraceSampleCoordinator}. A sampled stack trace counts as back pressured when any of
 * its frames is {@link #EXPECTED_CLASS_NAME}#{@link #EXPECTED_METHOD_NAME}. The per-subtask ratio
 * of back pressured traces to all traces is stored in a cache whose entries expire after the
 * configured clean up interval has passed without access.
 *
 * <p>The set of pending samples and the shut down flag are only touched while holding an internal
 * lock; the statistics cache is accessed without it.
 */
public class BackPressureStatsTracker {
    private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);

    /** Maximum stack trace depth requested from the coordinator for each sample. */
    static final int MAX_STACK_TRACE_DEPTH = 3;

    /** Class name of the stack frame that marks a trace as back pressured. */
    static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";

    /** Method name of the stack frame that marks a trace as back pressured. */
    static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";

    /** Guards {@link #pendingStats} and {@link #shutDown}. */
    private final Object lock = new Object();

    /** Coordinator used to trigger the stack trace samples. */
    private final StackTraceSampleCoordinator coordinator;

    /** Most recent statistics per vertex; entries expire after the clean up interval. */
    private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;

    /** Vertices for which a sample has been triggered but whose callback has not run yet. */
    private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();

    /** Cache expiry (after last access) in milliseconds. */
    private final int cleanUpInterval;

    /** Number of stack traces requested per sample. */
    private final int numSamples;

    /** Delay between two consecutive stack traces of one sample. */
    private final Time delayBetweenSamples;

    /** Set once by {@link #shutDown()}; afterwards no samples are triggered or stored. */
    private boolean shutDown;

    /**
     * Creates a back pressure statistics tracker.
     *
     * @param coordinator coordinator used to trigger stack trace samples, must not be null
     * @param cleanUpInterval milliseconds after the last access at which cached statistics
     *     expire, must be {@code >= 0}
     * @param numSamples number of stack traces to take per sample, must be {@code >= 1}
     * @param delayBetweenSamples delay between two stack traces, must not be null
     */
    public BackPressureStatsTracker(StackTraceSampleCoordinator coordinator, int cleanUpInterval, int numSamples, Time delayBetweenSamples) {
        this.coordinator = Preconditions.checkNotNull(coordinator, "Stack trace sample coordinator");

        Preconditions.checkArgument(cleanUpInterval >= 0, "Clean up interval");
        this.cleanUpInterval = cleanUpInterval;

        Preconditions.checkArgument(numSamples >= 1, "Number of samples");
        this.numSamples = numSamples;

        this.delayBetweenSamples = Preconditions.checkNotNull(delayBetweenSamples, "Delay between samples");

        this.operatorStatsCache = CacheBuilder.newBuilder()
                .concurrencyLevel(1)
                .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
                .build();
    }

    /**
     * Returns the clean up interval this tracker was created with.
     *
     * @return clean up interval in milliseconds
     */
    public long getCleanUpInterval() {
        return cleanUpInterval;
    }

    /**
     * Returns the cached back pressure statistics for the given vertex, if any.
     *
     * @param vertex vertex to look up
     * @return the cached statistics, or an empty optional when none are cached
     */
    public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
        return Optional.ofNullable(operatorStatsCache.getIfPresent(vertex));
    }

    /**
     * Triggers a stack trace sample for the given vertex unless the tracker is shut down, a sample
     * for the vertex is already pending, the job is in a globally terminal state, or the graph has
     * no future executor.
     *
     * @param vertex vertex whose tasks should be sampled
     * @return {@code true} if a sample was triggered, {@code false} otherwise
     */
    public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
        synchronized (lock) {
            if (shutDown) {
                return false;
            }
            if (pendingStats.contains(vertex) || vertex.getGraph().getState().isGloballyTerminalState()) {
                return false;
            }
            Executor executor = vertex.getGraph().getFutureExecutor();
            if (executor == null) {
                return false;
            }

            pendingStats.add(vertex);
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Triggering stack trace sample for tasks: {}", Arrays.toString(vertex.getTaskVertices()));
                }

                CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
                        vertex.getTaskVertices(), numSamples, delayBetweenSamples, MAX_STACK_TRACE_DEPTH);
                sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);
                return true;
            } catch (RuntimeException | Error e) {
                // No callback will ever clear the entry if triggering or registering failed, so
                // undo it here; otherwise the vertex could never be sampled again.
                pendingStats.remove(vertex);
                throw e;
            }
        }
    }

    /** Runs the pending maintenance of the statistics cache (delegates to {@code Cache#cleanUp()}). */
    public void cleanUpOperatorStatsCache() {
        operatorStatsCache.cleanUp();
    }

    /**
     * Shuts the tracker down: drops all cached statistics and pending entries. Calling it again
     * has no further effect.
     */
    public void shutDown() {
        synchronized (lock) {
            if (!shutDown) {
                operatorStatsCache.invalidateAll();
                pendingStats.clear();
                shutDown = true;
            }
        }
    }

    /** Drops all cached statistics. */
    void invalidateOperatorStatsCache() {
        operatorStatsCache.invalidateAll();
    }

    /**
     * Tells whether a sampled stack trace contains the frame that indicates back pressure.
     *
     * @param trace stack trace of a single sample
     * @return {@code true} if any frame is {@link #EXPECTED_CLASS_NAME}#{@link #EXPECTED_METHOD_NAME}
     */
    private static boolean isBackPressured(StackTraceElement[] trace) {
        for (StackTraceElement frame : trace) {
            if (EXPECTED_CLASS_NAME.equals(frame.getClassName())
                    && EXPECTED_METHOD_NAME.equals(frame.getMethodName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Callback invoked when a stack trace sample completes, successfully or not. It converts a
     * successful sample into {@link OperatorBackPressureStats}, stores them in the cache and
     * always clears the pending entry of its vertex.
     */
    class StackTraceSampleCompletionCallback
    implements BiFunction<StackTraceSample, Throwable, Void> {
        private final ExecutionJobVertex vertex;

        /**
         * @param vertex vertex the completed sample belongs to
         */
        public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
            this.vertex = vertex;
        }

        /**
         * Handles the completed sample.
         *
         * @param stackTraceSample the sample, or {@code null} if sampling failed
         * @param throwable the failure cause, only logged when {@code stackTraceSample} is null
         * @return always {@code null}
         */
        @Override
        public Void apply(StackTraceSample stackTraceSample, Throwable throwable) {
            synchronized (lock) {
                try {
                    if (shutDown) {
                        return null;
                    }

                    JobStatus jobState = vertex.getGraph().getState();
                    if (jobState.isGloballyTerminalState()) {
                        LOG.debug("Ignoring sample, because job is in state {}.", jobState);
                    } else if (stackTraceSample != null) {
                        createStatsFromSample(stackTraceSample)
                                .ifPresent(stats -> operatorStatsCache.put(vertex, stats));
                    } else {
                        LOG.debug("Failed to gather stack trace sample.", throwable);
                    }
                } catch (Throwable t) {
                    // Never let a failure escape into the future's executor; just report it.
                    LOG.error("Error during stats completion.", t);
                } finally {
                    // Whatever happened, the vertex must become eligible for sampling again.
                    pendingStats.remove(vertex);
                }
                return null;
            }
        }

        /**
         * Computes the per-subtask back pressure ratios from a sample.
         *
         * <p>The result array has one slot per sampled task and is indexed by the parallel subtask
         * index of the task. A ratio is the number of back pressured traces divided by the number
         * of traces of that task, or {@code 0.0} when the task has no traces.
         *
         * @param sample the completed stack trace sample
         * @return the statistics, or an empty optional when a sampled task can no longer be mapped
         *     to a slot of the result array (its execution attempt is not current any more, or its
         *     subtask index lies outside the array)
         */
        private Optional<OperatorBackPressureStats> createStatsFromSample(StackTraceSample sample) {
            Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();

            // Resolve the subtask index of every sampled attempt that is still the current one.
            Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps.newHashMapWithExpectedSize(traces.size());
            Set<ExecutionAttemptID> sampledTasks = traces.keySet();
            for (ExecutionVertex task : vertex.getTaskVertices()) {
                ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
                if (sampledTasks.contains(taskId)) {
                    subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
                } else {
                    LOG.debug("Outdated sample. A task, which is part of the sample has been reset.");
                }
            }

            double[] backPressureRatio = new double[traces.size()];
            for (Map.Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
                Integer subtaskIndex = subtaskIndexMap.get(entry.getKey());
                if (subtaskIndex == null || subtaskIndex < 0 || subtaskIndex >= backPressureRatio.length) {
                    // Unboxing or indexing here used to fail with an exception; the sample is
                    // discarded explicitly instead, so nothing is cached in this case.
                    LOG.debug("Discarding stack trace sample, because sampled task {} cannot be mapped to a subtask index.", entry.getKey());
                    return Optional.empty();
                }

                List<StackTraceElement[]> taskTraces = entry.getValue();
                int backPressureSamples = 0;
                for (StackTraceElement[] trace : taskTraces) {
                    if (isBackPressured(trace)) {
                        ++backPressureSamples;
                    }
                }

                int size = taskTraces.size();
                backPressureRatio[subtaskIndex] = size > 0 ? (double) backPressureSamples / (double) size : 0.0;
            }

            return Optional.of(new OperatorBackPressureStats(sample.getSampleId(), sample.getEndTime(), backPressureRatio));
        }
    }
}

