/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.opentelemetry;

import com.google.protobuf.Message;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider;
import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextProvider;

@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@DefaultSchedule(period="25 ms")
@Tags(value={"OpenTelemetry", "OTel", "OTLP", "telemetry", "metrics", "traces", "logs"})
@CapabilityDescription(value="Collect OpenTelemetry messages over HTTP or gRPC. Supports standard Export Service Request messages for logs, metrics, and traces. Implements OpenTelemetry OTLP Specification 1.0.0 with OTLP/gRPC and OTLP/HTTP. Provides protocol detection using the HTTP Content-Type header.")
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="Content-Type set to application/json"), @WritesAttribute(attribute="resource.type", description="OpenTelemetry Resource Type: LOGS, METRICS, or TRACES"), @WritesAttribute(attribute="resource.count", description="Count of resource elements included in messages")})
public class ListenOTLP
extends AbstractProcessor {
    static final PropertyDescriptor ADDRESS = new PropertyDescriptor.Builder().name("Address").displayName("Address").description("Internet Protocol Address on which to listen for OTLP Export Service Requests. The default value enables listening on all addresses.").required(true).defaultValue("0.0.0.0").addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").displayName("Port").description("TCP port number on which to listen for OTLP Export Service Requests over HTTP and gRPC").required(true).defaultValue("4317").addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").displayName("SSL Context Service").description("SSL Context Service enables TLS communication for HTTPS").required(true).identifiesControllerService(SSLContextProvider.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor CLIENT_AUTHENTICATION = new PropertyDescriptor.Builder().name("Client Authentication").displayName("Client Authentication").description("Client authentication policy for TLS communication with HTTPS").required(true).allowableValues((Enum[])ClientAuth.values()).defaultValue(ClientAuth.WANT.name()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder().name("Worker Threads").displayName("Worker Threads").description("Number of threads responsible for decoding and queuing incoming OTLP Export Service Requests").required(true).defaultValue("2").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor QUEUE_CAPACITY = new PropertyDescriptor.Builder().name("Queue Capacity").displayName("Queue Capacity").description("Maximum number of OTLP request resource elements that can be received and queued").required(true).defaultValue("1000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").displayName("Batch Size").description("Maximum number of OTLP request resource elements included in each FlowFile produced").required(true).defaultValue("100").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Export Service Requests containing OTLP Telemetry").build();
    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(SUCCESS);
    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(ADDRESS, PORT, SSL_CONTEXT_SERVICE, CLIENT_AUTHENTICATION, WORKER_THREADS, QUEUE_CAPACITY, BATCH_SIZE);
    private static final String TRANSIT_URI_FORMAT = "https://%s:%d";
    private Iterator<RequestCallback> requestCallbackProvider;
    private EventServer server;

    public final Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws UnknownHostException {
        EventServerFactory eventServerFactory = this.createEventServerFactory(context);
        this.server = eventServerFactory.getEventServer();
    }

    @OnStopped
    public void onStopped() {
        if (this.server != null) {
            this.server.shutdown();
            this.server = null;
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        while (this.requestCallbackProvider.hasNext()) {
            RequestCallback requestCallback = this.requestCallbackProvider.next();
            this.processRequestCallback(session, requestCallback);
        }
    }

    int getPort() {
        return this.server.getListeningPort();
    }

    private void processRequestCallback(ProcessSession session, RequestCallback requestCallback) {
        String transitUri = requestCallback.getTransitUri();
        FlowFile flowFile = session.create();
        try {
            flowFile = session.write(flowFile, (OutputStreamCallback)requestCallback);
            flowFile = session.putAllAttributes(flowFile, requestCallback.getAttributes());
            session.getProvenanceReporter().receive(flowFile, transitUri);
            session.transfer(flowFile, SUCCESS);
        }
        catch (Exception e) {
            this.getLogger().warn("Request Transit URI [{}] processing failed {}", new Object[]{transitUri, flowFile, e});
            session.remove(flowFile);
        }
    }

    private EventServerFactory createEventServerFactory(ProcessContext context) throws UnknownHostException {
        String address = context.getProperty(ADDRESS).getValue();
        InetAddress serverAddress = InetAddress.getByName(address);
        int port = context.getProperty(PORT).asInteger();
        URI transitBaseUri = URI.create(String.format(TRANSIT_URI_FORMAT, serverAddress.getCanonicalHostName(), port));
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        int queueCapacity = context.getProperty(QUEUE_CAPACITY).asInteger();
        LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<Message>(queueCapacity);
        this.requestCallbackProvider = new RequestCallbackProvider(transitBaseUri, batchSize, messages);
        SSLContextProvider sslContextProvider = (SSLContextProvider)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
        SSLContext sslContext = sslContextProvider.createContext();
        HttpServerFactory eventServerFactory = new HttpServerFactory(this.getLogger(), messages, serverAddress, port, sslContext);
        eventServerFactory.setThreadNamePrefix(String.format("%s[%s]", ((Object)((Object)this)).getClass().getSimpleName(), this.getIdentifier()));
        int workerThreads = context.getProperty(WORKER_THREADS).asInteger();
        eventServerFactory.setWorkerThreads(workerThreads);
        ClientAuth clientAuth = ClientAuth.valueOf((String)context.getProperty(CLIENT_AUTHENTICATION).getValue());
        eventServerFactory.setClientAuth(clientAuth);
        return eventServerFactory;
    }
}

