/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.elasticsearch.writer;

import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.List;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.elasticsearch.writer.ElasticsearchWriterBase;
import org.apache.gobblin.elasticsearch.writer.ElasticsearchWriterConfigurationKeys;
import org.apache.gobblin.elasticsearch.writer.FutureCallbackHolder;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.Batch;
import org.apache.gobblin.writer.BatchAsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchRestWriter
extends ElasticsearchWriterBase
implements BatchAsyncDataWriter<Object> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchRestWriter.class);
    private final RestHighLevelClient client;
    private final RestClient lowLevelClient;

    ElasticsearchRestWriter(Config config) throws IOException {
        super(config);
        int threadCount = ConfigUtils.getInt((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE, (Integer)5);
        try {
            PasswordManager passwordManager = PasswordManager.getInstance();
            Boolean sslEnabled = ConfigUtils.getBoolean((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, (boolean)false);
            if (sslEnabled.booleanValue()) {
                String keyStoreType = ConfigUtils.getString((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE, (String)"pkcs12");
                String keyStoreFilePassword = passwordManager.readPassword(ConfigUtils.getString((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, (String)""));
                String identityFilepath = ConfigUtils.getString((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, (String)"");
                String trustStoreType = ConfigUtils.getString((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE, (String)"jks");
                String trustStoreFilePassword = passwordManager.readPassword(ConfigUtils.getString((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, (String)""));
                String cacertsFilepath = ConfigUtils.getString((Config)config, (String)ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, (String)"");
                String truststoreAbsolutePath = Paths.get(cacertsFilepath, new String[0]).toAbsolutePath().normalize().toString();
                log.info("Truststore absolutePath is:" + truststoreAbsolutePath);
                this.lowLevelClient = ElasticsearchRestWriter.buildRestClient(this.hostAddresses, threadCount, true, keyStoreType, keyStoreFilePassword, identityFilepath, trustStoreType, trustStoreFilePassword, cacertsFilepath);
            } else {
                this.lowLevelClient = ElasticsearchRestWriter.buildRestClient(this.hostAddresses, threadCount);
            }
            this.client = new RestHighLevelClient(this.lowLevelClient);
            log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}", new Object[]{this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName(), sslEnabled});
        }
        catch (Exception e) {
            throw new IOException("Failed to instantiate rest elasticsearch client", e);
        }
    }

    @Override
    int getDefaultPort() {
        return 9200;
    }

    private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount) throws Exception {
        return ElasticsearchRestWriter.buildRestClient(hosts, threadCount, false, null, null, null, null, null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount, boolean sslEnabled, String keyStoreType, String keyStoreFilePassword, String identityFilepath, String trustStoreType, String trustStoreFilePassword, String cacertsFilepath) throws Exception {
        HttpHost[] httpHosts = new HttpHost[hosts.size()];
        String scheme = sslEnabled ? "https" : "http";
        for (int h = 0; h < httpHosts.length; ++h) {
            InetSocketTransportAddress host = hosts.get(h);
            httpHosts[h] = new HttpHost(host.getAddress(), host.getPort(), scheme);
        }
        RestClientBuilder builder = RestClient.builder((HttpHost[])httpHosts);
        if (sslEnabled) {
            log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", (Object)trustStoreType, (Object)cacertsFilepath);
            KeyStore truststore = KeyStore.getInstance(trustStoreType);
            try (FileInputStream trustInputStream = new FileInputStream(cacertsFilepath);){
                truststore.load(trustInputStream, trustStoreFilePassword.toCharArray());
            }
            SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
            log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", (Object)keyStoreType, (Object)identityFilepath);
            KeyStore keystore = KeyStore.getInstance(keyStoreType);
            try (FileInputStream keyInputStream = new FileInputStream(identityFilepath);){
                keystore.load(keyInputStream, keyStoreFilePassword.toCharArray());
            }
            sslBuilder.loadKeyMaterial(keystore, keyStoreFilePassword.toCharArray());
            SSLContext sslContext = sslBuilder.build();
            builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setSSLContext(sslContext).setSSLHostnameVerifier((HostnameVerifier)new NoopHostnameVerifier()).setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
        } else {
            builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
        }
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(0));
        return builder.build();
    }

    public Future<WriteResponse> write(Batch<Object> batch, @Nullable WriteCallback callback) {
        Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
        try {
            this.client.bulkAsync((BulkRequest)preparedBatch.getFirst(), ((FutureCallbackHolder)preparedBatch.getSecond()).getActionListener(), new Header[0]);
            return ((FutureCallbackHolder)preparedBatch.getSecond()).getFuture();
        }
        catch (Exception e) {
            throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
        }
    }

    public void flush() throws IOException {
    }

    @Override
    public void close() throws IOException {
        super.close();
        this.lowLevelClient.close();
    }

    @VisibleForTesting
    public RestHighLevelClient getRestHighLevelClient() {
        return this.client;
    }

    @VisibleForTesting
    public RestClient getRestLowLevelClient() {
        return this.lowLevelClient;
    }
}

