/*
 * Decompiled with CFR 0.152.
 */
package spectator-agent.spectator.atlas;

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.inject.Inject;
import javax.inject.Singleton;
import spectator-agent.spectator.api.AbstractRegistry;
import spectator-agent.spectator.api.Clock;
import spectator-agent.spectator.api.Counter;
import spectator-agent.spectator.api.DistributionSummary;
import spectator-agent.spectator.api.Gauge;
import spectator-agent.spectator.api.Id;
import spectator-agent.spectator.api.Measurement;
import spectator-agent.spectator.api.Meter;
import spectator-agent.spectator.api.Registry;
import spectator-agent.spectator.api.Timer;
import spectator-agent.spectator.api.Utils;
import spectator-agent.spectator.atlas.AtlasConfig;
import spectator-agent.spectator.atlas.AtlasCounter;
import spectator-agent.spectator.atlas.AtlasDistributionSummary;
import spectator-agent.spectator.atlas.AtlasGauge;
import spectator-agent.spectator.atlas.AtlasMaxGauge;
import spectator-agent.spectator.atlas.AtlasMeter;
import spectator-agent.spectator.atlas.AtlasTimer;
import spectator-agent.spectator.atlas.MeasurementConsumer;
import spectator-agent.spectator.atlas.OverridableClock;
import spectator-agent.spectator.atlas.Publisher;
import spectator-agent.spectator.atlas.RollupPolicy;
import spectator-agent.spectator.atlas.StepClock;
import spectator-agent.spectator.atlas.SubscriptionManager;
import spectator-agent.spectator.atlas.impl.Consolidator;
import spectator-agent.spectator.atlas.impl.DefaultPublisher;
import spectator-agent.spectator.atlas.impl.EvalPayload;
import spectator-agent.spectator.atlas.impl.Evaluator;
import spectator-agent.spectator.atlas.impl.EvaluatorConfig;
import spectator-agent.spectator.atlas.impl.PublishPayload;
import spectator-agent.spectator.atlas.shaded.spectator-atlas.json.databind.ObjectMapper;
import spectator-agent.spectator.impl.Scheduler;
import spectator-agent.spectator.ipc.http.HttpClient;

@Singleton
public final class AtlasRegistry
extends AbstractRegistry
implements AutoCloseable {
    /** Name of the debug-registry timer that records how long each publish task takes. */
    private static final String PUBLISH_TASK_TIMER = "spectator.atlas.publishTaskTime";

    /** Step clock over the caller-supplied clock using the LWC step; passed to gauges on creation. */
    private final Clock stepClock;

    /** Configuration supplied at construction time. */
    private final AtlasConfig config;

    /** Publishing interval taken from {@code config.step()}. */
    private final Duration step;

    /** {@link #step} expressed in milliseconds. */
    private final long stepMillis;

    /** {@code config.meterTTL()} in milliseconds; passed to every meter this registry creates. */
    private final long meterTTL;

    /** Publish URI from the config; within this class it is only used in log messages. */
    private final URI uri;

    /** Interval for the LWC task, taken from {@code config.lwcStep()}. */
    private final Duration lwcStep;

    /** {@link #lwcStep} expressed in milliseconds. */
    private final long lwcStepMillis;

    /** Delay between subscription refreshes. */
    private final Duration configRefreshFrequency;

    /** Eval URI from the config; within this class it is only used in log messages. */
    private final URI evalUri;

    /** Maximum number of measurements placed in a single publish batch. */
    private final int batchSize;

    /** Thread count handed to the scheduler. */
    private final int numThreads;

    /** Sorted copy of the common tags from the config. */
    private final Map<String, String> commonTags;

    /** Registry that receives this class's own diagnostic meters; falls back to this registry. */
    private final Registry debugRegistry;

    /** Policy applied to the collected measurements before they are split into batches. */
    private final RollupPolicy rollupPolicy;

    /** Destination for publish and eval payloads. */
    private final Publisher publisher;

    /** Scheduler running the periodic tasks; {@code null} while the registry is not started. */
    private Scheduler scheduler;

    /** Source of the subscriptions that are synced into the evaluator. */
    private final SubscriptionManager subManager;

    /** Receives every polled measurement and produces the payloads sent by the LWC task. */
    private final Evaluator evaluator;

    /** Whether meters are polled using a parallel stream. */
    private final boolean parallelPolling;

    /** Timestamp of the most recent poll; -1 until the first poll. Updated only in pollMeters. */
    private long lastPollTimestamp = -1L;

    /** Timestamp of the most recent successful flush; -1 until the first. Updated only in sendToAtlas. */
    private long lastFlushTimestamp = -1L;

    /** Per-id consolidators holding the values polled from the meters. */
    private final ConcurrentHashMap<Id, Consolidator> atlasMeasurements = new ConcurrentHashMap<>();

    /** Locks keyed by lock name, used to serialize publish tasks that share a name. */
    private final ConcurrentHashMap<String, Lock> publishTaskLocks = new ConcurrentHashMap<>();

    /**
     * Creates a registry for the given clock and configuration.
     *
     * <p>Delegates to the three-argument constructor with a {@code null} HTTP client,
     * in which case that constructor creates the client itself.
     *
     * @param clock  clock used as the basis for the registry's time source
     * @param config configuration for the registry
     */
    @Inject
    public AtlasRegistry(Clock clock, AtlasConfig config) {
        this(clock, config, null);
    }

    /**
     * Creates a registry, optionally with a caller-supplied HTTP client.
     *
     * <p>The supplied clock is wrapped in an {@link OverridableClock} before being passed
     * to the superclass. If {@code config.autoStart()} is true the registry is started
     * before the constructor returns.
     *
     * @param clock  clock used as the basis for the registry's time source
     * @param config configuration for the registry
     * @param client HTTP client to use, or {@code null} to create one bound to the debug registry
     * @throws IllegalArgumentException if the LWC step is larger than the step, or the step
     *         is not an exact multiple of the LWC step
     */
    AtlasRegistry(Clock clock, AtlasConfig config, HttpClient client) {
        super(new OverridableClock(clock), config);
        this.config = config;
        stepClock = new StepClock(clock, config.lwcStep().toMillis());

        step = config.step();
        stepMillis = step.toMillis();
        meterTTL = config.meterTTL().toMillis();
        uri = URI.create(config.uri());
        lwcStep = config.lwcStep();
        lwcStepMillis = lwcStep.toMillis();

        // The LWC interval has to fit evenly inside the publishing interval.
        if (lwcStepMillis > stepMillis) {
            throw new IllegalArgumentException("lwcStep cannot be larger than step (" + lwcStep + " > " + step + ")");
        }
        if (stepMillis % lwcStepMillis != 0L) {
            throw new IllegalArgumentException("step is not an even multiple of lwcStep (" + step + " % " + lwcStep + " != 0)");
        }

        configRefreshFrequency = config.configRefreshFrequency();
        evalUri = URI.create(config.evalUri());
        batchSize = config.batchSize();
        numThreads = config.numThreads();
        commonTags = new TreeMap<>(config.commonTags());

        // Diagnostic meters go to the configured debug registry, or to this registry if none is set.
        Registry configuredDebugRegistry = config.debugRegistry();
        if (configuredDebugRegistry != null) {
            debugRegistry = configuredDebugRegistry;
        } else {
            debugRegistry = this;
        }
        rollupPolicy = config.rollupPolicy();

        HttpClient http;
        if (client != null) {
            http = client;
        } else {
            http = HttpClient.create(debugRegistry);
        }

        // A publisher supplied by the config takes precedence over the default one.
        Publisher configuredPublisher = config.publisher();
        if (configuredPublisher == null) {
            publisher = new DefaultPublisher(config, http, debugRegistry);
        } else {
            publisher = configuredPublisher;
        }

        EvaluatorConfig evaluatorConfig = EvaluatorConfig.fromAtlasConfig(config);
        subManager = new SubscriptionManager(new ObjectMapper(), http, clock, config);
        evaluator = new Evaluator(evaluatorConfig);
        parallelPolling = evaluatorConfig.parallelMeasurementPolling();

        if (config.autoStart()) {
            start();
        }
    }

    /**
     * Starts the scheduler and registers the three periodic tasks: publishing to Atlas
     * every step, sending to LWC every LWC step, and refreshing subscriptions.
     *
     * <p>If the registry is already started, a warning is logged and nothing else happens.
     */
    public void start() {
        if (scheduler != null) {
            logger.warn("registry already started, ignoring duplicate request");
            return;
        }

        logger.info("common tags: {}", commonTags);
        publisher.init();

        Scheduler.Options publishOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, step)
            .withInitialDelay(Duration.ofMillis(config.initialPollingDelay(clock(), stepMillis)))
            .withStopOnFailure(false);
        scheduler = new Scheduler(debugRegistry, "spectator-reg-atlas", numThreads);
        scheduler.schedule(publishOptions, this::sendToAtlas);
        logger.info("started collecting metrics every {} reporting to {}", step, uri);

        Scheduler.Options lwcPublishOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, lwcStep)
            .withInitialDelay(Duration.ofMillis(config.initialPollingDelay(clock(), lwcStepMillis)))
            .withStopOnFailure(false);
        scheduler.schedule(lwcPublishOptions, this::sendToLWC);

        Scheduler.Options refreshOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_DELAY, configRefreshFrequency)
            .withStopOnFailure(false);
        scheduler.schedule(refreshOptions, this::fetchSubscriptions);
    }

    /**
     * Stops the scheduler, flushes the remaining data to Atlas and closes the publisher.
     *
     * <p>If the registry was never started, a warning is logged and nothing else happens.
     * Failures while flushing or while closing the publisher are logged and not rethrown.
     */
    public void stop() {
        if (scheduler == null) {
            logger.warn("registry stopped, but was never started");
            return;
        }

        scheduler.shutdown();
        scheduler = null;
        logger.info("stopped collecting metrics every {} reporting to {}", step, uri);

        try {
            // Pin the clock so both flushes below work from the same instant.
            OverridableClock overridableClock = (OverridableClock) clock();
            long now = clock().wallTime();
            overridableClock.setWallTime(now);
            logger.info("flushing data for previous interval to Atlas");
            sendToAtlas();

            // Move the clock to the next LWC step boundary and poll, then to the next
            // step boundary and flush, so the interval in progress is sent as well.
            logger.info("flushing data for final interval to Atlas");
            overridableClock.setWallTime(now / lwcStepMillis * lwcStepMillis + lwcStepMillis);
            pollMeters(overridableClock.wallTime());
            overridableClock.setWallTime(now / stepMillis * stepMillis + stepMillis);
            sendToAtlas();
        } catch (Exception e) {
            logger.warn("failed to flush data to Atlas", e);
        }

        try {
            publisher.close();
        } catch (Exception e) {
            // Closing is best effort, but keep the cause so the failure can be diagnosed.
            logger.debug("failed to cleanly shutdown publisher", e);
        }
    }

    /**
     * Closes the registry by delegating to {@link #stop()}.
     */
    @Override
    public void close() {
        stop();
    }

    /**
     * Returns the registry clock's current wall time rounded down to a multiple of {@code s}.
     *
     * @param s interval size, in the same unit as the clock's wall time
     * @return the largest multiple of {@code s} that is not after the current wall time
     */
    long lastCompletedTimestamp(long s) {
        long wallTime = clock().wallTime();
        // Dropping the remainder is the same as integer-dividing and multiplying back.
        return wallTime - wallTime % s;
    }

    /**
     * Looks up the publish-task timer on the debug registry, tagged with the given task id.
     *
     * @param id value for the timer's {@code id} tag
     * @return the timer for that task
     */
    private Timer publishTaskTimer(String id) {
        return debugRegistry.timer(PUBLISH_TASK_TIMER, "id", id);
    }

    /**
     * Runs and times a task, using the task id as the lock name as well.
     *
     * @param id   task id, used for both the timer tag and the lock name
     * @param task work to run
     */
    private void timePublishTask(String id, Runnable task) {
        timePublishTask(id, id, task);
    }

    /**
     * Runs a task while holding the lock registered under {@code lockName} and records
     * its duration on the publish-task timer tagged with {@code id}.
     *
     * <p>The lock is created on first use and is reentrant. It is always released, even
     * when the task throws.
     *
     * @param id       value for the timer's {@code id} tag
     * @param lockName key of the lock to hold while the task runs
     * @param task     work to run
     */
    private void timePublishTask(String id, String lockName, Runnable task) {
        Lock taskLock = publishTaskLocks.computeIfAbsent(lockName, name -> new ReentrantLock());
        taskLock.lock();
        try {
            Timer timer = publishTaskTimer(id);
            timer.recordRunnable(task);
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Publishes the measurements for the last completed step to Atlas, then removes
     * expired meters.
     *
     * <p>Nothing is published when publishing is disabled or when the last completed step
     * has already been flushed. Expired meters are removed in every case that completes
     * normally. The flush timestamp is only advanced after all publish futures have been
     * joined.
     */
    void sendToAtlas() {
        timePublishTask("sendToAtlas", () -> {
            if (!config.enabled()) {
                logger.debug("publishing is disabled, skipping collection");
            } else {
                long flushTime = lastCompletedTimestamp(stepMillis);
                if (flushTime <= lastFlushTimestamp) {
                    logger.debug("skipping duplicate flush attempt for time: {}", flushTime);
                } else {
                    pollMeters(flushTime);
                    logger.debug("sending to Atlas for time: {}", flushTime);
                    List<CompletableFuture<Void>> pending = new ArrayList<>();
                    for (RollupPolicy.Result batch : getBatches(flushTime)) {
                        PublishPayload payload = new PublishPayload(batch.commonTags(), batch.measurements());
                        pending.add(publisher.publish(payload));
                    }
                    // Wait for every batch before recording the step as flushed.
                    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
                    lastFlushTimestamp = flushTime;
                }
            }
            removeExpiredMeters();
        });
    }

    /**
     * Polls the meters for the last completed LWC step and, if LWC is enabled, evaluates
     * the subscriptions and publishes the resulting payload in batches.
     *
     * <p>Meters are polled even when LWC is disabled. Failures while evaluating or
     * publishing are logged as warnings and not rethrown.
     */
    void sendToLWC() {
        timePublishTask("sendToLWC", () -> {
            long t = lastCompletedTimestamp(lwcStepMillis);
            pollMeters(t);
            if (!config.lwcEnabled()) {
                logger.debug("lwc is disabled, skipping subscriptions");
                return;
            }
            logger.debug("sending to LWC for time: {}", t);
            try {
                EvalPayload payload = evaluator.eval(t, parallelPolling);
                if (!payload.getMetrics().isEmpty()) {
                    // Typed list instead of a raw one; the wildcard accepts whatever
                    // future type the publisher returns.
                    List<CompletableFuture<?>> futures = new ArrayList<>();
                    payload.consumeBatches(batchSize, p -> futures.add(publisher.publish((EvalPayload) p)));
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
                }
            } catch (Exception e) {
                logger.warn("failed to send metrics for subscriptions (uri={})", evalUri, e);
            }
        });
    }

    /**
     * Collects measurements from every meter for time {@code t}, unless a poll for that
     * time or a later one has already happened.
     *
     * <p>Each measurement is written to the consolidator for its id, which is created on
     * first use, and is also passed to the evaluator. The work runs under the
     * {@code atlasMeasurements} lock.
     *
     * @param t timestamp to collect measurements for
     */
    void pollMeters(long t) {
        timePublishTask("pollMeters", "atlasMeasurements", () -> {
            if (t <= lastPollTimestamp) {
                return;
            }
            MeasurementConsumer sink = (meterId, ts, v) -> {
                Consolidator target = Utils.computeIfAbsent(atlasMeasurements, meterId, key -> {
                    // Number of LWC steps in one publishing step.
                    int multiple = (int) (stepMillis / lwcStepMillis);
                    return Consolidator.create(key, stepMillis, multiple);
                });
                target.update(ts, v);
                evaluator.update(meterId, ts, v);
            };
            logger.debug("collecting measurements for time: {}", t);
            Timer pollTimer = publishTaskTimer("pollMeasurements");
            pollTimer.recordRunnable(() ->
                StreamSupport.stream(spliterator(), parallelPolling)
                    .forEach(meter -> ((AtlasMeter) meter).measure(t, sink)));
            lastPollTimestamp = t;
        });
    }

    /**
     * Removes every meter that reports itself as expired at the current wall time, logs
     * how many were removed, and then cleans up cached state.
     */
    @Override
    public void removeExpiredMeters() {
        long now = clock().wallTime();
        int seen = 0;
        int removed = 0;
        for (Iterator<Meter> meters = iterator(); meters.hasNext(); ) {
            seen++;
            AtlasMeter meter = (AtlasMeter) meters.next();
            if (meter.hasExpired(now)) {
                removed++;
                meters.remove();
            }
        }
        logger.debug("removed {} expired meters out of {} total", removed, seen);
        cleanupCachedState();
    }

    /**
     * Refreshes the subscriptions and syncs them into the evaluator. Does nothing beyond
     * a debug log message when LWC is disabled.
     */
    private void fetchSubscriptions() {
        if (!config.lwcEnabled()) {
            logger.debug("lwc is disabled, skipping subscription config refresh");
            return;
        }
        subManager.refresh();
        evaluator.sync(subManager.subscriptions());
    }

    /**
     * Builds the publish batches for time {@code t} from the consolidated measurements.
     *
     * <p>Under the {@code atlasMeasurements} lock, each consolidator is read for time
     * {@code t}. Non-NaN values become measurements, and consolidators that report
     * themselves empty are dropped from the map. The rollup policy is applied to the
     * measurements, and each rollup result is split into batches of at most
     * {@code batchSize} measurements that keep the result's common tags. The registry
     * size and the rollup result size are recorded on the debug registry.
     *
     * @param t timestamp to read the consolidators at
     * @return the batches to publish; empty if there is nothing to send
     */
    List<RollupPolicy.Result> getBatches(long t) {
        int n = atlasMeasurements.size();
        List<RollupPolicy.Result> batches = new ArrayList<>(n / batchSize + 1);
        timePublishTask("getBatches", "atlasMeasurements", () -> {
            debugRegistry.distributionSummary("spectator.registrySize").record(n);
            List<Measurement> input = new ArrayList<>(n);
            Iterator<Map.Entry<Id, Consolidator>> it = atlasMeasurements.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Id, Consolidator> entry = it.next();
                Consolidator consolidator = entry.getValue();
                // NOTE(review): presumably advances the consolidator to t without adding
                // a value - confirm against Consolidator.update.
                consolidator.update(t, Double.NaN);
                double v = consolidator.value(t);
                if (!Double.isNaN(v)) {
                    input.add(new Measurement(entry.getKey(), t, v));
                }
                if (consolidator.isEmpty()) {
                    it.remove();
                }
            }

            // Typed instead of a raw List, so the element type is RollupPolicy.Result
            // rather than Object in the stream and the loop below.
            List<RollupPolicy.Result> results = rollupPolicy.apply(input);
            int rollupSize = results.stream().mapToInt(r -> r.measurements().size()).sum();
            debugRegistry.distributionSummary("spectator.rollupResultSize").record(rollupSize);

            for (RollupPolicy.Result result : results) {
                List<Measurement> ms = result.measurements();
                for (int i = 0; i < ms.size(); i += batchSize) {
                    List<Measurement> batch = ms.subList(i, Math.min(ms.size(), i + batchSize));
                    batches.add(new RollupPolicy.Result(result.commonTags(), batch));
                }
            }
        });
        return batches;
    }

    /**
     * Polls the meters for the last completed step, removes expired meters, and returns
     * the measurements from all resulting batches as one stream.
     *
     * @return stream of the measurements for the last completed step
     */
    @Override
    public Stream<Measurement> measurements() {
        long stepBoundary = lastCompletedTimestamp(stepMillis);
        pollMeters(stepBoundary);
        removeExpiredMeters();
        List<RollupPolicy.Result> batches = getBatches(stepBoundary);
        return batches.stream()
            .map(RollupPolicy.Result::measurements)
            .flatMap(List::stream);
    }

    /**
     * Creates an Atlas counter for the id, using the registry clock, the meter TTL and
     * the LWC step size.
     */
    @Override
    protected Counter newCounter(Id id) {
        return new AtlasCounter(id, clock(), meterTTL, lwcStepMillis);
    }

    /**
     * Creates an Atlas distribution summary for the id, using the registry clock, the
     * meter TTL and the LWC step size.
     */
    @Override
    protected DistributionSummary newDistributionSummary(Id id) {
        return new AtlasDistributionSummary(id, clock(), meterTTL, lwcStepMillis);
    }

    /**
     * Creates an Atlas timer for the id, using the registry clock, the meter TTL and the
     * LWC step size.
     */
    @Override
    protected Timer newTimer(Id id) {
        return new AtlasTimer(id, clock(), meterTTL, lwcStepMillis);
    }

    /**
     * Creates an Atlas gauge for the id. Unlike the other meter factories in this class,
     * it is given the step clock rather than the registry clock, and no step size.
     */
    @Override
    protected Gauge newGauge(Id id) {
        return new AtlasGauge(id, stepClock, meterTTL);
    }

    /**
     * Creates an Atlas max gauge for the id, using the registry clock, the meter TTL and
     * the LWC step size.
     */
    @Override
    protected Gauge newMaxGauge(Id id) {
        return new AtlasMaxGauge(id, clock(), meterTTL, lwcStepMillis);
    }
}

