/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.loghttp;

import com.linecorp.armeria.server.Server;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.opensearch.dataprepper.HttpRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.http.HttpServerConfig;
import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.server.CreateServer;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;
import org.opensearch.dataprepper.plugins.source.loghttp.ConvertConfiguration;
import org.opensearch.dataprepper.plugins.source.loghttp.HTTPSourceConfig;
import org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name="http", pluginType=Source.class, pluginConfigurationType=HTTPSourceConfig.class)
public class HTTPSource
implements Source<Record<Log>> {
    private static final String PLUGIN_NAME = "http";
    private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
    private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
    public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
    static final String SERVER_CONNECTIONS = "serverConnections";
    private final HttpServerConfig sourceConfig;
    private final CertificateProviderFactory certificateProviderFactory;
    private final ArmeriaHttpAuthenticationProvider authenticationProvider;
    private final HttpRequestExceptionHandler httpRequestExceptionHandler;
    private final String pipelineName;
    private Server server;
    private final PluginMetrics pluginMetrics;
    private static final String HTTP_HEALTH_CHECK_PATH = "/health";
    private ByteDecoder byteDecoder;
    private final InputCodec codec;

    @DataPrepperPluginConstructor
    public HTTPSource(HTTPSourceConfig sourceConfig, PluginMetrics pluginMetrics, PluginFactory pluginFactory, PipelineDescription pipelineDescription) {
        this.sourceConfig = sourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.pipelineName = pipelineDescription.getPipelineName();
        this.byteDecoder = new JsonDecoder();
        this.certificateProviderFactory = new CertificateProviderFactory((HttpServerConfig)sourceConfig);
        PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
        if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals("unauthenticated")) {
            LOG.warn("Creating http source without authentication. This is not secure.");
            LOG.warn("In order to set up Http Basic authentication for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#authentication-configurations");
        }
        PluginSetting authenticationPluginSetting = authenticationConfiguration != null ? new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()) : new PluginSetting("unauthenticated", Collections.emptyMap());
        authenticationPluginSetting.setPipelineName(this.pipelineName);
        this.authenticationProvider = (ArmeriaHttpAuthenticationProvider)pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting, new Object[0]);
        this.httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics);
        PluginModel codecConfiguration = sourceConfig.getCodec();
        if (codecConfiguration == null) {
            this.codec = null;
        } else {
            PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
            this.codec = (InputCodec)pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings, new Object[0]);
        }
    }

    public void start(Buffer<Record<Log>> buffer) {
        if (buffer == null) {
            throw new IllegalStateException("Buffer provided is null");
        }
        if (this.server == null) {
            ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(this.sourceConfig);
            CreateServer createServer = new CreateServer(serverConfiguration, LOG, this.pluginMetrics, PLUGIN_NAME, this.pipelineName);
            LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, this.pluginMetrics, this.codec);
            this.server = createServer.createHTTPServer(buffer, this.certificateProviderFactory, this.authenticationProvider, this.httpRequestExceptionHandler, (Object)logHTTPService);
            this.pluginMetrics.gauge(SERVER_CONNECTIONS, (Object)this.server, Server::numConnections);
        }
        try {
            this.server.start().get();
        }
        catch (ExecutionException ex) {
            if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                throw (RuntimeException)ex.getCause();
            }
            throw new RuntimeException(ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        LOG.info("Started http source on port " + this.sourceConfig.getPort() + "...");
    }

    public ByteDecoder getDecoder() {
        return this.byteDecoder;
    }

    public void stop() {
        if (this.server != null) {
            try {
                this.server.stop().get();
            }
            catch (ExecutionException ex) {
                if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
                    throw (RuntimeException)ex.getCause();
                }
                throw new RuntimeException(ex);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }
        LOG.info("Stopped http source.");
    }
}

