/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.otlp;

import com.linecorp.armeria.server.Server;
import io.grpc.BindableService;
import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec;
import org.opensearch.dataprepper.plugins.server.CreateServer;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;
import org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsGrpcService;
import org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsGrpcService;
import org.opensearch.dataprepper.plugins.source.oteltrace.OTelTraceGrpcService;
import org.opensearch.dataprepper.plugins.source.otlp.ConvertConfiguration;
import org.opensearch.dataprepper.plugins.source.otlp.OTLPSourceConfig;
import org.opensearch.dataprepper.plugins.source.otlp.certificate.CertificateProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name="otlp", pluginType=Source.class, pluginConfigurationType=OTLPSourceConfig.class)
public class OTLPSource
implements Source<Record<Object>> {
    private static final Logger LOG = LoggerFactory.getLogger(OTLPSource.class);
    static final String SERVER_CONNECTIONS = "serverConnections";
    private final OTLPSourceConfig otlpSourceConfig;
    private final String pipelineName;
    private final PluginMetrics pluginMetrics;
    private final GrpcAuthenticationProvider authenticationProvider;
    private final CertificateProviderFactory certificateProviderFactory;
    private Server server;

    @DataPrepperPluginConstructor
    public OTLPSource(OTLPSourceConfig otlpSourceConfig, PluginMetrics pluginMetrics, PluginFactory pluginFactory, PipelineDescription pipelineDescription) {
        this(otlpSourceConfig, pluginMetrics, pluginFactory, new CertificateProviderFactory(otlpSourceConfig), pipelineDescription);
    }

    OTLPSource(OTLPSourceConfig otlpSourceConfig, PluginMetrics pluginMetrics, PluginFactory pluginFactory, CertificateProviderFactory certificateProviderFactory, PipelineDescription pipelineDescription) {
        otlpSourceConfig.validateAndInitializeCertAndKeyFileInS3();
        this.otlpSourceConfig = otlpSourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.certificateProviderFactory = certificateProviderFactory;
        this.pipelineName = pipelineDescription.getPipelineName();
        this.authenticationProvider = this.createAuthenticationProvider(pluginFactory);
    }

    public void start(Buffer<Record<Object>> buffer) {
        if (buffer == null) {
            throw new IllegalStateException("Buffer provided is null");
        }
        if (this.server == null) {
            Buffer<Record<Object>> metricBuffer = buffer;
            OTelLogsGrpcService oTelLogsGrpcService = new OTelLogsGrpcService((int)((double)this.otlpSourceConfig.getRequestTimeoutInMillis() * 0.8), (OTelProtoCodec.OTelProtoDecoder)(this.otlpSourceConfig.getLogsOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder()), buffer, this.pluginMetrics);
            OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService((int)((double)this.otlpSourceConfig.getRequestTimeoutInMillis() * 0.8), (OTelProtoCodec.OTelProtoDecoder)(this.otlpSourceConfig.getMetricsOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder()), metricBuffer, this.pluginMetrics);
            OTelTraceGrpcService oTelTraceGrpcService = new OTelTraceGrpcService((int)((double)this.otlpSourceConfig.getRequestTimeoutInMillis() * 0.8), (OTelProtoCodec.OTelProtoDecoder)(this.otlpSourceConfig.getTracesOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder()), buffer, this.pluginMetrics);
            ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(this.otlpSourceConfig);
            CreateServer createServer = new CreateServer(serverConfiguration, LOG, this.pluginMetrics, "otlp", this.pipelineName);
            CertificateProvider certificateProvider = null;
            if (this.otlpSourceConfig.isSsl() || this.otlpSourceConfig.useAcmCertForSSL()) {
                certificateProvider = this.certificateProviderFactory.getCertificateProvider();
            }
            ArrayList<CreateServer.GRPCServiceConfig> serviceConfigs = new ArrayList<CreateServer.GRPCServiceConfig>();
            serviceConfigs.add(new CreateServer.GRPCServiceConfig((BindableService)oTelLogsGrpcService, this.otlpSourceConfig.getLogsPath(), LogsServiceGrpc.getExportMethod()));
            serviceConfigs.add(new CreateServer.GRPCServiceConfig((BindableService)oTelMetricsGrpcService, this.otlpSourceConfig.getMetricsPath(), MetricsServiceGrpc.getExportMethod()));
            serviceConfigs.add(new CreateServer.GRPCServiceConfig((BindableService)oTelTraceGrpcService, this.otlpSourceConfig.getTracesPath(), TraceServiceGrpc.getExportMethod()));
            this.server = createServer.createGRPCServer(this.authenticationProvider, serviceConfigs, certificateProvider);
            this.pluginMetrics.gauge(SERVER_CONNECTIONS, (Object)this.server, Server::numConnections);
        }
        try {
            this.server.start().get();
        }
        catch (ExecutionException ex) {
            if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                throw (RuntimeException)ex.getCause();
            }
            throw new RuntimeException(ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        LOG.info("Started otlp source on port {}.", (Object)this.otlpSourceConfig.getPort());
    }

    public void stop() {
        if (this.server != null) {
            try {
                this.server.stop().get();
            }
            catch (ExecutionException ex) {
                if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                    throw (RuntimeException)ex.getCause();
                }
                throw new RuntimeException(ex);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        LOG.info("Stopped otlp source.");
    }

    private GrpcAuthenticationProvider createAuthenticationProvider(PluginFactory pluginFactory) {
        PluginModel authenticationConfiguration = this.otlpSourceConfig.getAuthentication();
        if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals("unauthenticated")) {
            LOG.warn("Creating otlp source without authentication. This is not secure.");
            LOG.warn("In order to set up Http Basic authentication for otlp source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otlp-source#authentication-configurations");
        }
        PluginSetting authenticationPluginSetting = authenticationConfiguration != null ? new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()) : new PluginSetting("unauthenticated", Collections.emptyMap());
        authenticationPluginSetting.setPipelineName(this.pipelineName);
        return (GrpcAuthenticationProvider)pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting, new Object[0]);
    }
}

