/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_Read;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

public class ElasticsearchIO {
    /**
     * Creates a {@link Read} transform with the default settings: a scroll keepalive of
     * {@code "5m"} and a batch size of 100.
     *
     * @return a new {@link Read} carrying the default settings
     */
    public static Read read() {
        Read.Builder defaults = new AutoValue_ElasticsearchIO_Read.Builder();
        defaults = defaults.setScrollKeepalive("5m");
        defaults = defaults.setBatchSize(100L);
        return defaults.build();
    }

    /**
     * Creates a {@link Write} transform with the default settings: at most 1000 documents and
     * at most 5 MiB (5 * 1024 * 1024 bytes) per batch.
     *
     * @return a new {@link Write} carrying the default settings
     */
    public static Write write() {
        Write.Builder defaults = new AutoValue_ElasticsearchIO_Write.Builder();
        defaults = defaults.setMaxBatchSize(1000L);
        defaults = defaults.setMaxBatchSizeBytes(5L * 1024L * 1024L);
        return defaults.build();
    }

    /** Not instantiable: this class only hosts static factory methods and nested types. */
    private ElasticsearchIO() {
        // Intentionally empty.
    }

    /**
     * Reads the body of an Elasticsearch REST response and parses it as a JSON object.
     *
     * <p>The body is decoded as UTF-8. The entity stream and the reader wrapping it are closed
     * before this method returns, whether parsing succeeds or fails.
     *
     * @param response the REST response whose entity holds the JSON document
     * @return the parsed JSON object
     * @throws IOException if the entity content cannot be obtained or closed
     */
    private static JsonObject parseResponse(Response response) throws IOException {
        try (InputStream content = response.getEntity().getContent();
                Reader reader = new InputStreamReader(content, StandardCharsets.UTF_8)) {
            return new Gson().fromJson(reader, JsonObject.class);
        }
    }

    public static abstract class Write
    extends PTransform<PCollection<String>, PDone> {
        @Nullable
        abstract ConnectionConfiguration getConnectionConfiguration();

        abstract long getMaxBatchSize();

        abstract long getMaxBatchSizeBytes();

        abstract Builder builder();

        public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument((connectionConfiguration != null ? 1 : 0) != 0, (Object)"ElasticsearchIO.write().withConnectionConfiguration(configuration) called with null configuration");
            return this.builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public Write withMaxBatchSize(long batchSize) {
            Preconditions.checkArgument((batchSize > 0L ? 1 : 0) != 0, (Object)"ElasticsearchIO.write().withMaxBatchSize(batchSize) called with incorrect <= 0 value");
            return this.builder().setMaxBatchSize(batchSize).build();
        }

        public Write withMaxBatchSizeBytes(long batchSizeBytes) {
            Preconditions.checkArgument((batchSizeBytes > 0L ? 1 : 0) != 0, (Object)"ElasticsearchIO.write().withMaxBatchSizeBytes(batchSizeBytes) called with incorrect <= 0 value");
            return this.builder().setMaxBatchSizeBytes(batchSizeBytes).build();
        }

        public void validate(PCollection<String> input) {
            Preconditions.checkState((this.getConnectionConfiguration() != null ? 1 : 0) != 0, (Object)"ElasticsearchIO.write() requires a connection configuration to be set via withConnectionConfiguration(configuration)");
        }

        public PDone expand(PCollection<String> input) {
            input.apply((PTransform)ParDo.of((DoFn)new WriteFn(this)));
            return PDone.in((Pipeline)input.getPipeline());
        }

        @VisibleForTesting
        static class WriteFn
        extends DoFn<String, Void> {
            private final Write spec;
            private transient RestClient restClient;
            private ArrayList<String> batch;
            private long currentBatchSizeBytes;

            WriteFn(Write spec) {
                this.spec = spec;
            }

            @DoFn.Setup
            public void createClient() throws Exception {
                this.restClient = this.spec.getConnectionConfiguration().createClient();
            }

            @DoFn.StartBundle
            public void startBundle(DoFn.Context context) throws Exception {
                this.batch = new ArrayList();
                this.currentBatchSizeBytes = 0L;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) throws Exception {
                String document = (String)context.element();
                this.batch.add(String.format("{ \"index\" : {} }%n%s%n", document));
                this.currentBatchSizeBytes += (long)document.getBytes().length;
                if ((long)this.batch.size() >= this.spec.getMaxBatchSize() || this.currentBatchSizeBytes >= this.spec.getMaxBatchSizeBytes()) {
                    this.finishBundle((DoFn.Context)context);
                }
            }

            @DoFn.FinishBundle
            public void finishBundle(DoFn.Context context) throws Exception {
                if (this.batch.isEmpty()) {
                    return;
                }
                StringBuilder bulkRequest = new StringBuilder();
                for (String json : this.batch) {
                    bulkRequest.append(json);
                }
                this.batch.clear();
                this.currentBatchSizeBytes = 0L;
                String endPoint = String.format("/%s/%s/_bulk", this.spec.getConnectionConfiguration().getIndex(), this.spec.getConnectionConfiguration().getType());
                NStringEntity requestBody = new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
                Response response = this.restClient.performRequest("POST", endPoint, Collections.emptyMap(), (HttpEntity)requestBody, new Header[]{new BasicHeader("", "")});
                JsonObject searchResult = ElasticsearchIO.parseResponse(response);
                boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean();
                if (errors) {
                    StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
                    JsonArray items = searchResult.getAsJsonArray("items");
                    for (JsonElement item : items) {
                        JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create");
                        JsonObject error = creationObject.getAsJsonObject("error");
                        if (error == null) continue;
                        String type = error.getAsJsonPrimitive("type").getAsString();
                        String reason = error.getAsJsonPrimitive("reason").getAsString();
                        String docId = creationObject.getAsJsonPrimitive("_id").getAsString();
                        errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
                        JsonObject causedBy = error.getAsJsonObject("caused_by");
                        if (causedBy == null) continue;
                        String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString();
                        String cbType = causedBy.getAsJsonPrimitive("type").getAsString();
                        errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
                    }
                    throw new IOException(errorMessages.toString());
                }
            }

            @DoFn.Teardown
            public void closeClient() throws Exception {
                if (this.restClient != null) {
                    this.restClient.close();
                }
            }
        }

        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(ConnectionConfiguration var1);

            abstract Builder setMaxBatchSize(long var1);

            abstract Builder setMaxBatchSizeBytes(long var1);

            abstract Write build();
        }
    }

    private static class BoundedElasticsearchReader
    extends BoundedSource.BoundedReader<String> {
        private final BoundedElasticsearchSource source;
        private RestClient restClient;
        private String current;
        private String scrollId;
        private ListIterator<String> batchIterator;

        private BoundedElasticsearchReader(BoundedElasticsearchSource source) {
            this.source = source;
        }

        public boolean start() throws IOException {
            this.restClient = this.source.spec.getConnectionConfiguration().createClient();
            String query = this.source.spec.getQuery();
            if (query == null) {
                query = "{ \"query\": { \"match_all\": {} } }";
            }
            String endPoint = String.format("/%s/%s/_search", this.source.spec.getConnectionConfiguration().getIndex(), this.source.spec.getConnectionConfiguration().getType());
            HashMap<String, String> params = new HashMap<String, String>();
            params.put("scroll", this.source.spec.getScrollKeepalive());
            params.put("size", String.valueOf(this.source.spec.getBatchSize()));
            if (this.source.shardPreference != null) {
                params.put("preference", "_shards:" + this.source.shardPreference);
            }
            NStringEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
            Response response = this.restClient.performRequest("GET", endPoint, params, (HttpEntity)queryEntity, new Header[]{new BasicHeader("", "")});
            JsonObject searchResult = ElasticsearchIO.parseResponse(response);
            this.updateScrollId(searchResult);
            return this.readNextBatchAndReturnFirstDocument(searchResult);
        }

        private void updateScrollId(JsonObject searchResult) {
            this.scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString();
        }

        public boolean advance() throws IOException {
            if (this.batchIterator.hasNext()) {
                this.current = this.batchIterator.next();
                return true;
            }
            String requestBody = String.format("{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}", this.source.spec.getScrollKeepalive(), this.scrollId);
            NStringEntity scrollEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
            Response response = this.restClient.performRequest("GET", "/_search/scroll", Collections.emptyMap(), (HttpEntity)scrollEntity, new Header[]{new BasicHeader("", "")});
            JsonObject searchResult = ElasticsearchIO.parseResponse(response);
            this.updateScrollId(searchResult);
            return this.readNextBatchAndReturnFirstDocument(searchResult);
        }

        private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) {
            JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits");
            if (hits.size() == 0) {
                this.current = null;
                this.batchIterator = null;
                return false;
            }
            ArrayList<String> batch = new ArrayList<String>();
            for (JsonElement hit : hits) {
                String document = hit.getAsJsonObject().getAsJsonObject("_source").toString();
                batch.add(document);
            }
            this.batchIterator = batch.listIterator();
            this.current = this.batchIterator.next();
            return true;
        }

        public String getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        public void close() throws IOException {
            String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", this.scrollId);
            NStringEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
            try {
                this.restClient.performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), (HttpEntity)entity, new Header[]{new BasicHeader("", "")});
            }
            finally {
                if (this.restClient != null) {
                    this.restClient.close();
                }
            }
        }

        public BoundedSource<String> getCurrentSource() {
            return this.source;
        }
    }

    @VisibleForTesting
    static class BoundedElasticsearchSource
    extends BoundedSource<String> {
        private final Read spec;
        @Nullable
        private final String shardPreference;

        BoundedElasticsearchSource(Read spec, @Nullable String shardPreference) {
            this.spec = spec;
            this.shardPreference = shardPreference;
        }

        public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            ArrayList<BoundedElasticsearchSource> sources = new ArrayList<BoundedElasticsearchSource>();
            JsonObject statsJson = this.getStats(true);
            JsonObject shardsJson = statsJson.getAsJsonObject("indices").getAsJsonObject(this.spec.getConnectionConfiguration().getIndex()).getAsJsonObject("shards");
            Set shards = shardsJson.entrySet();
            for (Map.Entry shardJson : shards) {
                String shardId = (String)shardJson.getKey();
                JsonArray value = (JsonArray)shardJson.getValue();
                boolean isPrimaryShard = value.get(0).getAsJsonObject().getAsJsonObject("routing").getAsJsonPrimitive("primary").getAsBoolean();
                if (!isPrimaryShard) continue;
                sources.add(new BoundedElasticsearchSource(this.spec, shardId));
            }
            Preconditions.checkArgument((!sources.isEmpty() ? 1 : 0) != 0, (Object)"No primary shard found");
            return sources;
        }

        public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
            JsonObject statsJson = this.getStats(false);
            JsonObject indexStats = statsJson.getAsJsonObject("indices").getAsJsonObject(this.spec.getConnectionConfiguration().getIndex()).getAsJsonObject("primaries");
            JsonObject store = indexStats.getAsJsonObject("store");
            return store.getAsJsonPrimitive("size_in_bytes").getAsLong();
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"shard", (String)this.shardPreference));
        }

        public BoundedSource.BoundedReader<String> createReader(PipelineOptions options) throws IOException {
            return new BoundedElasticsearchReader(this);
        }

        public void validate() {
            this.spec.validate((PBegin)null);
        }

        public Coder<String> getDefaultOutputCoder() {
            return StringUtf8Coder.of();
        }

        private JsonObject getStats(boolean shardLevel) throws IOException {
            HashMap<String, String> params = new HashMap<String, String>();
            if (shardLevel) {
                params.put("level", "shards");
            }
            String endpoint = String.format("/%s/_stats", this.spec.getConnectionConfiguration().getIndex());
            try (RestClient restClient = this.spec.getConnectionConfiguration().createClient();){
                JsonObject jsonObject = ElasticsearchIO.parseResponse(restClient.performRequest("GET", endpoint, params, new Header[]{new BasicHeader("", "")}));
                return jsonObject;
            }
        }
    }

    public static abstract class Read
    extends PTransform<PBegin, PCollection<String>> {
        private static final long MAX_BATCH_SIZE = 10000L;

        @Nullable
        abstract ConnectionConfiguration getConnectionConfiguration();

        @Nullable
        abstract String getQuery();

        abstract String getScrollKeepalive();

        abstract long getBatchSize();

        abstract Builder builder();

        public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument((connectionConfiguration != null ? 1 : 0) != 0, (Object)"ElasticsearchIO.read().withConnectionConfiguration(configuration) called with null configuration");
            return this.builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public Read withQuery(String query) {
            Preconditions.checkArgument((!Strings.isNullOrEmpty((String)query) ? 1 : 0) != 0, (Object)"ElasticsearchIO.read().withQuery(query) called with null or empty query");
            return this.builder().setQuery(query).build();
        }

        public Read withScrollKeepalive(String scrollKeepalive) {
            Preconditions.checkArgument((scrollKeepalive != null && !scrollKeepalive.equals("0m") ? 1 : 0) != 0, (Object)"ElasticsearchIO.read().withScrollKeepalive(keepalive) called with null or \"0m\" keepalive");
            return this.builder().setScrollKeepalive(scrollKeepalive).build();
        }

        public Read withBatchSize(long batchSize) {
            Preconditions.checkArgument((batchSize > 0L ? 1 : 0) != 0, (String)"ElasticsearchIO.read().withBatchSize(batchSize) called with a negative or equal to 0 value: %s", (Object[])new Object[]{batchSize});
            Preconditions.checkArgument((batchSize <= 10000L ? 1 : 0) != 0, (String)"ElasticsearchIO.read().withBatchSize(batchSize) called with a too large value (over %s): %s", (Object[])new Object[]{10000L, batchSize});
            return this.builder().setBatchSize(batchSize).build();
        }

        public PCollection<String> expand(PBegin input) {
            return (PCollection)input.apply((PTransform)org.apache.beam.sdk.io.Read.from((BoundedSource)new BoundedElasticsearchSource(this, null)));
        }

        public void validate(PBegin input) {
            Preconditions.checkState((this.getConnectionConfiguration() != null ? 1 : 0) != 0, (Object)"ElasticsearchIO.read() requires a connection configuration to be set via withConnectionConfiguration(configuration)");
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"query", (String)this.getQuery()));
            this.getConnectionConfiguration().populateDisplayData(builder);
        }

        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(ConnectionConfiguration var1);

            abstract Builder setQuery(String var1);

            abstract Builder setScrollKeepalive(String var1);

            abstract Builder setBatchSize(long var1);

            abstract Read build();
        }
    }

    public static abstract class ConnectionConfiguration
    implements Serializable {
        abstract List<String> getAddresses();

        @Nullable
        abstract String getUsername();

        @Nullable
        abstract String getPassword();

        abstract String getIndex();

        abstract String getType();

        abstract Builder builder();

        public static ConnectionConfiguration create(String[] addresses, String index, String type) {
            Preconditions.checkArgument((addresses != null ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create(addresses, index, type) called with null address");
            Preconditions.checkArgument((addresses.length != 0 ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create(addresses, index, type) called with empty addresses");
            Preconditions.checkArgument((index != null ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create(addresses, index, type) called with null index");
            Preconditions.checkArgument((type != null ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create(addresses, index, type) called with null type");
            return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder().setAddresses(Arrays.asList(addresses)).setIndex(index).setType(type).build();
        }

        public ConnectionConfiguration withUsername(String username) {
            Preconditions.checkArgument((username != null ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create().withUsername(username) called with null username");
            Preconditions.checkArgument((!username.isEmpty() ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create().withUsername(username) called with empty username");
            return this.builder().setUsername(username).build();
        }

        public ConnectionConfiguration withPassword(String password) {
            Preconditions.checkArgument((password != null ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create().withPassword(password) called with null password");
            Preconditions.checkArgument((!password.isEmpty() ? 1 : 0) != 0, (Object)"ConnectionConfiguration.create().withPassword(password) called with empty password");
            return this.builder().setPassword(password).build();
        }

        private void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item((String)"address", (String)this.getAddresses().toString()));
            builder.add(DisplayData.item((String)"index", (String)this.getIndex()));
            builder.add(DisplayData.item((String)"type", (String)this.getType()));
            builder.addIfNotNull(DisplayData.item((String)"username", (String)this.getUsername()));
        }

        private RestClient createClient() throws MalformedURLException {
            HttpHost[] hosts = new HttpHost[this.getAddresses().size()];
            int i = 0;
            for (String address : this.getAddresses()) {
                URL url = new URL(address);
                hosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
                ++i;
            }
            RestClientBuilder restClientBuilder = RestClient.builder((HttpHost[])hosts);
            if (this.getUsername() != null) {
                BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, (Credentials)new UsernamePasswordCredentials(this.getUsername(), this.getPassword()));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback((CredentialsProvider)credentialsProvider){
                    final /* synthetic */ CredentialsProvider val$credentialsProvider;
                    {
                        this.val$credentialsProvider = credentialsProvider;
                    }

                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(this.val$credentialsProvider);
                    }
                });
            }
            return restClientBuilder.build();
        }

        static abstract class Builder {
            Builder() {
            }

            abstract Builder setAddresses(List<String> var1);

            abstract Builder setUsername(String var1);

            abstract Builder setPassword(String var1);

            abstract Builder setIndex(String var1);

            abstract Builder setType(String var1);

            abstract ConnectionConfiguration build();
        }
    }
}

