/*
 * Decompiled with CFR 0.152.
 */
package ddtrot.dd.trace.common.metrics;

import datadog.trace.api.Config;
import datadog.trace.api.Pair;
import datadog.trace.api.WellKnownTags;
import ddtrot.dd.communication.ddagent.DDAgentFeaturesDiscovery;
import ddtrot.dd.communication.ddagent.SharedCommunicationObjects;
import ddtrot.dd.trace.api.Functions;
import ddtrot.dd.trace.api.cache.DDCache;
import ddtrot.dd.trace.api.cache.DDCaches;
import ddtrot.dd.trace.bootstrap.instrumentation.api.UTF8BytesString;
import ddtrot.dd.trace.common.metrics.Aggregator;
import ddtrot.dd.trace.common.metrics.Batch;
import ddtrot.dd.trace.common.metrics.EventListener;
import ddtrot.dd.trace.common.metrics.InboxItem;
import ddtrot.dd.trace.common.metrics.MetricKey;
import ddtrot.dd.trace.common.metrics.MetricWriter;
import ddtrot.dd.trace.common.metrics.MetricsAggregator;
import ddtrot.dd.trace.common.metrics.OkHttpSink;
import ddtrot.dd.trace.common.metrics.SerializingMetricWriter;
import ddtrot.dd.trace.common.metrics.SignalItem;
import ddtrot.dd.trace.common.metrics.Sink;
import ddtrot.dd.trace.core.CoreSpan;
import ddtrot.dd.trace.core.DDTraceCoreInfo;
import ddtrot.dd.trace.core.monitor.HealthMetrics;
import ddtrot.dd.trace.util.AgentTaskScheduler;
import ddtrot.dd.trace.util.AgentThreadFactory;
import ddtrot.org.jctools.queues.MpscCompoundQueue;
import ddtrot.org.jctools.queues.SpmcArrayQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConflatingMetricsAggregator
implements MetricsAggregator,
EventListener {
    private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class);
    private static final Map<String, String> DEFAULT_HEADERS = Collections.singletonMap("Datadog-Meta-Tracer-Version", DDTraceCoreInfo.VERSION);
    private static final DDCache<String, UTF8BytesString> SERVICE_NAMES = DDCaches.newFixedSizeCache(32);
    private static final DDCache<CharSequence, UTF8BytesString> SPAN_KINDS = DDCaches.newFixedSizeCache(16);
    private static final DDCache<String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>> PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64);
    private static final Function<String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>> PEER_TAGS_CACHE_ADDER = key -> Pair.of(DDCaches.newFixedSizeCache(512), value -> UTF8BytesString.create(key + ":" + value));
    private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
    private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList("server", "client", "consumer", "producer")));
    private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList("client", "producer", "consumer")));
    private final Set<String> ignoredResources;
    private final Queue<Batch> batchPool;
    private final ConcurrentHashMap<MetricKey, Batch> pending;
    private final ConcurrentHashMap<MetricKey, MetricKey> keys;
    private final Thread thread;
    private final MpscCompoundQueue<InboxItem> inbox;
    private final Sink sink;
    private final Aggregator aggregator;
    private final long reportingInterval;
    private final TimeUnit reportingIntervalTimeUnit;
    private final DDAgentFeaturesDiscovery features;
    private final HealthMetrics healthMetrics;
    private volatile AgentTaskScheduler.Scheduled<?> cancellation;

    public ConflatingMetricsAggregator(Config config, SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetrics) {
        this(config.getWellKnownTags(), config.getMetricsIgnoredResources(), sharedCommunicationObjects.featuresDiscovery(config), healthMetrics, new OkHttpSink(sharedCommunicationObjects.agentHttpClient, sharedCommunicationObjects.agentUrl.toString(), "v0.6/stats", config.isTracerMetricsBufferingEnabled(), false, DEFAULT_HEADERS), config.getTracerMetricsMaxAggregates(), config.getTracerMetricsMaxPending());
    }

    ConflatingMetricsAggregator(WellKnownTags wellKnownTags, Set<String> ignoredResources, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, int maxAggregates, int queueSize) {
        this(wellKnownTags, ignoredResources, features, healthMetric, sink, maxAggregates, queueSize, 10L, TimeUnit.SECONDS);
    }

    ConflatingMetricsAggregator(WellKnownTags wellKnownTags, Set<String> ignoredResources, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, int maxAggregates, int queueSize, long reportingInterval, TimeUnit timeUnit) {
        this(ignoredResources, features, healthMetric, sink, new SerializingMetricWriter(wellKnownTags, sink), maxAggregates, queueSize, reportingInterval, timeUnit);
    }

    ConflatingMetricsAggregator(Set<String> ignoredResources, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, MetricWriter metricWriter, int maxAggregates, int queueSize, long reportingInterval, TimeUnit timeUnit) {
        this.ignoredResources = ignoredResources;
        this.inbox = new MpscCompoundQueue(queueSize);
        this.batchPool = new SpmcArrayQueue<Batch>(maxAggregates);
        this.pending = new ConcurrentHashMap(maxAggregates * 4 / 3);
        this.keys = new ConcurrentHashMap();
        this.features = features;
        this.healthMetrics = healthMetric;
        this.sink = sink;
        this.aggregator = new Aggregator(metricWriter, this.batchPool, this.inbox, this.pending, this.keys.keySet(), maxAggregates, reportingInterval, timeUnit);
        this.thread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.METRICS_AGGREGATOR, this.aggregator);
        this.reportingInterval = reportingInterval;
        this.reportingIntervalTimeUnit = timeUnit;
    }

    @Override
    public void start() {
        this.sink.register(this);
        this.thread.start();
        this.cancellation = AgentTaskScheduler.get().scheduleAtFixedRate(new ReportTask(), this, this.reportingInterval, this.reportingInterval, this.reportingIntervalTimeUnit);
        log.debug("started metrics aggregator");
    }

    private boolean isMetricsEnabled() {
        if (this.features.getMetricsEndpoint() == null) {
            this.features.discoverIfOutdated();
        }
        return this.features.supportsMetrics();
    }

    @Override
    public boolean report() {
        boolean published;
        int attempts = 0;
        while (!(published = this.inbox.offer(SignalItem.ReportSignal.REPORT)) && ++attempts < 10) {
        }
        if (!published) {
            log.debug("Skipped metrics reporting because the queue is full");
        }
        return published;
    }

    @Override
    public Future<Boolean> forceReport() {
        if (!this.isMetricsEnabled()) {
            return CompletableFuture.completedFuture(false);
        }
        while (this.cancellation == null || this.cancellation.get() != null && !this.thread.isAlive()) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                return CompletableFuture.completedFuture(false);
            }
        }
        SignalItem.ReportSignal reportSignal = new SignalItem.ReportSignal();
        boolean published = false;
        while (this.thread.isAlive() && !published) {
            published = this.inbox.offer(reportSignal);
            if (published) continue;
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                log.debug("Failed to ask for report");
                break;
            }
        }
        if (published) {
            return reportSignal.future;
        }
        return CompletableFuture.completedFuture(false);
    }

    @Override
    public boolean publish(List<? extends CoreSpan<?>> trace) {
        boolean forceKeep = false;
        int counted = 0;
        if (this.features.supportsMetrics()) {
            for (CoreSpan<?> span : trace) {
                boolean isTopLevel = span.isTopLevel();
                CharSequence spanKind = span.unsafeGetTag("span.kind", "");
                if (!this.shouldComputeMetric(span, spanKind)) continue;
                CharSequence resourceName = span.getResourceName();
                if (resourceName != null && this.ignoredResources.contains(resourceName.toString())) {
                    forceKeep = false;
                    break;
                }
                ++counted;
                forceKeep |= this.publish(span, isTopLevel, spanKind);
            }
            this.healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep);
        }
        return forceKeep;
    }

    private boolean shouldComputeMetric(CoreSpan<?> span, @Nonnull CharSequence spanKind) {
        return (span.isMeasured() || span.isTopLevel() || this.spanKindEligible(spanKind)) && span.getLongRunningVersion() <= 0 && span.getDurationNano() > 0L;
    }

    private boolean spanKindEligible(@Nonnull CharSequence spanKind) {
        return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
    }

    private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanKind) {
        MetricKey newKey = new MetricKey(span.getResourceName(), SERVICE_NAMES.computeIfAbsent(span.getServiceName(), Functions.UTF8_ENCODE), span.getOperationName(), span.getType(), span.getHttpStatusCode(), ConflatingMetricsAggregator.isSynthetic(span), span.getParentId() == 0L, SPAN_KINDS.computeIfAbsent(spanKind, UTF8BytesString::create), this.getPeerTags(span, spanKind.toString()));
        MetricKey key = this.keys.putIfAbsent(newKey, newKey);
        if (null == key) {
            key = newKey;
        }
        long tag = (span.getError() > 0 ? Long.MIN_VALUE : 0L) | (isTopLevel ? 0x4000000000000000L : 0L);
        long durationNanos = span.getDurationNano();
        Batch batch = this.pending.get(key);
        if (null != batch) {
            if (batch.add(tag, durationNanos)) {
                return false;
            }
            key = batch.getKey();
        }
        batch = this.newBatch(key);
        batch.add(tag, durationNanos);
        this.pending.put(key, batch);
        this.inbox.offer(batch);
        return span.getError() > 0;
    }

    private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
        Object baseService;
        if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
            Set<String> eligiblePeerTags = this.features.peerTags();
            ArrayList<UTF8BytesString> peerTags = new ArrayList<UTF8BytesString>(eligiblePeerTags.size());
            for (String peerTag : eligiblePeerTags) {
                Object value = span.unsafeGetTag(peerTag);
                if (value == null) continue;
                Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
                peerTags.add(cacheAndCreator.getLeft().computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
            }
            return peerTags;
        }
        if ("internal".equals(spanKind) && (baseService = span.unsafeGetTag("_dd.base_service")) != null) {
            Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent("_dd.base_service", PEER_TAGS_CACHE_ADDER);
            return Collections.singletonList(cacheAndCreator.getLeft().computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
        }
        return Collections.emptyList();
    }

    private static boolean isSynthetic(CoreSpan<?> span) {
        return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
    }

    private Batch newBatch(MetricKey key) {
        Batch batch = this.batchPool.poll();
        if (null == batch) {
            return new Batch(key);
        }
        return batch.reset(key);
    }

    public void stop() {
        if (null != this.cancellation) {
            this.cancellation.cancel();
        }
        this.inbox.offer(SignalItem.StopSignal.STOP);
    }

    @Override
    public void close() {
        this.stop();
        try {
            this.thread.join(800L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public void onEvent(EventListener.EventType eventType, String message) {
        this.healthMetrics.onClientStatPayloadSent();
        switch (eventType) {
            case DOWNGRADED: {
                log.debug("Agent downgrade was detected");
                this.disable();
                this.healthMetrics.onClientStatDowngraded();
                break;
            }
            case BAD_PAYLOAD: {
                log.debug("bad metrics payload sent to trace agent: {}", (Object)message);
                this.healthMetrics.onClientStatErrorReceived();
                break;
            }
            case ERROR: {
                log.debug("trace agent errored receiving metrics payload: {}", (Object)message);
                this.healthMetrics.onClientStatErrorReceived();
                break;
            }
        }
    }

    private void disable() {
        this.features.discover();
        if (!this.features.supportsMetrics()) {
            log.debug("Disabling metric reporting because an agent downgrade was detected");
            this.pending.clear();
            this.batchPool.clear();
            this.inbox.clear();
            this.aggregator.clearAggregates();
        }
    }

    private static final class ReportTask
    implements AgentTaskScheduler.Task<ConflatingMetricsAggregator> {
        private ReportTask() {
        }

        @Override
        public void run(ConflatingMetricsAggregator target) {
            target.report();
        }
    }
}

