/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.metrics;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricFetcherImpl<T extends RestfulGateway>
implements MetricFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherImpl.class);
    private final GatewayRetriever<T> retriever;
    private final MetricQueryServiceRetriever queryServiceRetriever;
    private final Executor executor;
    private final Time timeout;
    private final MetricStore metrics = new MetricStore();
    private final MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();
    private final long updateInterval;
    @GuardedBy(value="this")
    private long lastUpdateTime;
    @GuardedBy(value="this")
    private CompletableFuture<Void> fetchMetricsFuture = FutureUtils.completedVoidFuture();

    public MetricFetcherImpl(GatewayRetriever<T> retriever, MetricQueryServiceRetriever queryServiceRetriever, Executor executor, Time timeout, long updateInterval) {
        this.retriever = Preconditions.checkNotNull(retriever);
        this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
        this.executor = Preconditions.checkNotNull(executor);
        this.timeout = Preconditions.checkNotNull(timeout);
        Preconditions.checkArgument(updateInterval > 0L, "The update interval must be larger than 0.");
        this.updateInterval = updateInterval;
    }

    @Override
    public MetricStore getMetricStore() {
        return this.metrics;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void update() {
        MetricFetcherImpl metricFetcherImpl = this;
        synchronized (metricFetcherImpl) {
            long currentTime = System.currentTimeMillis();
            if (currentTime - this.lastUpdateTime > this.updateInterval && this.fetchMetricsFuture.isDone()) {
                this.lastUpdateTime = currentTime;
                this.fetchMetricsFuture = this.fetchMetrics();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public long getLastUpdateTime() {
        MetricFetcherImpl metricFetcherImpl = this;
        synchronized (metricFetcherImpl) {
            return this.lastUpdateTime;
        }
    }

    private CompletableFuture<Void> fetchMetrics() {
        LOG.debug("Start fetching metrics.");
        try {
            Optional<T> optionalLeaderGateway = this.retriever.getNow();
            if (optionalLeaderGateway.isPresent()) {
                RestfulGateway leaderGateway = (RestfulGateway)optionalLeaderGateway.get();
                CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestMultipleJobDetails(this.timeout);
                jobDetailsFuture.whenCompleteAsync((jobDetails, throwable) -> {
                    if (throwable != null) {
                        LOG.debug("Fetching of JobDetails failed.", throwable);
                    } else {
                        ArrayList<String> toRetain = new ArrayList<String>(jobDetails.getJobs().size());
                        for (JobDetails job : jobDetails.getJobs()) {
                            toRetain.add(job.getJobId().toString());
                        }
                        this.metrics.retainJobs(toRetain);
                        this.metrics.updateCurrentExecutionAttempts(jobDetails.getJobs());
                    }
                }, this.executor);
                ArrayList<CompletableFuture<Void>> waitingMetricsFutures = new ArrayList<CompletableFuture<Void>>();
                CompletableFuture<Void> jmMetricsFuture = this.queryJmMetricsFuture(leaderGateway);
                waitingMetricsFutures.add(jmMetricsFuture);
                jmMetricsFuture.whenCompleteAsync((ignore, throwable) -> {
                    if (throwable != null) {
                        LOG.debug("Failed to fetch the leader's metrics.", throwable);
                    }
                }, this.executor);
                CompletableFuture<Void> tmMetricsFuture = this.queryTmMetricsFuture(leaderGateway);
                waitingMetricsFutures.add(tmMetricsFuture);
                tmMetricsFuture.whenCompleteAsync((ignore, throwable) -> {
                    if (throwable != null) {
                        LOG.debug("Failed to fetch the TaskManager's metrics.", throwable);
                    }
                }, this.executor);
                return FutureUtils.waitForAll(waitingMetricsFutures);
            }
        }
        catch (Exception e) {
            LOG.debug("Exception while fetching metrics.", (Throwable)e);
            return FutureUtils.completedExceptionally(e);
        }
        return FutureUtils.completedVoidFuture();
    }

    private CompletableFuture<Void> queryJmMetricsFuture(T leaderGateway) {
        CompletableFuture<Collection<String>> queryServiceAddressesFuture = leaderGateway.requestMetricQueryServiceAddresses(this.timeout);
        return queryServiceAddressesFuture.thenComposeAsync(queryServiceAddresses -> {
            ArrayList<CompletableFuture<Void>> queryMetricFutures = new ArrayList<CompletableFuture<Void>>();
            for (String queryServiceAddress : queryServiceAddresses) {
                queryMetricFutures.add(this.retrieveAndQueryMetrics(queryServiceAddress));
            }
            return FutureUtils.waitForAll(queryMetricFutures);
        }, this.executor);
    }

    private CompletableFuture<Void> queryTmMetricsFuture(T leaderGateway) {
        CompletableFuture<Collection<Tuple2<ResourceID, String>>> taskManagerQueryServiceGatewaysFuture = leaderGateway.requestTaskManagerMetricQueryServiceAddresses(this.timeout);
        return taskManagerQueryServiceGatewaysFuture.thenComposeAsync(queryServiceGateways -> {
            ArrayList queryMetricFutures = new ArrayList();
            List<String> taskManagersToRetain = queryServiceGateways.stream().map(tuple -> {
                queryMetricFutures.add(this.queryServiceRetriever.retrieveService((String)tuple.f1).thenComposeAsync(this::queryMetrics, this.executor));
                return ((ResourceID)tuple.f0).getResourceIdString();
            }).collect(Collectors.toList());
            this.metrics.retainTaskManagers(taskManagersToRetain);
            return FutureUtils.waitForAll(queryMetricFutures);
        }, this.executor);
    }

    private CompletableFuture<Void> retrieveAndQueryMetrics(String queryServiceAddress) {
        LOG.debug("Retrieve metric query service gateway for {}", (Object)queryServiceAddress);
        CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = this.queryServiceRetriever.retrieveService(queryServiceAddress);
        return queryServiceGatewayFuture.thenComposeAsync(this::queryMetrics, this.executor);
    }

    private CompletableFuture<Void> queryMetrics(MetricQueryServiceGateway queryServiceGateway) {
        LOG.debug("Query metrics for {}.", (Object)queryServiceGateway.getAddress());
        return queryServiceGateway.queryMetrics(this.timeout).thenComposeAsync(result -> {
            this.metrics.addAll(this.deserializer.deserialize((MetricDumpSerialization.MetricSerializationResult)result));
            return FutureUtils.completedVoidFuture();
        }, this.executor);
    }

    @Nonnull
    public static <T extends RestfulGateway> MetricFetcherImpl<T> fromConfiguration(Configuration configuration, MetricQueryServiceRetriever metricQueryServiceGatewayRetriever, GatewayRetriever<T> dispatcherGatewayRetriever, ExecutorService executor) {
        Time timeout = Time.fromDuration(configuration.get(WebOptions.TIMEOUT));
        long updateInterval = configuration.get(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL).toMillis();
        return new MetricFetcherImpl<T>(dispatcherGatewayRetriever, metricQueryServiceGatewayRetriever, executor, timeout, updateInterval);
    }
}

