/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.metrics;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.ViewUpdater;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricRegistryImpl
implements MetricRegistry {
    /** Logger shared by the registry and its nested {@code ReporterTask}. */
    static final Logger LOG = LoggerFactory.getLogger(MetricRegistryImpl.class);
    /** Monitor guarding shutdown state, query service start-up and metric (un)registration. */
    private final Object lock = new Object();
    /** Reporters that were successfully instantiated in the constructor; cleared on shutdown. */
    private final List<MetricReporter> reporters;
    /** Single-threaded scheduler that runs periodic reporter tasks and is handed to the {@link ViewUpdater}. */
    private final ScheduledExecutorService executor;
    /** Scope formats taken from the registry configuration. */
    private final ScopeFormats scopeFormats;
    /** Delimiter taken from the registry configuration; fallback when a reporter has no valid delimiter of its own. */
    private final char globalDelimiter;
    /** Per-reporter delimiters, appended in the constructor together with the corresponding reporter. */
    private final List<Character> delimiters;
    /** Future returned by {@link #shutdown()}; completed once all shutdown steps have finished. */
    private final CompletableFuture<Void> terminationFuture;
    /** Query service message size limit from the configuration, passed on when the query service is started. */
    private final long maximumFramesize;
    /** Query service actor; {@code null} until {@code startQueryService} succeeds. */
    @Nullable
    private ActorRef queryService;
    /** Akka URL of the query service; {@code null} until {@code startQueryService} succeeds. */
    @Nullable
    private String metricQueryServicePath;
    /** Created lazily when the first {@link View} metric is registered. */
    private ViewUpdater viewUpdater;
    /** Set to {@code true} by {@link #shutdown()}; accessed under {@link #lock} there and in {@link #isShutdown()}. */
    private boolean isShutdown;

    /**
     * Creates a new registry and instantiates every reporter listed in the configuration.
     *
     * <p>For each reporter configuration the {@code class} option names the reporter
     * implementation, the optional {@code interval} option (for example {@code "10 SECONDS"})
     * controls how often a {@link Scheduled} reporter is triggered, and the optional
     * {@code scope.delimiter} option overrides the global delimiter for that reporter.
     * A reporter that cannot be set up is logged and skipped; it does not fail the registry.
     *
     * @param config registry configuration providing scope formats, the global delimiter, the
     *     query service message size limit and the reporter configurations
     */
    public MetricRegistryImpl(MetricRegistryConfiguration config) {
        this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
        this.scopeFormats = config.getScopeFormats();
        this.globalDelimiter = config.getDelimiter();
        this.delimiters = new ArrayList<>(10);
        this.terminationFuture = new CompletableFuture<>();
        this.isShutdown = false;
        this.reporters = new ArrayList<>(4);
        List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
        this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
        this.queryService = null;
        this.metricQueryServicePath = null;

        if (reporterConfigurations.isEmpty()) {
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
            return;
        }

        for (Tuple2<String, Configuration> reporterConfiguration : reporterConfigurations) {
            String namedReporter = reporterConfiguration.f0;
            Configuration reporterConfig = reporterConfiguration.f1;
            String className = reporterConfig.getString("class", null);
            if (className == null) {
                LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
                continue;
            }
            try {
                String configuredPeriod = reporterConfig.getString("interval", null);
                TimeUnit timeunit = TimeUnit.SECONDS;
                long period = 10L;
                if (configuredPeriod != null) {
                    try {
                        String[] interval = configuredPeriod.split(" ");
                        // Parse both parts before assigning either one: a half-valid value such
                        // as "30" or "500 MILLIS" must fall back to the complete default
                        // interval instead of pairing the parsed period with the default unit.
                        long parsedPeriod = Long.parseLong(interval[0]);
                        TimeUnit parsedUnit = TimeUnit.valueOf(interval[1]);
                        period = parsedPeriod;
                        timeunit = parsedUnit;
                    }
                    catch (Exception e) {
                        LOG.error("Cannot parse report interval from config: " + configuredPeriod + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. Using default reporting interval.", e);
                    }
                }

                Class<?> reporterClass = Class.forName(className);
                MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
                MetricConfig metricConfig = new MetricConfig();
                reporterConfig.addAllToProperties(metricConfig);
                LOG.info("Configuring {} with {}.", namedReporter, metricConfig);
                reporterInstance.open(metricConfig);

                if (reporterInstance instanceof Scheduled) {
                    LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
                    this.executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
                } else {
                    LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
                }

                String delimiterForReporter = reporterConfig.getString("scope.delimiter", String.valueOf(this.globalDelimiter));
                if (delimiterForReporter.length() != 1) {
                    LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, this.globalDelimiter);
                    delimiterForReporter = String.valueOf(this.globalDelimiter);
                }

                // Append reporter and delimiter back-to-back so both lists stay index-aligned;
                // getDelimiter(int) looks the delimiter up by reporter index.
                this.reporters.add(reporterInstance);
                this.delimiters.add(delimiterForReporter.charAt(0));
            }
            catch (Throwable t) {
                // The trailing Throwable is passed as the last argument so the logger can
                // record it alongside the formatted message.
                LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
            }
        }
    }

    /**
     * Starts the metric query service actor and remembers its Akka URL.
     *
     * <p>A failure to start the service is logged as a warning and not propagated. The call is
     * rejected by the {@code Preconditions.checkState} guard when the registry is already shut
     * down.
     *
     * @param actorSystem actor system in which the query service is started
     * @param resourceID resource ID handed to the query service on start-up
     */
    public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
        synchronized (lock) {
            Preconditions.checkState(!isShutdown(), (Object) "The metric registry has already been shut down.");

            try {
                queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID, maximumFramesize);
                metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
            } catch (Exception startFailure) {
                LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", startFailure);
            }
        }
    }

    /**
     * Returns the Akka URL of the metric query service.
     *
     * @return the query service path, or {@code null} if the query service has not been started
     */
    @Override
    @Nullable
    public String getMetricQueryServicePath() {
        return metricQueryServicePath;
    }

    /**
     * Returns the global delimiter taken from the registry configuration.
     *
     * @return the global delimiter character
     */
    @Override
    public char getDelimiter() {
        return globalDelimiter;
    }

    /**
     * Returns the delimiter configured for the reporter at the given index.
     *
     * <p>An index outside the range of known reporters is not an error: a warning is logged and
     * the global delimiter is returned instead.
     *
     * @param reporterIndex index of the reporter whose delimiter is requested
     * @return the reporter-specific delimiter, or the global delimiter for an unknown index
     */
    @Override
    public char getDelimiter(int reporterIndex) {
        // Explicit bounds check instead of catching IndexOutOfBoundsException: an unknown
        // index is an expected case here, not an exceptional one.
        if (reporterIndex < 0 || reporterIndex >= delimiters.size()) {
            LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex);
            return globalDelimiter;
        }
        return delimiters.get(reporterIndex);
    }

    /**
     * Returns how many reporters are currently held by this registry.
     *
     * @return the size of the reporter list
     */
    @Override
    public int getNumberReporters() {
        return reporters.size();
    }

    /**
     * Exposes the internal reporter list for tests.
     *
     * @return the live (not copied) list of reporters
     */
    @VisibleForTesting
    public List<MetricReporter> getReporters() {
        return reporters;
    }

    /**
     * Tells whether {@link #shutdown()} has already been called on this registry.
     *
     * @return {@code true} once the registry has been shut down
     */
    public boolean isShutdown() {
        synchronized (lock) {
            return isShutdown;
        }
    }

    /**
     * Shuts down this registry and its reporters.
     *
     * <p>The first call stops the query service (if one was started), closes every reporter,
     * and shuts down the executor, each with a grace period of one second. Later calls return
     * the same future without repeating any work.
     *
     * @return future that completes when all shutdown steps have finished; it completes
     *     exceptionally if any step failed, including a reporter that threw from {@code close()}
     */
    public CompletableFuture<Void> shutdown() {
        synchronized (lock) {
            if (isShutdown) {
                return terminationFuture;
            }
            isShutdown = true;

            final List<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
            final Time gracePeriod = Time.seconds(1L);

            if (queryService != null) {
                final CompletableFuture<Void> queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown(gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS, queryService);
                terminationFutures.add(queryServiceTerminationFuture);
            }

            // Close every reporter even if one of them fails; the first failure is kept and
            // later ones are attached to it as suppressed exceptions.
            Throwable throwable = null;
            for (MetricReporter reporter : reporters) {
                try {
                    reporter.close();
                }
                catch (Throwable t) {
                    throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
                }
            }
            reporters.clear();

            if (throwable != null) {
                terminationFutures.add(FutureUtils.<Void>completedExceptionally(new FlinkException("Could not shut down the metric reporters properly.", throwable)));
            }

            final CompletableFuture<Void> executorShutdownFuture = ExecutorUtils.nonBlockingShutdown(gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
            terminationFutures.add(executorShutdownFuture);

            // Forward the combined outcome to the future handed out to callers.
            FutureUtils.completeAll(terminationFutures).whenComplete((ignored, error) -> {
                if (error != null) {
                    terminationFuture.completeExceptionally(error);
                } else {
                    terminationFuture.complete(null);
                }
            });
            return terminationFuture;
        }
    }

    /**
     * Returns the scope formats taken from the registry configuration.
     *
     * @return the configured scope formats
     */
    @Override
    public ScopeFormats getScopeFormats() {
        return scopeFormats;
    }

    /**
     * Announces a newly added metric to every reporter, to the query service (if started) and,
     * for {@link View} metrics, to the view updater.
     *
     * <p>Each notification is isolated: an exception from one recipient is logged and the
     * remaining recipients are still notified. After shutdown the call only logs a warning.
     *
     * @param metric the metric that was added
     * @param metricName the name of the metric
     * @param group the group that contains the metric
     */
    @Override
    public void register(Metric metric, String metricName, AbstractMetricGroup group) {
        synchronized (lock) {
            if (isShutdown()) {
                LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
                return;
            }

            // The position in the list doubles as the reporter index of the front group.
            final int reporterCount = reporters.size();
            for (int index = 0; index < reporterCount; index++) {
                final MetricReporter candidate = reporters.get(index);
                try {
                    if (candidate != null) {
                        FrontMetricGroup<AbstractMetricGroup> frontGroup = new FrontMetricGroup<AbstractMetricGroup>(index, group);
                        candidate.notifyOfAddedMetric(metric, metricName, frontGroup);
                    }
                } catch (Exception reporterFailure) {
                    LOG.warn("Error while registering metric.", reporterFailure);
                }
            }

            try {
                if (queryService != null) {
                    MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
                }
            } catch (Exception queryServiceFailure) {
                LOG.warn("Error while registering metric.", queryServiceFailure);
            }

            try {
                if (metric instanceof View) {
                    // The updater is created on demand for the first view metric.
                    if (viewUpdater == null) {
                        viewUpdater = new ViewUpdater(executor);
                    }
                    viewUpdater.notifyOfAddedView((View) metric);
                }
            } catch (Exception viewFailure) {
                LOG.warn("Error while registering metric.", viewFailure);
            }
        }
    }

    /**
     * Announces a removed metric to every reporter, to the query service (if started) and, for
     * {@link View} metrics, to the view updater (if one was created).
     *
     * <p>Each notification is isolated: an exception from one recipient is logged and the
     * remaining recipients are still notified. After shutdown the call only logs a warning.
     *
     * @param metric the metric that was removed
     * @param metricName the name of the metric
     * @param group the group that contained the metric
     */
    @Override
    public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
        synchronized (lock) {
            if (isShutdown()) {
                LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
                return;
            }

            // The position in the list doubles as the reporter index of the front group.
            for (int i = 0; i < reporters.size(); ++i) {
                try {
                    MetricReporter reporter = reporters.get(i);
                    if (reporter != null) {
                        FrontMetricGroup<AbstractMetricGroup> front = new FrontMetricGroup<AbstractMetricGroup>(i, group);
                        reporter.notifyOfRemovedMetric(metric, metricName, front);
                    }
                }
                catch (Exception e) {
                    // Message corrected: this is the unregister path, not the register path.
                    LOG.warn("Error while unregistering metric.", e);
                }
            }

            try {
                if (queryService != null) {
                    MetricQueryService.notifyOfRemovedMetric(queryService, metric);
                }
            }
            catch (Exception e) {
                LOG.warn("Error while unregistering metric.", e);
            }

            try {
                if (metric instanceof View && viewUpdater != null) {
                    viewUpdater.notifyOfRemovedView((View) metric);
                }
            }
            catch (Exception e) {
                LOG.warn("Error while unregistering metric.", e);
            }
        }
    }

    /**
     * Exposes the query service actor for tests.
     *
     * @return the query service actor, or {@code null} if it has not been started
     */
    @Nullable
    @VisibleForTesting
    public ActorRef getQueryService() {
        return queryService;
    }

    private static final class ReporterTask
    extends TimerTask {
        private final Scheduled reporter;

        private ReporterTask(Scheduled reporter) {
            this.reporter = reporter;
        }

        @Override
        public void run() {
            try {
                this.reporter.report();
            }
            catch (Throwable t) {
                LOG.warn("Error while reporting metrics", t);
            }
        }
    }
}

