/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.cnc.tracing;

import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.cnc.events.tracing.OverThresholdRequestsRecordedEvent;
import com.couchbase.client.core.cnc.tracing.ThresholdRequestSpan;
import com.couchbase.client.core.deps.org.jctools.queues.MpscArrayQueue;
import com.couchbase.client.core.env.ThresholdRequestTracerConfig;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.HostAndPort;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class ThresholdRequestTracer
implements RequestTracer {
    private static final AtomicInteger REQUEST_TRACER_ID = new AtomicInteger();
    private static final String KEY_TOTAL_MICROS = "total_duration_us";
    private static final String KEY_DISPATCH_MICROS = "last_dispatch_duration_us";
    private static final String KEY_TOTAL_DISPATCH_MICROS = "total_dispatch_duration_us";
    private static final String KEY_ENCODE_MICROS = "encode_duration_us";
    private static final String KEY_SERVER_MICROS = "last_server_duration_us";
    private static final String KEY_TOTAL_SERVER_MICROS = "total_server_duration_us";
    private static final String KEY_OPERATION_ID = "operation_id";
    private static final String KEY_OPERATION_NAME = "operation_name";
    private static final String KEY_LAST_LOCAL_SOCKET = "last_local_socket";
    private static final String KEY_LAST_REMOTE_SOCKET = "last_remote_socket";
    private static final String KEY_LAST_LOCAL_ID = "last_local_id";
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Queue<Request<?>> overThresholdQueue;
    private final EventBus eventBus;
    private final Thread worker;
    private final long kvThreshold;
    private final long queryThreshold;
    private final long viewThreshold;
    private final long searchThreshold;
    private final long analyticsThreshold;
    private final long emitIntervalNanos;
    private final int sampleSize;

    public static Builder builder(EventBus eventBus) {
        return new Builder(eventBus);
    }

    public static ThresholdRequestTracer create(EventBus eventBus) {
        return ThresholdRequestTracer.create(eventBus, ThresholdRequestTracerConfig.create());
    }

    public static ThresholdRequestTracer create(EventBus eventBus, ThresholdRequestTracerConfig config) {
        return new ThresholdRequestTracer(eventBus, config);
    }

    private ThresholdRequestTracer(EventBus eventBus, ThresholdRequestTracerConfig config) {
        this.eventBus = eventBus;
        this.overThresholdQueue = new MpscArrayQueue(config.queueLength());
        this.kvThreshold = config.kvThreshold().toNanos();
        this.analyticsThreshold = config.analyticsThreshold().toNanos();
        this.searchThreshold = config.searchThreshold().toNanos();
        this.viewThreshold = config.viewThreshold().toNanos();
        this.queryThreshold = config.queryThreshold().toNanos();
        this.sampleSize = config.sampleSize();
        this.emitIntervalNanos = config.emitInterval().toNanos();
        this.worker = new Thread(new Worker());
        this.worker.setDaemon(true);
    }

    @Override
    public RequestSpan requestSpan(String name, RequestSpan parent) {
        return new ThresholdRequestSpan(this);
    }

    void finish(ThresholdRequestSpan span) {
        Request<? extends Response> request;
        if (span.requestContext() == null || !this.isOverThreshold(request = span.requestContext().request()) || !this.overThresholdQueue.offer(request)) {
            // empty if block
        }
    }

    private boolean isOverThreshold(Request<?> request) {
        long tookNanos = request.context().logicalRequestLatency();
        ServiceType serviceType = request.serviceType();
        if (serviceType == ServiceType.KV && tookNanos >= this.kvThreshold) {
            return true;
        }
        if (serviceType == ServiceType.QUERY && tookNanos >= this.queryThreshold) {
            return true;
        }
        if (serviceType == ServiceType.ANALYTICS && tookNanos >= this.analyticsThreshold) {
            return true;
        }
        if (serviceType == ServiceType.SEARCH && tookNanos >= this.searchThreshold) {
            return true;
        }
        return serviceType == ServiceType.VIEWS && tookNanos >= this.viewThreshold;
    }

    @Override
    public Mono<Void> start() {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(false, true)) {
                this.worker.start();
            }
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> stop(Duration timeout) {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(true, false)) {
                this.worker.interrupt();
            }
            return Mono.empty();
        });
    }

    public static class Builder {
        private final EventBus eventBus;
        private final ThresholdRequestTracerConfig.Builder config = ThresholdRequestTracerConfig.builder();

        Builder(EventBus eventBus) {
            this.eventBus = eventBus;
        }

        public ThresholdRequestTracer build() {
            return new ThresholdRequestTracer(this.eventBus, this.config.build());
        }

        public Builder emitInterval(Duration emitInterval) {
            this.config.emitInterval(emitInterval);
            return this;
        }

        public Builder queueLength(int queueLength) {
            this.config.queueLength(queueLength);
            return this;
        }

        public Builder kvThreshold(Duration kvThreshold) {
            this.config.kvThreshold(kvThreshold);
            return this;
        }

        public Builder queryThreshold(Duration queryThreshold) {
            this.config.queryThreshold(queryThreshold);
            return this;
        }

        public Builder viewThreshold(Duration viewThreshold) {
            this.config.viewThreshold(viewThreshold);
            return this;
        }

        public Builder searchThreshold(Duration searchThreshold) {
            this.config.searchThreshold(searchThreshold);
            return this;
        }

        public Builder analyticsThreshold(Duration analyticsThreshold) {
            this.config.analyticsThreshold(analyticsThreshold);
            return this;
        }

        public Builder sampleSize(int sampleSize) {
            this.config.sampleSize(sampleSize);
            return this;
        }
    }

    private class Worker
    implements Runnable {
        private final long workerSleepMs = Long.parseLong(System.getProperty("com.couchbase.thresholdRequestTracerSleep", "100"));
        private final boolean newOutputFormat = Boolean.parseBoolean(System.getProperty("com.couchbase.thresholdRequestTracerNewOutputFormat", "false"));
        private final Comparator<Request<?>> THRESHOLD_COMPARATOR = Comparator.comparingLong(o -> o.context().logicalRequestLatency());
        private final Queue<Request<?>> kvThresholds = new PriorityQueue(this.THRESHOLD_COMPARATOR);
        private final Queue<Request<?>> n1qlThresholds = new PriorityQueue(this.THRESHOLD_COMPARATOR);
        private final Queue<Request<?>> viewThresholds = new PriorityQueue(this.THRESHOLD_COMPARATOR);
        private final Queue<Request<?>> ftsThresholds = new PriorityQueue(this.THRESHOLD_COMPARATOR);
        private final Queue<Request<?>> analyticsThresholds = new PriorityQueue(this.THRESHOLD_COMPARATOR);
        private long kvThresholdCount = 0L;
        private long n1qlThresholdCount = 0L;
        private long viewThresholdCount = 0L;
        private long ftsThresholdCount = 0L;
        private long analyticsThresholdCount = 0L;
        private long lastThresholdLog;
        private boolean hasThresholdWritten;

        private Worker() {
        }

        @Override
        public void run() {
            Thread.currentThread().setName("cb-tracing-" + REQUEST_TRACER_ID.incrementAndGet());
            while (ThresholdRequestTracer.this.running.get()) {
                try {
                    this.handleOverThresholdQueue();
                    Thread.sleep(this.workerSleepMs);
                }
                catch (InterruptedException ex) {
                    if (!ThresholdRequestTracer.this.running.get()) {
                        return;
                    }
                    Thread.currentThread().interrupt();
                }
                catch (Exception exception) {}
            }
        }

        private void handleOverThresholdQueue() {
            long now = System.nanoTime();
            if (now > this.lastThresholdLog + ThresholdRequestTracer.this.emitIntervalNanos) {
                if (this.newOutputFormat) {
                    this.prepareAndlogOverThresholdNew();
                } else {
                    this.prepareAndlogOverThresholdOld();
                }
                this.lastThresholdLog = now;
            }
            Request request;
            while ((request = (Request)ThresholdRequestTracer.this.overThresholdQueue.poll()) != null) {
                ServiceType serviceType = request.serviceType();
                if (serviceType == ServiceType.KV) {
                    this.updateThreshold(this.kvThresholds, request);
                    ++this.kvThresholdCount;
                    continue;
                }
                if (serviceType == ServiceType.QUERY) {
                    this.updateThreshold(this.n1qlThresholds, request);
                    ++this.n1qlThresholdCount;
                    continue;
                }
                if (serviceType == ServiceType.VIEWS) {
                    this.updateThreshold(this.viewThresholds, request);
                    ++this.viewThresholdCount;
                    continue;
                }
                if (serviceType == ServiceType.SEARCH) {
                    this.updateThreshold(this.ftsThresholds, request);
                    ++this.ftsThresholdCount;
                    continue;
                }
                if (serviceType != ServiceType.ANALYTICS) continue;
                this.updateThreshold(this.analyticsThresholds, request);
                ++this.analyticsThresholdCount;
            }
            return;
        }

        private void prepareAndlogOverThresholdNew() {
            if (!this.hasThresholdWritten) {
                return;
            }
            this.hasThresholdWritten = false;
            HashMap<String, Object> output = new HashMap<String, Object>();
            if (!this.kvThresholds.isEmpty()) {
                output.put("kv", this.convertThresholdMetadataNew(this.kvThresholds, this.kvThresholdCount));
                this.kvThresholds.clear();
                this.kvThresholdCount = 0L;
            }
            if (!this.n1qlThresholds.isEmpty()) {
                output.put("query", this.convertThresholdMetadataNew(this.n1qlThresholds, this.n1qlThresholdCount));
                this.n1qlThresholds.clear();
                this.n1qlThresholdCount = 0L;
            }
            if (!this.viewThresholds.isEmpty()) {
                output.put("views", this.convertThresholdMetadataNew(this.viewThresholds, this.viewThresholdCount));
                this.viewThresholds.clear();
                this.viewThresholdCount = 0L;
            }
            if (!this.ftsThresholds.isEmpty()) {
                output.put("search", this.convertThresholdMetadataNew(this.ftsThresholds, this.ftsThresholdCount));
                this.ftsThresholds.clear();
                this.ftsThresholdCount = 0L;
            }
            if (!this.analyticsThresholds.isEmpty()) {
                output.put("analytics", this.convertThresholdMetadataNew(this.analyticsThresholds, this.analyticsThresholdCount));
                this.analyticsThresholds.clear();
                this.analyticsThresholdCount = 0L;
            }
            this.logOverThreshold(output, null);
        }

        private void prepareAndlogOverThresholdOld() {
            if (!this.hasThresholdWritten) {
                return;
            }
            this.hasThresholdWritten = false;
            ArrayList<Map<String, Object>> output = new ArrayList<Map<String, Object>>();
            if (!this.kvThresholds.isEmpty()) {
                output.add(this.convertThresholdMetadataOld(this.kvThresholds, this.kvThresholdCount, "kv"));
                this.kvThresholds.clear();
                this.kvThresholdCount = 0L;
            }
            if (!this.n1qlThresholds.isEmpty()) {
                output.add(this.convertThresholdMetadataOld(this.n1qlThresholds, this.n1qlThresholdCount, "query"));
                this.n1qlThresholds.clear();
                this.n1qlThresholdCount = 0L;
            }
            if (!this.viewThresholds.isEmpty()) {
                output.add(this.convertThresholdMetadataOld(this.viewThresholds, this.viewThresholdCount, "views"));
                this.viewThresholds.clear();
                this.viewThresholdCount = 0L;
            }
            if (!this.ftsThresholds.isEmpty()) {
                output.add(this.convertThresholdMetadataOld(this.ftsThresholds, this.ftsThresholdCount, "search"));
                this.ftsThresholds.clear();
                this.ftsThresholdCount = 0L;
            }
            if (!this.analyticsThresholds.isEmpty()) {
                output.add(this.convertThresholdMetadataOld(this.analyticsThresholds, this.analyticsThresholdCount, "analytics"));
                this.analyticsThresholds.clear();
                this.analyticsThresholdCount = 0L;
            }
            this.logOverThreshold(null, output);
        }

        private Map<String, Object> convertThresholdMetadataNew(Queue<Request<?>> requests, long count) {
            HashMap<String, Object> output = new HashMap<String, Object>();
            ArrayList top = new ArrayList();
            for (Request request : requests) {
                long totalServerDuration;
                long serverDuration;
                long totalDispatchDuration;
                long dispatchDuration;
                long encodeDuration;
                String localId;
                HashMap<String, Object> entry = new HashMap<String, Object>();
                entry.put(ThresholdRequestTracer.KEY_TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(request.context().logicalRequestLatency()));
                String operationId = request.operationId();
                if (operationId != null) {
                    entry.put(ThresholdRequestTracer.KEY_OPERATION_ID, operationId);
                }
                entry.put(ThresholdRequestTracer.KEY_OPERATION_NAME, request.name());
                HostAndPort local = request.context().lastDispatchedFrom();
                HostAndPort peer = request.context().lastDispatchedTo();
                if (local != null) {
                    entry.put(ThresholdRequestTracer.KEY_LAST_LOCAL_SOCKET, RedactableArgument.redactSystem(local).toString());
                }
                if (peer != null) {
                    entry.put(ThresholdRequestTracer.KEY_LAST_REMOTE_SOCKET, RedactableArgument.redactSystem(peer).toString());
                }
                if ((localId = request.context().lastChannelId()) != null) {
                    entry.put(ThresholdRequestTracer.KEY_LAST_LOCAL_ID, RedactableArgument.redactSystem(localId).toString());
                }
                if ((encodeDuration = request.context().encodeLatency()) > 0L) {
                    entry.put(ThresholdRequestTracer.KEY_ENCODE_MICROS, TimeUnit.NANOSECONDS.toMicros(encodeDuration));
                }
                if ((dispatchDuration = request.context().dispatchLatency()) > 0L) {
                    entry.put(ThresholdRequestTracer.KEY_DISPATCH_MICROS, TimeUnit.NANOSECONDS.toMicros(dispatchDuration));
                }
                if ((totalDispatchDuration = request.context().totalDispatchLatency()) > 0L) {
                    entry.put(ThresholdRequestTracer.KEY_TOTAL_DISPATCH_MICROS, TimeUnit.NANOSECONDS.toMicros(totalDispatchDuration));
                }
                if ((serverDuration = request.context().serverLatency()) > 0L) {
                    entry.put(ThresholdRequestTracer.KEY_SERVER_MICROS, serverDuration);
                }
                if ((totalServerDuration = request.context().totalServerLatency()) > 0L) {
                    entry.put(ThresholdRequestTracer.KEY_TOTAL_SERVER_MICROS, totalServerDuration);
                }
                top.add(entry);
            }
            top.sort((o1, o2) -> ((Long)o2.get(ThresholdRequestTracer.KEY_TOTAL_MICROS)).compareTo((Long)o1.get(ThresholdRequestTracer.KEY_TOTAL_MICROS)));
            output.put("total_count", count);
            output.put("top_requests", top);
            return output;
        }

        private Map<String, Object> convertThresholdMetadataOld(Queue<Request<?>> requests, long count, String ident) {
            HashMap<String, Object> output = new HashMap<String, Object>();
            ArrayList top = new ArrayList();
            for (Request request : requests) {
                long serverDuration;
                long dispatchDuration;
                long encodeDuration;
                String localId;
                HashMap<String, Object> entry = new HashMap<String, Object>();
                entry.put("total_us", TimeUnit.NANOSECONDS.toMicros(request.context().logicalRequestLatency()));
                String operationId = request.operationId();
                if (operationId != null) {
                    entry.put("last_operation_id", operationId);
                }
                entry.put(ThresholdRequestTracer.KEY_OPERATION_NAME, request.getClass().getSimpleName());
                HostAndPort local = request.context().lastDispatchedFrom();
                HostAndPort peer = request.context().lastDispatchedTo();
                if (local != null) {
                    entry.put("last_local_address", RedactableArgument.redactSystem(local).toString());
                }
                if (peer != null) {
                    entry.put("last_remote_address", RedactableArgument.redactSystem(peer).toString());
                }
                if ((localId = request.context().lastChannelId()) != null) {
                    entry.put(ThresholdRequestTracer.KEY_LAST_LOCAL_ID, RedactableArgument.redactSystem(localId).toString());
                }
                if ((encodeDuration = request.context().encodeLatency()) > 0L) {
                    entry.put("encode_us", TimeUnit.NANOSECONDS.toMicros(encodeDuration));
                }
                if ((dispatchDuration = request.context().dispatchLatency()) > 0L) {
                    entry.put("last_dispatch_us", TimeUnit.NANOSECONDS.toMicros(dispatchDuration));
                }
                if ((serverDuration = request.context().serverLatency()) > 0L) {
                    entry.put("server_us", serverDuration);
                }
                top.add(entry);
            }
            top.sort((o1, o2) -> ((Long)o2.get("total_us")).compareTo((Long)o1.get("total_us")));
            output.put("service", ident);
            output.put("count", count);
            output.put("top", top);
            return output;
        }

        void logOverThreshold(Map<String, Object> toLogNew, List<Map<String, Object>> toLogOld) {
            ThresholdRequestTracer.this.eventBus.publish(new OverThresholdRequestsRecordedEvent(Duration.ofNanos(ThresholdRequestTracer.this.emitIntervalNanos), toLogNew, toLogOld));
        }

        private void updateThreshold(Queue<Request<?>> thresholds, Request<?> request) {
            thresholds.add(request);
            while (thresholds.size() > ThresholdRequestTracer.this.sampleSize) {
                thresholds.remove();
            }
            this.hasThresholdWritten = true;
        }
    }
}

