/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.processor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.common.primitives.SignedBytes;
import java.io.File;
import java.io.Serializable;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.Span;
import org.opensearch.dataprepper.plugins.processor.ServiceMapProcessorConfig;
import org.opensearch.dataprepper.plugins.processor.ServiceMapRelationship;
import org.opensearch.dataprepper.plugins.processor.state.MapDbProcessorState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processor that derives service-map relationships from trace spans.
 *
 * <p>Incoming spans are buffered in a "current" window. Once the configured window duration has
 * elapsed, every buffered child span is matched against its parent span (looked up in the current
 * and the previous window) and a pair of {@link ServiceMapRelationship} events is emitted for each
 * parent/child pair that crosses a service boundary. Services that were seen but never took part
 * in a relationship are emitted as isolated nodes when their window becomes the previous one.
 *
 * <p>All window state lives in {@code static} fields and is therefore shared by every instance of
 * this class. The first instance created (id {@code 0}) is the "master": it creates the shared
 * state and is the only instance that rotates the windows. Instances synchronize around a window
 * rotation with a {@link CyclicBarrier} sized to the number of process workers.
 */
@SingleThread
@DataPrepperPlugin(name = "service_map", deprecatedName = "service_map_stateful", pluginType = Processor.class, pluginConfigurationType = ServiceMapProcessorConfig.class)
public class ServiceMapStatefulProcessor
        extends AbstractProcessor<Record<Event>, Record<Event>>
        implements RequiresPeerForwarding {
    static final String SPANS_DB_SIZE = "spansDbSize";
    static final String TRACE_GROUP_DB_SIZE = "traceGroupDbSize";
    static final String SPANS_DB_COUNT = "spansDbCount";
    static final String TRACE_GROUP_DB_COUNT = "traceGroupDbCount";
    static final String RELATIONSHIP_COUNT = "relationshipCount";
    private static final Logger LOG = LoggerFactory.getLogger(ServiceMapStatefulProcessor.class);
    private static final String EMPTY_SUFFIX = "-empty";
    private static final String EVENT_TYPE = "event";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final Collection<Record<Event>> EMPTY_COLLECTION = Collections.emptySet();
    /** Multiplier applied to the configured window duration to obtain milliseconds. */
    private static final long TO_MILLIS = 1000L;
    /** Number of instances created so far; also the source of each instance's id. */
    private static final AtomicInteger processorsCreated = new AtomicInteger(0);
    /**
     * Start of the current window in epoch milliseconds. Volatile because
     * {@link #prepareForShutdown()} writes it without going through the barrier that otherwise
     * orders writes made during window rotation.
     */
    private static volatile long previousTimestamp;
    private static long windowDurationMillis;
    private static CyclicBarrier allThreadsCyclicBarrier;
    private static volatile MapDbProcessorState<ServiceMapStateData> previousWindow;
    private static volatile MapDbProcessorState<ServiceMapStateData> currentWindow;
    private static volatile MapDbProcessorState<String> previousTraceGroupWindow;
    private static volatile MapDbProcessorState<String> currentTraceGroupWindow;
    private static volatile Set<ServiceNodeData> previousIsolatedServiceNodes;
    private static volatile Set<ServiceNodeData> currentIsolatedServiceNodes;
    /** Every relationship emitted so far; used to avoid emitting the same relationship twice. */
    private static final Set<ServiceMapRelationship> RELATIONSHIP_STATE = Sets.newConcurrentHashSet();
    private static File dbPath;
    private static Clock clock;
    private final int thisProcessorId;

    /**
     * Plugin constructor: reads the window duration and database path from the configuration and
     * the number of process workers from the pipeline description.
     *
     * @param serviceMapProcessorConfig processor configuration
     * @param pluginMetrics             metrics registry on which the gauges are registered
     * @param pipelineDescription       description of the enclosing pipeline
     */
    @DataPrepperPluginConstructor
    public ServiceMapStatefulProcessor(final ServiceMapProcessorConfig serviceMapProcessorConfig, final PluginMetrics pluginMetrics, final PipelineDescription pipelineDescription) {
        this(serviceMapProcessorConfig.getWindowDuration() * TO_MILLIS,
                new File(serviceMapProcessorConfig.getDbPath()),
                Clock.systemUTC(),
                pipelineDescription.getNumberOfProcessWorkers(),
                pluginMetrics);
    }

    /**
     * Creates an instance and, when it is the first one created, initializes the shared state.
     *
     * @param windowDurationMillis length of one window in milliseconds
     * @param databasePath         directory that holds the window databases; created if missing
     * @param clock                time source used for window timing and database names
     * @param processWorkers       number of workers; sizes the rotation barrier
     * @param pluginMetrics        metrics registry on which the gauges are registered
     */
    ServiceMapStatefulProcessor(final long windowDurationMillis, final File databasePath, final Clock clock, final int processWorkers, final PluginMetrics pluginMetrics) {
        super(pluginMetrics);
        ServiceMapStatefulProcessor.clock = clock;
        this.thisProcessorId = processorsCreated.getAndIncrement();

        // Only the first instance builds the shared (static) state.
        if (isMasterInstance()) {
            previousTimestamp = ServiceMapStatefulProcessor.clock.millis();
            ServiceMapStatefulProcessor.windowDurationMillis = windowDurationMillis;
            dbPath = createPath(databasePath);
            currentWindow = new MapDbProcessorState<>(dbPath, getNewDbName(), processWorkers);
            previousWindow = new MapDbProcessorState<>(dbPath, getNewDbName() + EMPTY_SUFFIX, processWorkers);
            currentTraceGroupWindow = new MapDbProcessorState<>(dbPath, getNewTraceDbName(), processWorkers);
            previousTraceGroupWindow = new MapDbProcessorState<>(dbPath, getNewTraceDbName() + EMPTY_SUFFIX, processWorkers);
            currentIsolatedServiceNodes = Sets.newConcurrentHashSet();
            previousIsolatedServiceNodes = Sets.newConcurrentHashSet();
            allThreadsCyclicBarrier = new CyclicBarrier(processWorkers);
        }

        pluginMetrics.gauge(SPANS_DB_SIZE, this, ServiceMapStatefulProcessor::getSpansDbSize);
        pluginMetrics.gauge(TRACE_GROUP_DB_SIZE, this, ServiceMapStatefulProcessor::getTraceGroupDbSize);
        pluginMetrics.gauge(SPANS_DB_COUNT, this, ServiceMapStatefulProcessor::getSpansDbCount);
        pluginMetrics.gauge(TRACE_GROUP_DB_COUNT, this, ServiceMapStatefulProcessor::getTraceGroupDbCount);
        pluginMetrics.gauge(RELATIONSHIP_COUNT, this, ServiceMapStatefulProcessor::getRelationshipCount);
    }

    /**
     * Ensures the given path exists, creating the directory (and parents) when it does not.
     *
     * @param path directory to create if missing
     * @return the same {@code path}
     * @throws RuntimeException if the path does not exist and could not be created
     */
    private static File createPath(final File path) {
        // mkdirs() also returns false when someone else created the directory in the meantime,
        // so re-check before treating it as a failure.
        if (!path.exists() && !path.mkdirs() && !path.isDirectory()) {
            throw new RuntimeException(String.format("Unable to create the directory at the provided path: %s", path.getAbsolutePath()));
        }
        return path;
    }

    /**
     * Evaluates the service-map edges when the window duration has elapsed, then stores the
     * incoming spans in the current window.
     *
     * @param records records whose event data is cast to {@link Span}
     * @return the relationship records produced by this call, or an empty collection when the
     *         window duration has not elapsed yet
     */
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        final Collection<Record<Event>> relationships = windowDurationHasPassed() ? evaluateEdges() : EMPTY_COLLECTION;

        // byte[] has identity-based equals/hashCode, so the batch is keyed through a comparator.
        final Map<byte[], ServiceMapStateData> batchStateData = new TreeMap<>(SignedBytes.lexicographicalComparator());
        records.forEach(record -> processSpan((Span) record.getData(), batchStateData));

        try {
            currentWindow.putAll(batchStateData);
        } catch (RuntimeException e) {
            LOG.error("Caught exception trying to put batch state data", e);
        }
        return relationships;
    }

    /**
     * Records a single span: remembers its service as a potentially isolated node, adds its state
     * to the batch and, for root spans, stores the span name as the trace group name.
     *
     * <p>Spans without a service name are ignored. A span whose parent span id is {@code null} or
     * empty is treated as a root span.
     *
     * @param span           span to record
     * @param batchStateData batch that receives the span state keyed by decoded span id
     */
    private void processSpan(final Span span, final Map<byte[], ServiceMapStateData> batchStateData) {
        final String serviceName = span.getServiceName();
        if (serviceName == null) {
            return;
        }

        final String traceIdHex = span.getTraceId();
        if (traceIdHex == null) {
            // Hex.decodeHex would fail with a NullPointerException and abort the whole batch.
            LOG.error("Skipping span because its traceId is null.");
            return;
        }

        final byte[] traceId;
        try {
            traceId = Hex.decodeHex(traceIdHex);
        } catch (DecoderException e) {
            LOG.error("Caught DecoderException when decoding the traceId.", e);
            return;
        }

        currentIsolatedServiceNodes.add(new ServiceNodeData(traceId, serviceName));

        final String parentSpanId = span.getParentSpanId();
        final boolean isRootSpan = parentSpanId == null || parentSpanId.isEmpty();

        try {
            batchStateData.put(
                    Hex.decodeHex(span.getSpanId()),
                    new ServiceMapStateData(serviceName, isRootSpan ? null : Hex.decodeHex(parentSpanId), traceId, span.getKind(), span.getName()));
        } catch (Exception e) {
            LOG.error("Caught exception trying to put service map state data into batch", e);
        }

        if (isRootSpan) {
            try {
                currentTraceGroupWindow.put(traceId, span.getName());
            } catch (Exception e) {
                LOG.error("Caught exception trying to put trace group name", e);
            }
        }
    }

    /**
     * Computes this instance's share of relationships from both windows, then waits for all
     * workers. Between the two barrier waits the master instance adds the isolated service nodes
     * and rotates the windows.
     *
     * @return the relationship records found by this instance
     * @throws RuntimeException if the thread is interrupted or the barrier is broken while waiting
     */
    private Collection<Record<Event>> evaluateEdges() {
        LOG.debug("Evaluating service map edges");
        try {
            final Collection<Record<Event>> serviceDependencyRecords = new HashSet<>();
            serviceDependencyRecords.addAll(iterateProcessorState(previousWindow));
            serviceDependencyRecords.addAll(iterateProcessorState(currentWindow));
            LOG.debug("Done evaluating service map edges");

            // Wait until every worker has finished reading the windows before they are rotated.
            allThreadsCyclicBarrier.await();
            if (isMasterInstance()) {
                processIsolatedServiceMapNodes(serviceDependencyRecords);
                rotateWindows();
            }
            // Wait until rotation is complete before any worker writes to the new window.
            allThreadsCyclicBarrier.await();
            return serviceDependencyRecords;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Walks this instance's portion of the given window and builds relationship records for every
     * child span whose parent is found in the current or previous window and belongs to a
     * different service.
     *
     * @param processorState window to iterate
     * @return newly discovered relationship records
     */
    private Collection<Record<Event>> iterateProcessorState(final MapDbProcessorState<ServiceMapStateData> processorState) {
        final Collection<Record<Event>> serviceDependencyRecords = new HashSet<>();
        if (processorState.getAll() != null && !processorState.getAll().isEmpty()) {
            processorState.getIterator(processorsCreated.get(), thisProcessorId).forEachRemaining(entry -> {
                final ServiceMapStateData child = entry.getValue();
                if (child.parentSpanId == null) {
                    return;
                }

                ServiceMapStateData parent = currentWindow.get(child.parentSpanId);
                if (parent == null) {
                    parent = previousWindow.get(child.parentSpanId);
                }
                if (parent == null || parent.serviceName.equals(child.serviceName)) {
                    return;
                }

                // Looked up only once a relationship will actually be produced.
                final String traceGroupName = getTraceGroupName(child.traceId);

                // Both services take part in a relationship, so neither is isolated any more.
                previousIsolatedServiceNodes.remove(new ServiceNodeData(parent.traceId, parent.serviceName));
                currentIsolatedServiceNodes.remove(new ServiceNodeData(parent.traceId, parent.serviceName));
                previousIsolatedServiceNodes.remove(new ServiceNodeData(parent.traceId, child.serviceName));
                currentIsolatedServiceNodes.remove(new ServiceNodeData(parent.traceId, child.serviceName));

                final ServiceMapRelationship destinationRelationship = ServiceMapRelationship.newDestinationRelationship(parent.serviceName, parent.spanKind, child.serviceName, child.name, traceGroupName);
                final ServiceMapRelationship targetRelationship = ServiceMapRelationship.newTargetRelationship(child.serviceName, child.spanKind, child.serviceName, child.name, traceGroupName);
                addServiceMapRelationship(serviceDependencyRecords, destinationRelationship);
                addServiceMapRelationship(serviceDependencyRecords, targetRelationship);
            });
        }
        return serviceDependencyRecords;
    }

    /**
     * Wraps the relationship in an event record and adds it to the given collection, unless the
     * same relationship has already been emitted.
     *
     * @param serviceDependencyRecords collection that receives the new record
     * @param serviceMapRelationship   relationship to emit
     * @throws RuntimeException if the event record cannot be created
     */
    private void addServiceMapRelationship(final Collection<Record<Event>> serviceDependencyRecords, final ServiceMapRelationship serviceMapRelationship) {
        // Set.add is atomic on the concurrent set, so only one worker wins for a given relationship.
        if (!RELATIONSHIP_STATE.add(serviceMapRelationship)) {
            return;
        }
        try {
            final JacksonEvent relationshipEvent = JacksonEvent.builder()
                    .withEventType(EVENT_TYPE)
                    .withData(serviceMapRelationship)
                    .build();
            serviceDependencyRecords.add(new Record<>(relationshipEvent));
        } catch (Exception e) {
            // Nothing was emitted, so do not remember the relationship as emitted.
            RELATIONSHIP_STATE.remove(serviceMapRelationship);
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up the trace group name for a trace, checking the current window first and then the
     * previous one.
     *
     * @param traceId decoded trace id
     * @return the trace group name, or {@code null} if it is unknown or the lookup failed
     */
    private String getTraceGroupName(final byte[] traceId) {
        try {
            final String traceGroupName = currentTraceGroupWindow.get(traceId);
            return traceGroupName != null ? traceGroupName : previousTraceGroupWindow.get(traceId);
        } catch (RuntimeException e) {
            LOG.error("Caught exception trying to get trace group name", e);
            return null;
        }
    }

    /**
     * Resets the window start to zero so that the window duration check passes on the next
     * {@link #doExecute(Collection)} call.
     */
    public void prepareForShutdown() {
        previousTimestamp = 0L;
    }

    /**
     * @return {@code true} when the current span window holds no entries
     */
    public boolean isReadyForShutdown() {
        return currentWindow.size() == 0L;
    }

    /**
     * Deletes all window databases and clears the isolated-node sets.
     */
    public void shutdown() {
        previousWindow.delete();
        currentWindow.delete();
        previousTraceGroupWindow.delete();
        currentTraceGroupWindow.delete();
        previousIsolatedServiceNodes.clear();
        currentIsolatedServiceNodes.clear();
    }

    /**
     * Emits an isolated-service relationship for every service node still present in the previous
     * window's isolated-node set.
     *
     * @param serviceDependencyRecords collection that receives the new records
     */
    private void processIsolatedServiceMapNodes(final Collection<Record<Event>> serviceDependencyRecords) {
        LOG.debug("Add isolated service nodes into service-map relationships.");
        previousIsolatedServiceNodes.forEach(serviceNodeData -> {
            final String traceGroupName = getTraceGroupName(serviceNodeData.traceId);
            final ServiceMapRelationship serviceMapRelationship = ServiceMapRelationship.newIsolatedService(serviceNodeData.serviceName, traceGroupName);
            addServiceMapRelationship(serviceDependencyRecords, serviceMapRelationship);
            LOG.debug("Added node {serviceName={}, traceGroupName={}} into service-map relationship.", serviceNodeData.serviceName, traceGroupName);
        });
        LOG.debug("Done adding isolated service nodes");
    }

    /**
     * Makes the current windows the previous ones and reuses the old previous windows, cleared, as
     * the new current windows. Also restarts the window timer.
     */
    private void rotateWindows() {
        LOG.debug("Rotating service map windows at {}", clock.instant());

        final MapDbProcessorState<ServiceMapStateData> recycledWindow = previousWindow;
        previousWindow = currentWindow;
        currentWindow = recycledWindow;
        currentWindow.clear();

        final MapDbProcessorState<String> recycledTraceGroupWindow = previousTraceGroupWindow;
        previousTraceGroupWindow = currentTraceGroupWindow;
        currentTraceGroupWindow = recycledTraceGroupWindow;
        currentTraceGroupWindow.clear();

        final Set<ServiceNodeData> recycledNodes = previousIsolatedServiceNodes;
        previousIsolatedServiceNodes = currentIsolatedServiceNodes;
        currentIsolatedServiceNodes = recycledNodes;
        currentIsolatedServiceNodes.clear();

        previousTimestamp = clock.millis();
        LOG.debug("Done rotating service map windows");
    }

    /** @return combined {@code sizeInBytes()} of the current and previous span windows */
    public double getSpansDbSize() {
        return currentWindow.sizeInBytes() + previousWindow.sizeInBytes();
    }

    /** @return combined entry count of the current and previous span windows */
    public double getSpansDbCount() {
        return currentWindow.size() + previousWindow.size();
    }

    /** @return combined {@code sizeInBytes()} of the current and previous trace group windows */
    public double getTraceGroupDbSize() {
        return currentTraceGroupWindow.sizeInBytes() + previousTraceGroupWindow.sizeInBytes();
    }

    /** @return combined entry count of the current and previous trace group windows */
    public double getTraceGroupDbCount() {
        return currentTraceGroupWindow.size() + previousTraceGroupWindow.size();
    }

    /** @return number of distinct relationships emitted so far */
    public double getRelationshipCount() {
        return RELATIONSHIP_STATE.size();
    }

    /** @return a span database name derived from the current clock time */
    private String getNewDbName() {
        return "db-" + clock.millis();
    }

    /** @return a trace group database name derived from the current clock time */
    private String getNewTraceDbName() {
        return "trace-db-" + clock.millis();
    }

    /** @return {@code true} when at least one window duration has elapsed since the window start */
    private boolean windowDurationHasPassed() {
        return clock.millis() - previousTimestamp >= windowDurationMillis;
    }

    /** @return {@code true} for the first instance created, which owns window rotation */
    private boolean isMasterInstance() {
        return thisProcessorId == 0;
    }

    /**
     * @return the single identification key, {@code "traceId"}
     */
    public Collection<String> getIdentificationKeys() {
        return Collections.singleton("traceId");
    }

    /**
     * A (trace id, service name) pair used as an element of the isolated-node sets. Equality and
     * hash code are based on the service name and the content of the trace id array.
     */
    private static final class ServiceNodeData
            implements Serializable {
        // Final because instances are used as hash-set elements and must not change after insertion.
        public final String serviceName;
        public final byte[] traceId;

        public ServiceNodeData(final byte[] traceId, final String serviceName) {
            this.traceId = traceId;
            this.serviceName = serviceName;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final ServiceNodeData that = (ServiceNodeData) o;
            return Objects.equals(serviceName, that.serviceName) && Arrays.equals(traceId, that.traceId);
        }

        @Override
        public int hashCode() {
            int result = serviceName != null ? serviceName.hashCode() : 0;
            result = 31 * result + Arrays.hashCode(traceId);
            return result;
        }
    }

    /**
     * Per-span state stored in a window: the span's service, parent span id ({@code null} for a
     * root span), trace id, kind and name.
     */
    private static class ServiceMapStateData
            implements Serializable {
        public String serviceName;
        public byte[] parentSpanId;
        public byte[] traceId;
        public String spanKind;
        public String name;

        public ServiceMapStateData() {
        }

        public ServiceMapStateData(final String serviceName, final byte[] parentSpanId, final byte[] traceId, final String spanKind, final String name) {
            this.serviceName = serviceName;
            this.parentSpanId = parentSpanId;
            this.traceId = traceId;
            this.spanKind = spanKind;
            this.name = name;
        }
    }
}

