/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.elasticsearch;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.net.ssl.SSLContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_Read;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_RetryConfiguration;
import org.apache.beam.sdk.io.elasticsearch.AutoValue_ElasticsearchIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class ElasticsearchIO {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIO.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Creates a {@link Read} transform with the defaults set below: metadata disabled,
     * a scroll keepalive of {@code "5m"} and a batch size of 100.
     *
     * @return the default read transform
     */
    public static Read read() {
        return new AutoValue_ElasticsearchIO_Read.Builder()
            .setWithMetadata(false)
            .setScrollKeepalive("5m")
            .setBatchSize(100L)
            .build();
    }

    /**
     * Creates a {@link Write} transform with the defaults set below: at most 1000 documents
     * or 5 MiB per bulk request, and partial updates disabled.
     *
     * @return the default write transform
     */
    public static Write write() {
        return new AutoValue_ElasticsearchIO_Write.Builder()
            .setMaxBatchSize(1000L)
            .setMaxBatchSizeBytes(5L * 1024L * 1024L)
            .setUsePartialUpdate(false)
            .build();
    }

    /** Not instantiable: this class only exposes static factories and nested types. */
    private ElasticsearchIO() {
    }

    /**
     * Reads the content stream of an HTTP entity and parses it as a JSON tree.
     *
     * @param responseEntity entity whose content is parsed
     * @return the parsed JSON
     * @throws IOException if the content cannot be read or parsed
     */
    @VisibleForTesting
    static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
        return mapper.readValue(responseEntity.getContent(), JsonNode.class);
    }

    /**
     * Inspects a bulk response and fails if it reports errors.
     *
     * @param responseEntity bulk response body
     * @param backendVersion major Elasticsearch version; selects the per-item root field name
     * @param partialUpdate whether the bulk request used {@code update} actions
     * @throws IOException if the body cannot be parsed, or if its {@code errors} flag is true
     *     (the message lists every item that carries an {@code error} object)
     */
    static void checkForErrors(HttpEntity responseEntity, int backendVersion, boolean partialUpdate) throws IOException {
        JsonNode bulkResult = ElasticsearchIO.parseResponse(responseEntity);
        if (!bulkResult.path("errors").asBoolean()) {
            return;
        }

        // The item root name depends only on the arguments, so it is resolved once.
        String errorRootName;
        if (partialUpdate) {
            errorRootName = "update";
        } else if (backendVersion == 2) {
            errorRootName = "create";
        } else if (backendVersion >= 5) {
            errorRootName = "index";
        } else {
            errorRootName = "";
        }

        StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
        for (JsonNode item : bulkResult.path("items")) {
            JsonNode errorRoot = item.path(errorRootName);
            JsonNode error = errorRoot.get("error");
            if (error == null) {
                // This item has no error object under the expected root.
                continue;
            }
            String docId = errorRoot.path("_id").asText();
            String reason = error.path("reason").asText();
            String type = error.path("type").asText();
            errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));

            JsonNode causedBy = error.get("caused_by");
            if (causedBy != null) {
                String cbReason = causedBy.path("reason").asText();
                String cbType = causedBy.path("type").asText();
                errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
            }
        }
        throw new IOException(errorMessages.toString());
    }

    /**
     * Asks the cluster root endpoint for its version and returns the major version number.
     *
     * <p>The major version is everything before the first dot of {@code version.number}, so a
     * multi-digit major is reported correctly in the "unsupported version" message, and a
     * missing or malformed version fails with {@link IllegalArgumentException} rather than an
     * index-out-of-bounds error.
     *
     * @param connectionConfiguration connection settings used to create a short-lived client
     * @return the major version: 2, 5, 6 or 7
     * @throws IllegalArgumentException if the request fails, the version cannot be parsed, or
     *     the major version is not one of the supported ones
     */
    static int getBackendVersion(ConnectionConfiguration connectionConfiguration) {
        try (RestClient restClient = connectionConfiguration.createClient()) {
            Response response = restClient.performRequest(new Request("GET", ""));
            JsonNode jsonNode = ElasticsearchIO.parseResponse(response.getEntity());
            String versionNumber = jsonNode.path("version").path("number").asText();

            int firstDot = versionNumber.indexOf('.');
            String major = firstDot < 0 ? versionNumber : versionNumber.substring(0, firstDot);
            int backendVersion;
            try {
                backendVersion = Integer.parseInt(major);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Cannot parse Elasticsearch version from '" + versionNumber + "'", e);
            }

            boolean supported = backendVersion == 2 || backendVersion == 5 || backendVersion == 6 || backendVersion == 7;
            Preconditions.checkArgument(supported, "The Elasticsearch version to connect to is %s.x. This version of the ElasticsearchIO is only compatible with Elasticsearch v7.x, v6.x, v5.x and v2.x", backendVersion);
            return backendVersion;
        }
        catch (IOException e) {
            throw new IllegalArgumentException("Cannot get Elasticsearch version", e);
        }
    }

    @AutoValue
    public static abstract class Write
    extends PTransform<PCollection<String>, PDone> {
        /** Connection settings; may be null until set, and {@code expand} requires it to be non-null. */
        abstract @Nullable ConnectionConfiguration getConnectionConfiguration();

        /** Number of buffered documents at which the write function flushes a bulk request. */
        abstract long getMaxBatchSize();

        /** Accumulated UTF-8 document size, in bytes, at which the write function flushes a bulk request. */
        abstract long getMaxBatchSizeBytes();

        /** Optional function whose result is written as the {@code _id} metadata field of each document. */
        abstract @Nullable FieldValueExtractFn getIdFn();

        /** Optional function whose result (lower-cased) is written as the {@code _index} metadata field. */
        abstract @Nullable FieldValueExtractFn getIndexFn();

        /** Optional function whose result is written as the {@code _type} metadata field of each document. */
        abstract @Nullable FieldValueExtractFn getTypeFn();

        /** Optional retry settings; when null the write function configures its backoff with zero retries. */
        abstract @Nullable RetryConfiguration getRetryConfiguration();

        /** Whether bulk actions are emitted as {@code update} (with {@code doc_as_upsert}) instead of {@code index}. */
        abstract boolean getUsePartialUpdate();

        /** Returns a builder used by the {@code with*} methods to produce a modified {@code Write}. */
        abstract Builder builder();

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given connection settings.
         *
         * @param connectionConfiguration connection settings; must not be null
         * @throws IllegalArgumentException if {@code connectionConfiguration} is null
         */
        public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
            Builder updated = builder().setConnectionConfiguration(connectionConfiguration);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given maximum batch size.
         *
         * @param batchSize maximum number of documents per bulk request; must be positive
         * @throws IllegalArgumentException if {@code batchSize} is not greater than zero
         */
        public Write withMaxBatchSize(long batchSize) {
            Preconditions.checkArgument(batchSize > 0L, "batchSize must be > 0, but was %s", batchSize);
            Builder updated = builder().setMaxBatchSize(batchSize);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given maximum batch size in bytes.
         *
         * @param batchSizeBytes maximum accumulated document size per bulk request; must be positive
         * @throws IllegalArgumentException if {@code batchSizeBytes} is not greater than zero
         */
        public Write withMaxBatchSizeBytes(long batchSizeBytes) {
            Preconditions.checkArgument(batchSizeBytes > 0L, "batchSizeBytes must be > 0, but was %s", batchSizeBytes);
            Builder updated = builder().setMaxBatchSizeBytes(batchSizeBytes);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given document-id extractor.
         *
         * @param idFn function applied to each parsed document; must not be null
         * @throws IllegalArgumentException if {@code idFn} is null
         */
        public Write withIdFn(FieldValueExtractFn idFn) {
            Preconditions.checkArgument(idFn != null, "idFn must not be null");
            Builder updated = builder().setIdFn(idFn);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given index-name extractor.
         *
         * @param indexFn function applied to each parsed document; must not be null
         * @throws IllegalArgumentException if {@code indexFn} is null
         */
        public Write withIndexFn(FieldValueExtractFn indexFn) {
            Preconditions.checkArgument(indexFn != null, "indexFn must not be null");
            Builder updated = builder().setIndexFn(indexFn);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given document-type extractor.
         *
         * @param typeFn function applied to each parsed document; must not be null
         * @throws IllegalArgumentException if {@code typeFn} is null
         */
        public Write withTypeFn(FieldValueExtractFn typeFn) {
            Preconditions.checkArgument(typeFn != null, "typeFn must not be null");
            Builder updated = builder().setTypeFn(typeFn);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the partial-update flag set.
         *
         * @param usePartialUpdate true to emit {@code update} bulk actions instead of {@code index}
         */
        public Write withUsePartialUpdate(boolean usePartialUpdate) {
            Builder updated = builder().setUsePartialUpdate(usePartialUpdate);
            return updated.build();
        }

        /**
         * Returns a {@code Write} rebuilt from this one's builder with the given retry settings.
         *
         * @param retryConfiguration retry settings; must not be null
         * @throws IllegalArgumentException if {@code retryConfiguration} is null
         */
        public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
            Preconditions.checkArgument(retryConfiguration != null, "retryConfiguration is required");
            Builder updated = builder().setRetryConfiguration(retryConfiguration);
            return updated.build();
        }

        /**
         * Applies a {@link WriteFn} built from this spec to the input collection.
         *
         * @param input JSON documents to write
         * @return a {@link PDone} in the input's pipeline
         * @throws IllegalStateException if no connection configuration has been set
         */
        public PDone expand(PCollection<String> input) {
            Preconditions.checkState(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
            WriteFn writeFn = new WriteFn(this);
            input.apply(ParDo.of(writeFn));
            return PDone.in(input.getPipeline());
        }

        @VisibleForTesting
        static class WriteFn
        extends DoFn<String, Void> {
            // Mapper shared by all WriteFn instances; setup() registers the metadata serializer on it.
            private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
            // NOTE(review): getDocumentMetadata passes the literal 5 — presumably this constant inlined by the decompiler.
            private static final int DEFAULT_RETRY_ON_CONFLICT = 5;
            // Initial delay of the retry backoff built in setup().
            private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds((long)5L);
            // Format of the warning logged before each retry attempt.
            @VisibleForTesting
            static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. Retry attempt[%d]";
            // Format of the exception message thrown once the backoff is exhausted.
            @VisibleForTesting
            static final String RETRY_FAILED_LOG = "Error writing to ES after %d attempt(s). No more attempts allowed";
            // Backoff policy for retried bulk requests; assigned in setup().
            private transient FluentBackoff retryBackoff;
            // Major Elasticsearch version; assigned in setup().
            private int backendVersion;
            // The Write transform this function was created from.
            private final Write spec;
            // REST client created in setup() and closed in closeClient().
            private transient RestClient restClient;
            // Bulk action lines buffered since the last flush.
            private ArrayList<String> batch;
            // Sum of the UTF-8 byte lengths of the documents currently buffered.
            private long currentBatchSizeBytes;

            /** Creates a write function driven by the given transform. */
            @VisibleForTesting
            WriteFn(Write spec) {
                this.spec = spec;
            }

            /**
             * Detects the backend version, opens the REST client, builds the retry backoff and
             * registers the {@code DocumentMetadata} serializer on the shared mapper.
             *
             * @throws IOException if the client cannot be created
             */
            @DoFn.Setup
            public void setup() throws IOException {
                ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration();
                backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
                restClient = connectionConfiguration.createClient();

                RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
                if (retryConfiguration == null) {
                    // No retry configuration: the backoff allows zero retries.
                    retryBackoff = FluentBackoff.DEFAULT
                        .withMaxRetries(0)
                        .withInitialBackoff(RETRY_INITIAL_BACKOFF);
                } else {
                    // maxAttempts counts the first request as well, hence the "- 1".
                    retryBackoff = FluentBackoff.DEFAULT
                        .withInitialBackoff(RETRY_INITIAL_BACKOFF)
                        .withMaxRetries(retryConfiguration.getMaxAttempts() - 1)
                        .withMaxCumulativeBackoff(retryConfiguration.getMaxDuration());
                }

                // NOTE(review): OBJECT_MAPPER is static, while the serializer registered here is an
                // inner class that reads this instance's backendVersion — confirm this behaves as
                // intended when several WriteFn instances share a JVM.
                SimpleModule module = new SimpleModule();
                module.addSerializer(DocumentMetadata.class, new DocumentMetadataSerializer());
                OBJECT_MAPPER.registerModule(module);
            }

            /** Starts each bundle with an empty batch and a zeroed byte counter. */
            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext context) {
                currentBatchSizeBytes = 0L;
                batch = new ArrayList<>();
            }

            /**
             * Builds the JSON metadata object placed on the action line of a bulk request.
             *
             * @param document the JSON document being written
             * @return {@code "{}"} when no index/type/id function is configured, otherwise the
             *     serialized metadata produced by applying those functions to the parsed document
             * @throws IOException if the document cannot be parsed or the metadata cannot be serialized
             */
            private String getDocumentMetadata(String document) throws IOException {
                FieldValueExtractFn indexFn = spec.getIndexFn();
                FieldValueExtractFn typeFn = spec.getTypeFn();
                FieldValueExtractFn idFn = spec.getIdFn();
                if (indexFn == null && typeFn == null && idFn == null) {
                    return "{}";
                }

                JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
                String index = indexFn == null ? null : WriteFn.lowerCaseOrNull(indexFn.apply(parsedDocument));
                String type = typeFn == null ? null : typeFn.apply(parsedDocument);
                String id = idFn == null ? null : idFn.apply(parsedDocument);
                // retry-on-conflict is only emitted for partial updates
                Integer retryOnConflict = spec.getUsePartialUpdate() ? Integer.valueOf(DEFAULT_RETRY_ON_CONFLICT) : null;

                DocumentMetadata metadata = new DocumentMetadata(index, type, id, retryOnConflict);
                return OBJECT_MAPPER.writeValueAsString(metadata);
            }

            /**
             * Lower-cases a string independently of the JVM's default locale.
             *
             * <p>{@code Locale.ROOT} is used so that the result does not vary with the default
             * locale (for example, a Turkish default locale maps {@code "I"} to a dotless
             * {@code "ı"}).
             *
             * @param input value to convert; may be null
             * @return the lower-cased value, or null if {@code input} is null
             */
            private static String lowerCaseOrNull(String input) {
                return input == null ? null : input.toLowerCase(java.util.Locale.ROOT);
            }

            /**
             * Buffers one document as a bulk action and flushes when either batch limit is reached.
             *
             * @param context supplies the JSON document to write
             * @throws Exception if metadata extraction or flushing fails
             */
            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) throws Exception {
                String document = (String)context.element();
                String documentMetadata = getDocumentMetadata(document);

                String bulkAction;
                if (spec.getUsePartialUpdate()) {
                    bulkAction = String.format("{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n", documentMetadata, document);
                } else {
                    bulkAction = String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document);
                }
                batch.add(bulkAction);

                // Only the document bytes are counted, not the action line.
                currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length;

                boolean countLimitReached = batch.size() >= spec.getMaxBatchSize();
                boolean byteLimitReached = currentBatchSizeBytes >= spec.getMaxBatchSizeBytes();
                if (countLimitReached || byteLimitReached) {
                    flushBatch();
                }
            }

            /**
             * Sends whatever is still buffered when the bundle ends.
             *
             * @throws IOException if the bulk request fails or reports errors
             * @throws InterruptedException if interrupted while backing off between retries
             */
            @DoFn.FinishBundle
            public void finishBundle(DoFn.FinishBundleContext context) throws IOException, InterruptedException {
                flushBatch();
            }

            /**
             * Sends the buffered actions as one bulk request, retrying when the configured
             * predicate asks for it, then checks the final response for errors.
             *
             * @throws IOException if the request fails, retries are exhausted, or the response reports errors
             * @throws InterruptedException if interrupted while backing off between retries
             */
            private void flushBatch() throws IOException, InterruptedException {
                if (batch.isEmpty()) {
                    return;
                }

                // Build the payload first, then reset the buffer before any I/O happens.
                String bulkPayload = String.join("", batch);
                batch.clear();
                currentBatchSizeBytes = 0L;

                ConnectionConfiguration connection = spec.getConnectionConfiguration();
                String endPoint = String.format("/%s/%s/_bulk", connection.getIndex(), connection.getType());
                HttpEntity requestBody = new NStringEntity(bulkPayload, ContentType.APPLICATION_JSON);

                Request request = new Request("POST", endPoint);
                request.addParameters(Collections.emptyMap());
                request.setEntity(requestBody);
                Response response = restClient.performRequest(request);

                // Buffered so the body can be read by both the retry predicate and the error check.
                HttpEntity responseEntity = new BufferedHttpEntity(response.getEntity());
                RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
                if (retryConfiguration != null && retryConfiguration.getRetryPredicate().test(responseEntity)) {
                    responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
                }
                ElasticsearchIO.checkForErrors(responseEntity, backendVersion, spec.getUsePartialUpdate());
            }

            /**
             * Re-sends a request until the retry predicate accepts the response or the backoff is exhausted.
             *
             * @param method HTTP method of the request
             * @param endpoint request path
             * @param params query parameters
             * @param requestBody entity to send on every attempt
             * @return the buffered response entity of the first attempt the predicate does not reject
             * @throws IOException if a request fails or no attempts remain
             * @throws InterruptedException if interrupted while sleeping between attempts
             */
            private HttpEntity handleRetry(String method, String endpoint, Map<String, String> params, HttpEntity requestBody) throws IOException, InterruptedException {
                BackOff backoff = retryBackoff.backoff();
                int attempt = 0;
                while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
                    attempt++;
                    LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt));

                    Request request = new Request(method, endpoint);
                    request.addParameters(params);
                    request.setEntity(requestBody);
                    Response response = restClient.performRequest(request);

                    HttpEntity responseEntity = new BufferedHttpEntity(response.getEntity());
                    boolean retryAgain = spec.getRetryConfiguration().getRetryPredicate().test(responseEntity);
                    if (!retryAgain) {
                        return responseEntity;
                    }
                }
                throw new IOException(String.format(RETRY_FAILED_LOG, attempt));
            }

            /**
             * Closes the REST client if one was created.
             *
             * @throws IOException if closing the client fails
             */
            @DoFn.Teardown
            public void closeClient() throws IOException {
                if (restClient == null) {
                    return;
                }
                restClient.close();
            }

            /**
             * Serializes {@code DocumentMetadata} as the metadata object of a bulk action line.
             * Null fields are omitted; the retry-on-conflict field name depends on the enclosing
             * function's backend version.
             */
            private class DocumentMetadataSerializer
            extends StdSerializer<DocumentMetadata> {
                private DocumentMetadataSerializer() {
                    super(DocumentMetadata.class);
                }

                public void serialize(DocumentMetadata value, JsonGenerator gen, SerializerProvider provider) throws IOException {
                    gen.writeStartObject();
                    if (value.index != null) {
                        gen.writeStringField("_index", value.index);
                    }
                    if (value.type != null) {
                        gen.writeStringField("_type", value.type);
                    }
                    if (value.id != null) {
                        gen.writeStringField("_id", value.id);
                    }
                    if (value.retryOnConflict != null) {
                        // Versions up to 6 use the underscore-prefixed name; 7 and later drop the underscore.
                        String fieldName = WriteFn.this.backendVersion <= 6 ? "_retry_on_conflict" : "retry_on_conflict";
                        gen.writeNumberField(fieldName, value.retryOnConflict.intValue());
                    }
                    gen.writeEndObject();
                }
            }

            /**
             * Immutable holder for the per-document metadata of a bulk action. Any field may be
             * null, in which case the serializer omits it.
             */
            private static class DocumentMetadata
            implements Serializable {
                // Written as "_index" when non-null.
                final String index;
                // Written as "_type" when non-null.
                final String type;
                // Written as "_id" when non-null.
                final String id;
                // Written as "_retry_on_conflict" or "retry_on_conflict" when non-null.
                final Integer retryOnConflict;

                DocumentMetadata(String index, String type, String id, Integer retryOnConflict) {
                    this.index = index;
                    this.type = type;
                    this.id = id;
                    this.retryOnConflict = retryOnConflict;
                }
            }
        }

        /** AutoValue builder for {@code Write}; one setter per property of the transform. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(ConnectionConfiguration var1);

            abstract Builder setMaxBatchSize(long var1);

            abstract Builder setMaxBatchSizeBytes(long var1);

            abstract Builder setIdFn(FieldValueExtractFn var1);

            abstract Builder setIndexFn(FieldValueExtractFn var1);

            abstract Builder setTypeFn(FieldValueExtractFn var1);

            abstract Builder setUsePartialUpdate(boolean var1);

            abstract Builder setRetryConfiguration(RetryConfiguration var1);

            abstract Write build();
        }

        /** Serializable function that derives a string (index, type or id) from a parsed JSON document. */
        public static interface FieldValueExtractFn
        extends SerializableFunction<JsonNode, String> {
        }
    }

    @AutoValue
    public static abstract class RetryConfiguration
    implements Serializable {
        /** Predicate installed by {@code create}; a no-arg {@code DefaultRetryPredicate}, which matches status 429. */
        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        /** Maximum number of attempts; the write function allows {@code getMaxAttempts() - 1} retries. */
        abstract int getMaxAttempts();

        /** Upper bound on the cumulative backoff time spent retrying. */
        abstract Duration getMaxDuration();

        /** Predicate that decides, from a response entity, whether the request should be retried. */
        abstract RetryPredicate getRetryPredicate();

        /** Returns a builder used to produce a modified {@code RetryConfiguration}. */
        abstract Builder builder();

        /**
         * Creates a retry configuration that uses the default retry predicate.
         *
         * @param maxAttempts maximum number of attempts; must be greater than zero
         * @param maxDuration maximum cumulative retry duration; must be non-null and longer than zero
         * @return the new configuration
         * @throws IllegalArgumentException if either argument is out of range
         */
        public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
            Preconditions.checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
            boolean positiveDuration = maxDuration != null && maxDuration.isLongerThan(Duration.ZERO);
            Preconditions.checkArgument(positiveDuration, "maxDuration must be greater than 0");
            return new AutoValue_ElasticsearchIO_RetryConfiguration.Builder()
                .setMaxAttempts(maxAttempts)
                .setMaxDuration(maxDuration)
                .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
                .build();
        }

        /**
         * Returns a configuration rebuilt from this one's builder with a different retry predicate.
         *
         * @param predicate predicate to use; must not be null
         * @throws IllegalArgumentException if {@code predicate} is null
         */
        @VisibleForTesting
        RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
            Preconditions.checkArgument(predicate != null, "predicate must be provided");
            Builder updated = builder().setRetryPredicate(predicate);
            return updated.build();
        }

        @VisibleForTesting
        static class DefaultRetryPredicate
        implements RetryPredicate {
            private int errorCode;

            DefaultRetryPredicate(int code) {
                this.errorCode = code;
            }

            DefaultRetryPredicate() {
                this(429);
            }

            private static boolean errorCodePresent(HttpEntity responseEntity, int errorCode) {
                try {
                    JsonNode json = ElasticsearchIO.parseResponse(responseEntity);
                    if (json.path("errors").asBoolean()) {
                        for (JsonNode item : json.path("items")) {
                            if (item.findValue("status").asInt() != errorCode) continue;
                            return true;
                        }
                    }
                }
                catch (IOException e) {
                    LOG.warn("Could not extract error codes from responseEntity {}", (Object)responseEntity);
                }
                return false;
            }

            @Override
            public boolean test(HttpEntity responseEntity) {
                return DefaultRetryPredicate.errorCodePresent(responseEntity, this.errorCode);
            }
        }

        /** Serializable predicate over a response entity; {@code test} returning true requests a retry. */
        @FunctionalInterface
        static interface RetryPredicate
        extends Predicate<HttpEntity>,
        Serializable {
        }

        /** AutoValue builder for {@code RetryConfiguration}; one setter per property. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setMaxAttempts(int var1);

            abstract Builder setMaxDuration(Duration var1);

            abstract Builder setRetryPredicate(RetryPredicate var1);

            abstract RetryConfiguration build();
        }
    }

    private static class BoundedElasticsearchReader
    extends BoundedSource.BoundedReader<String> {
        // Source this reader was created for; supplies the read spec, backend version and slice/shard settings.
        private final BoundedElasticsearchSource source;
        // REST client created in start() and closed in close().
        private RestClient restClient;
        // Document returned by getCurrent(); null when no document is available.
        private String current;
        // Scroll id taken from the most recent search or scroll response.
        private String scrollId;
        // Iterator over the documents of the most recently fetched batch; null once a response had no hits.
        private ListIterator<String> batchIterator;

        /** Creates a reader for the given source; no I/O happens until {@code start()}. */
        private BoundedElasticsearchReader(BoundedElasticsearchSource source) {
            this.source = source;
        }

        /**
         * Opens the client, issues the initial scroll search and positions the reader on the first hit.
         *
         * <p>When no query is configured a {@code match_all} query is used. For backend versions
         * 5 and later with more than one slice, a {@code slice} clause is spliced in after the
         * query's first opening brace. For version 2 the batch size and, if set, the shard
         * preference are sent as request parameters.
         *
         * @return true if the first response contained at least one hit
         * @throws IOException if the client cannot be created or the request fails
         */
        public boolean start() throws IOException {
            Read spec = source.spec;
            ConnectionConfiguration connection = spec.getConnectionConfiguration();
            restClient = connection.createClient();

            String query = spec.getQuery() == null ? null : (String)spec.getQuery().get();
            if (query == null) {
                query = "{\"query\": { \"match_all\": {} }}";
            }

            boolean sliced = source.backendVersion >= 5 && source.numSlices != null && source.numSlices > 1;
            if (sliced) {
                String sliceQuery = String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices);
                query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
            }

            String endPoint = String.format("/%s/%s/_search", connection.getIndex(), connection.getType());
            Map<String, String> params = new HashMap<>();
            params.put("scroll", spec.getScrollKeepalive());
            if (source.backendVersion == 2) {
                params.put("size", String.valueOf(spec.getBatchSize()));
                if (source.shardPreference != null) {
                    params.put("preference", "_shards:" + source.shardPreference);
                }
            }

            HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
            Request request = new Request("GET", endPoint);
            request.addParameters(params);
            request.setEntity(queryEntity);

            Response response = restClient.performRequest(request);
            JsonNode searchResult = ElasticsearchIO.parseResponse(response.getEntity());
            updateScrollId(searchResult);
            return readNextBatchAndReturnFirstDocument(searchResult);
        }

        /** Stores the {@code _scroll_id} of a search response (empty text if the field is absent). */
        private void updateScrollId(JsonNode searchResult) {
            JsonNode scrollIdNode = searchResult.path("_scroll_id");
            scrollId = scrollIdNode.asText();
        }

        /**
         * Moves to the next document, fetching the next scroll page when the current batch is used up.
         *
         * @return true if a document is available
         * @throws IOException if the scroll request fails
         */
        public boolean advance() throws IOException {
            if (batchIterator.hasNext()) {
                current = batchIterator.next();
                return true;
            }

            // Current batch is exhausted: ask for the next page of the scroll.
            String requestBody = String.format("{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}", source.spec.getScrollKeepalive(), scrollId);
            HttpEntity scrollEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
            Request request = new Request("GET", "/_search/scroll");
            request.addParameters(Collections.emptyMap());
            request.setEntity(scrollEntity);

            Response response = restClient.performRequest(request);
            JsonNode searchResult = ElasticsearchIO.parseResponse(response.getEntity());
            updateScrollId(searchResult);
            return readNextBatchAndReturnFirstDocument(searchResult);
        }

        /**
         * Loads the hits of a search response as the new batch and positions on its first document.
         *
         * @param searchResult parsed search or scroll response
         * @return false (and clears the current document and iterator) when there are no hits, true otherwise
         */
        private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
            JsonNode hits = searchResult.path("hits").path("hits");
            if (hits.size() == 0) {
                current = null;
                batchIterator = null;
                return false;
            }

            boolean withMetadata = source.spec.isWithMetadata();
            List<String> documents = new ArrayList<>(hits.size());
            for (JsonNode hit : hits) {
                // With metadata the whole hit is kept; otherwise only its "_source" field.
                JsonNode payload = withMetadata ? hit : hit.path("_source");
                documents.add(payload.toString());
            }

            batchIterator = documents.listIterator();
            current = batchIterator.next();
            return true;
        }

        /**
         * Returns the document the reader is positioned on.
         *
         * @throws NoSuchElementException if there is no current document
         */
        public String getCurrent() throws NoSuchElementException {
            String document = current;
            if (document != null) {
                return document;
            }
            throw new NoSuchElementException();
        }

        /**
         * Clears the server-side scroll, if one was opened, and closes the REST client.
         *
         * <p>This is a no-op when {@code start()} never created a client. The scroll DELETE is
         * skipped when no scroll id was recorded, so a literal {@code "null"} id is never sent.
         * The client is closed even if clearing the scroll fails.
         *
         * @throws IOException if the clear-scroll request or closing the client fails
         */
        public void close() throws IOException {
            if (this.restClient == null) {
                // No client was ever created, so there is nothing to clear or close.
                return;
            }
            try {
                if (this.scrollId != null) {
                    String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", this.scrollId);
                    HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
                    Request request = new Request("DELETE", "/_search/scroll");
                    request.addParameters(Collections.emptyMap());
                    request.setEntity(entity);
                    this.restClient.performRequest(request);
                }
            }
            finally {
                this.restClient.close();
            }
        }

        /** Returns the source this reader was created for. */
        public BoundedSource<String> getCurrentSource() {
            return this.source;
        }
    }

    @VisibleForTesting
    public static class BoundedElasticsearchSource
    extends BoundedSource<String> {
        // Major Elasticsearch version; not final because split() re-detects and overwrites it.
        private int backendVersion;
        // Read transform this source was created from.
        private final Read spec;
        // Shard id sent by the reader as a "_shards:" preference when the backend version is 2; may be null.
        private final @Nullable String shardPreference;
        // Total number of slices; the reader adds a slice clause only for backend version >= 5 and more than one slice.
        private final @Nullable Integer numSlices;
        // Id of the slice this source reads; used together with numSlices.
        private final @Nullable Integer sliceId;
        // Cached size estimate; getEstimatedSizeBytes() returns it directly when non-null.
        private @Nullable Long estimatedByteSize;

        /** Creates a source with every field given explicitly, including the size estimate and backend version. */
        private BoundedElasticsearchSource(Read spec, @Nullable String shardPreference, @Nullable Integer numSlices, @Nullable Integer sliceId, @Nullable Long estimatedByteSize, int backendVersion) {
            this.backendVersion = backendVersion;
            this.spec = spec;
            this.shardPreference = shardPreference;
            this.numSlices = numSlices;
            this.estimatedByteSize = estimatedByteSize;
            this.sliceId = sliceId;
        }

        /**
         * Creates a source with no cached size estimate and a backend version of 0
         * (the same values the fields would have if left unassigned).
         */
        @VisibleForTesting
        BoundedElasticsearchSource(Read spec, @Nullable String shardPreference, @Nullable Integer numSlices, @Nullable Integer sliceId) {
            this(spec, shardPreference, numSlices, sliceId, null, 0);
        }

        /**
         * Splits this source into sub-sources.
         *
         * <p>For backend version 2, one source is created per shard listed in the index stats.
         * For version 5 and later, the estimated size is divided by the desired bundle size to
         * get a slice count (capped at 1024) and one source is created per slice. Other versions
         * yield an empty list.
         *
         * @param desiredBundleSizeBytes target size of each bundle
         * @param options pipeline options passed through to the size estimate
         * @return the sub-sources
         * @throws IllegalArgumentException if version 2 stats list no shard
         */
        public List<? extends BoundedSource<String>> split(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            ConnectionConfiguration connectionConfiguration = spec.getConnectionConfiguration();
            backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
            List<BoundedElasticsearchSource> sources = new ArrayList<>();

            if (backendVersion == 2) {
                JsonNode statsJson = BoundedElasticsearchSource.getStats(connectionConfiguration, true);
                JsonNode shardsJson = statsJson.path("indices").path(connectionConfiguration.getIndex()).path("shards");
                for (Iterator<Map.Entry<String, JsonNode>> shards = shardsJson.fields(); shards.hasNext();) {
                    String shardId = shards.next().getKey();
                    sources.add(new BoundedElasticsearchSource(spec, shardId, null, null, null, backendVersion));
                }
                Preconditions.checkArgument(!sources.isEmpty(), "No shard found");
            } else if (backendVersion >= 5) {
                long indexSize = getEstimatedSizeBytes(options);
                float nbBundlesFloat = (float)indexSize / (float)desiredBundleSizeBytes;
                // The slice count is capped at 1024.
                int nbBundles = Math.min(1024, (int)Math.ceil(nbBundlesFloat));
                for (int sliceIndex = 0; sliceIndex < nbBundles; sliceIndex++) {
                    long estimatedByteSizeForBundle = getEstimatedSizeBytes(options) / (long)nbBundles;
                    sources.add(new BoundedElasticsearchSource(spec, null, nbBundles, sliceIndex, estimatedByteSizeForBundle, backendVersion));
                }
            }
            return sources;
        }

        /**
         * Estimates how many bytes this source will read, caching the result on this instance.
         *
         * <p>Without a query the estimate is the primaries' store size reported by the index stats.
         * With a query, the store size is scaled by the fraction of documents the query matches
         * (obtained from the {@code _count} endpoint). An empty index or an empty match yields 1.
         *
         * @param options pipeline options (not used by this implementation)
         * @return the estimated size in bytes
         * @throws IOException if the stats or count request fails
         */
        public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
            if (this.estimatedByteSize != null) {
                return this.estimatedByteSize;
            }
            ConnectionConfiguration connectionConfiguration = this.spec.getConnectionConfiguration();
            JsonNode statsJson = BoundedElasticsearchSource.getStats(connectionConfiguration, false);
            JsonNode indexStats = statsJson.path("indices").path(connectionConfiguration.getIndex()).path("primaries");
            long indexSize = indexStats.path("store").path("size_in_bytes").asLong();
            LOG.debug("estimate source byte size: total index size " + indexSize);
            String query = this.spec.getQuery() != null ? this.spec.getQuery().get() : null;
            if (query == null || query.isEmpty()) {
                this.estimatedByteSize = indexSize;
                return this.estimatedByteSize;
            }
            long totalCount = indexStats.path("docs").path("count").asLong();
            LOG.debug("estimate source byte size: total document count " + totalCount);
            if (totalCount == 0L) {
                this.estimatedByteSize = 1L;
                return this.estimatedByteSize;
            }
            String endPoint = String.format("/%s/%s/_count", connectionConfiguration.getIndex(), connectionConfiguration.getType());
            try (RestClient restClient = connectionConfiguration.createClient()) {
                long count = this.queryCount(restClient, endPoint, query);
                LOG.debug("estimate source byte size: query document count " + count);
                if (count == 0L) {
                    this.estimatedByteSize = 1L;
                } else {
                    // Scale in floating point: integer division would truncate the average
                    // document size to whole bytes before multiplying, skewing the estimate for
                    // small documents.
                    long scaled = (long)((double)indexSize / (double)totalCount * (double)count);
                    // Never report 0 for a non-empty match: split() derives the slice count from
                    // this value and a 0 estimate would produce no sub-source at all.
                    this.estimatedByteSize = Math.max(1L, scaled);
                }
            }
            return this.estimatedByteSize;
        }

        /**
         * Sends {@code query} as a JSON body in a GET request to {@code endPoint} and returns the
         * {@code count} field of the parsed response.
         *
         * @param restClient client used to perform the request
         * @param endPoint request path
         * @param query JSON query sent as the request entity
         * @return the value of the response's {@code count} field as a long
         * @throws IOException if the request or the response parsing fails
         */
        private long queryCount(@Nonnull RestClient restClient, @Nonnull String endPoint, @Nonnull String query) throws IOException {
            Request countRequest = new Request("GET", endPoint);
            NStringEntity payload = new NStringEntity(query, ContentType.APPLICATION_JSON);
            countRequest.setEntity(payload);
            JsonNode countNode = ElasticsearchIO.parseResponse(restClient.performRequest(countRequest).getEntity()).path("count");
            return countNode.asLong();
        }

        /**
         * Reads the primaries' store size, in bytes, of the configured index from the index stats.
         *
         * @param connectionConfiguration connection settings naming the index to inspect
         * @return the {@code size_in_bytes} value found under {@code primaries.store}
         * @throws IOException if the stats request fails
         */
        @VisibleForTesting
        static long estimateIndexSize(ConnectionConfiguration connectionConfiguration) throws IOException {
            JsonNode primaries = BoundedElasticsearchSource.getStats(connectionConfiguration, false)
                    .path("indices")
                    .path(connectionConfiguration.getIndex())
                    .path("primaries");
            return primaries.path("store").path("size_in_bytes").asLong();
        }

        /**
         * Adds the spec's display data to {@code builder}, followed by this source's shard
         * preference, slice count and slice id when they are not null.
         *
         * @param builder display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
            builder.addIfNotNull(
                    DisplayData.item("shard", (String)this.shardPreference));
            builder.addIfNotNull(
                    DisplayData.item("numSlices", (Integer)this.numSlices));
            builder.addIfNotNull(
                    DisplayData.item("sliceId", (Integer)this.sliceId));
        }

        /**
         * Creates a reader bound to this source.
         *
         * @param options pipeline options (not used by this implementation)
         * @return a new {@code BoundedElasticsearchReader} for this source
         */
        public BoundedSource.BoundedReader<String> createReader(PipelineOptions options) {
            BoundedElasticsearchReader reader = new BoundedElasticsearchReader(this);
            return reader;
        }

        /**
         * Validates this source by delegating to the spec's own validation, passing {@code null}
         * as its argument.
         */
        public void validate() {
            spec.validate(null);
        }

        /**
         * Returns the coder for the strings produced by this source.
         *
         * @return the {@link StringUtf8Coder} instance
         */
        public Coder<String> getOutputCoder() {
            Coder<String> utf8Coder = StringUtf8Coder.of();
            return utf8Coder;
        }

        /**
         * Requests {@code /<index>/_stats} for the configured index and returns the parsed response.
         *
         * @param connectionConfiguration connection settings; supplies the index name and the client
         * @param shardLevel when true, the request carries the parameter {@code level=shards}
         * @return the parsed response body
         * @throws IOException if the client cannot be created or the request fails
         */
        private static JsonNode getStats(ConnectionConfiguration connectionConfiguration, boolean shardLevel) throws IOException {
            Map<String, String> queryParams = new HashMap<>();
            if (shardLevel) {
                queryParams.put("level", "shards");
            }
            Request statsRequest = new Request("GET", String.format("/%s/_stats", connectionConfiguration.getIndex()));
            statsRequest.addParameters(queryParams);
            // The client is only needed for this one request and is closed on exit.
            try (RestClient client = connectionConfiguration.createClient()) {
                return ElasticsearchIO.parseResponse(client.performRequest(statsRequest).getEntity());
            }
        }
    }

    /**
     * A {@link PTransform} that reads from Elasticsearch and produces a {@code PCollection<String>}.
     *
     * <p>Instances are immutable: every {@code with*} method returns a copy built through the
     * AutoValue {@link Builder}. A connection configuration must be supplied with
     * {@link #withConnectionConfiguration} before the transform is expanded.
     */
    @AutoValue
    public static abstract class Read
    extends PTransform<PBegin, PCollection<String>> {
        /** Largest batch size accepted by {@link #withBatchSize}. */
        private static final long MAX_BATCH_SIZE = 10000L;

        abstract @Nullable ConnectionConfiguration getConnectionConfiguration();

        abstract @Nullable ValueProvider<String> getQuery();

        abstract boolean isWithMetadata();

        abstract String getScrollKeepalive();

        abstract long getBatchSize();

        abstract Builder builder();

        /**
         * Returns a copy of this transform using the given connection configuration.
         *
         * @param connectionConfiguration the connection settings; must not be null
         * @throws IllegalArgumentException if {@code connectionConfiguration} is null
         */
        public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
            return this.builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        /**
         * Returns a copy of this transform using the given query string.
         *
         * @param query the query; must be neither null nor empty
         * @throws IllegalArgumentException if {@code query} is null or empty
         */
        public Read withQuery(String query) {
            Preconditions.checkArgument(query != null, "query can not be null");
            Preconditions.checkArgument(!query.isEmpty(), "query can not be empty");
            ValueProvider<String> staticQuery = ValueProvider.StaticValueProvider.of(query);
            return this.withQuery(staticQuery);
        }

        /**
         * Returns a copy of this transform using the given query provider.
         *
         * @param query provider of the query; must not be null
         * @throws IllegalArgumentException if {@code query} is null
         */
        public Read withQuery(ValueProvider<String> query) {
            Preconditions.checkArgument(query != null, "query can not be null");
            return this.builder().setQuery(query).build();
        }

        /** Returns a copy of this transform with the metadata flag set to true. */
        public Read withMetadata() {
            return this.builder().setWithMetadata(true).build();
        }

        /**
         * Returns a copy of this transform using the given scroll keepalive.
         *
         * @param scrollKeepalive the keepalive value; must not be null and must not be {@code "0m"}
         * @throws IllegalArgumentException if {@code scrollKeepalive} is null or equals {@code "0m"}
         */
        public Read withScrollKeepalive(String scrollKeepalive) {
            Preconditions.checkArgument(scrollKeepalive != null, "scrollKeepalive can not be null");
            Preconditions.checkArgument(!"0m".equals(scrollKeepalive), "scrollKeepalive can not be 0m");
            return this.builder().setScrollKeepalive(scrollKeepalive).build();
        }

        /**
         * Returns a copy of this transform using the given batch size.
         *
         * @param batchSize the batch size; must be in the range {@code (0, MAX_BATCH_SIZE]}
         * @throws IllegalArgumentException if {@code batchSize} is out of range
         */
        public Read withBatchSize(long batchSize) {
            // Use the named constant for both the check and the message so they cannot drift apart.
            Preconditions.checkArgument(batchSize > 0L && batchSize <= MAX_BATCH_SIZE, "batchSize must be > 0 and <= %s, but was: %s", MAX_BATCH_SIZE, batchSize);
            return this.builder().setBatchSize(batchSize).build();
        }

        /**
         * Applies a bounded read backed by a {@code BoundedElasticsearchSource} built from this
         * transform.
         *
         * @param input the pipeline start
         * @return the collection produced by the read
         * @throws IllegalStateException if no connection configuration has been set
         */
        public PCollection<String> expand(PBegin input) {
            ConnectionConfiguration connectionConfiguration = this.getConnectionConfiguration();
            Preconditions.checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
            return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null, null, null)));
        }

        /**
         * Registers the query, metadata flag, batch size and scroll keepalive as display data,
         * followed by the connection configuration's items when one has been set.
         *
         * @param builder display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("query", this.getQuery()));
            builder.addIfNotNull(DisplayData.item("withMetadata", this.isWithMetadata()));
            builder.addIfNotNull(DisplayData.item("batchSize", this.getBatchSize()));
            builder.addIfNotNull(DisplayData.item("scrollKeepalive", this.getScrollKeepalive()));
            // The connection configuration is nullable until withConnectionConfiguration() is
            // called; skip it rather than fail with a NullPointerException.
            ConnectionConfiguration connectionConfiguration = this.getConnectionConfiguration();
            if (connectionConfiguration != null) {
                connectionConfiguration.populateDisplayData(builder);
            }
        }

        /** AutoValue builder used by the {@code with*} methods to create modified copies. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setQuery(ValueProvider<String> query);

            abstract Builder setWithMetadata(boolean withMetadata);

            abstract Builder setScrollKeepalive(String scrollKeepalive);

            abstract Builder setBatchSize(long batchSize);

            abstract Read build();
        }
    }

    /**
     * Immutable, serializable description of how to connect to an Elasticsearch cluster: node
     * addresses, target index and type, optional basic-auth credentials, optional keystore used
     * as SSL trust material, and optional socket/connect timeouts.
     *
     * <p>Create an instance with {@link #create} and refine it with the {@code with*} methods,
     * each of which returns a modified copy.
     */
    @AutoValue
    public static abstract class ConnectionConfiguration
    implements Serializable {
        public abstract List<String> getAddresses();

        public abstract @Nullable String getUsername();

        public abstract @Nullable String getPassword();

        public abstract @Nullable String getKeystorePath();

        public abstract @Nullable String getKeystorePassword();

        public abstract String getIndex();

        public abstract String getType();

        public abstract @Nullable Integer getSocketTimeout();

        public abstract @Nullable Integer getConnectTimeout();

        public abstract boolean isTrustSelfSignedCerts();

        abstract Builder builder();

        /**
         * Creates a configuration for the given addresses, index and type, with self-signed
         * certificates not trusted.
         *
         * @param addresses node addresses; must be non-null and non-empty
         * @param index index name; must not be null
         * @param type type name; must not be null
         * @return the new configuration
         * @throws IllegalArgumentException if any argument is null or {@code addresses} is empty
         */
        public static ConnectionConfiguration create(String[] addresses, String index, String type) {
            Preconditions.checkArgument(addresses != null, "addresses can not be null");
            Preconditions.checkArgument(addresses.length > 0, "addresses can not be empty");
            Preconditions.checkArgument(index != null, "index can not be null");
            Preconditions.checkArgument(type != null, "type can not be null");
            return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
                    .setAddresses(Arrays.asList(addresses))
                    .setIndex(index)
                    .setType(type)
                    .setTrustSelfSignedCerts(false)
                    .build();
        }

        /**
         * Returns a copy using the given username.
         *
         * @throws IllegalArgumentException if {@code username} is null or empty
         */
        public ConnectionConfiguration withUsername(String username) {
            Preconditions.checkArgument(username != null, "username can not be null");
            Preconditions.checkArgument(!username.isEmpty(), "username can not be empty");
            return this.builder().setUsername(username).build();
        }

        /**
         * Returns a copy using the given password.
         *
         * @throws IllegalArgumentException if {@code password} is null or empty
         */
        public ConnectionConfiguration withPassword(String password) {
            Preconditions.checkArgument(password != null, "password can not be null");
            Preconditions.checkArgument(!password.isEmpty(), "password can not be empty");
            return this.builder().setPassword(password).build();
        }

        /**
         * Returns a copy using the given keystore path.
         *
         * @throws IllegalArgumentException if {@code keystorePath} is null or empty
         */
        public ConnectionConfiguration withKeystorePath(String keystorePath) {
            Preconditions.checkArgument(keystorePath != null, "keystorePath can not be null");
            Preconditions.checkArgument(!keystorePath.isEmpty(), "keystorePath can not be empty");
            return this.builder().setKeystorePath(keystorePath).build();
        }

        /**
         * Returns a copy using the given keystore password.
         *
         * @throws IllegalArgumentException if {@code keystorePassword} is null
         */
        public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
            Preconditions.checkArgument(keystorePassword != null, "keystorePassword can not be null");
            return this.builder().setKeystorePassword(keystorePassword).build();
        }

        /** Returns a copy with the trust-self-signed-certificates flag set as given. */
        public ConnectionConfiguration withTrustSelfSignedCerts(boolean trustSelfSignedCerts) {
            return this.builder().setTrustSelfSignedCerts(trustSelfSignedCerts).build();
        }

        /**
         * Returns a copy using the given socket timeout.
         *
         * @throws IllegalArgumentException if {@code socketTimeout} is null
         */
        public ConnectionConfiguration withSocketTimeout(Integer socketTimeout) {
            Preconditions.checkArgument(socketTimeout != null, "socketTimeout can not be null");
            return this.builder().setSocketTimeout(socketTimeout).build();
        }

        /**
         * Returns a copy using the given connect timeout.
         *
         * @throws IllegalArgumentException if {@code connectTimeout} is null
         */
        public ConnectionConfiguration withConnectTimeout(Integer connectTimeout) {
            Preconditions.checkArgument(connectTimeout != null, "connectTimeout can not be null");
            return this.builder().setConnectTimeout(connectTimeout).build();
        }

        /**
         * Registers this configuration's settings as display data. Neither the password nor the
         * keystore password is added.
         */
        private void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("address", this.getAddresses().toString()));
            builder.add(DisplayData.item("index", this.getIndex()));
            builder.add(DisplayData.item("type", this.getType()));
            builder.addIfNotNull(DisplayData.item("username", this.getUsername()));
            builder.addIfNotNull(DisplayData.item("keystore.path", this.getKeystorePath()));
            builder.addIfNotNull(DisplayData.item("socketTimeout", this.getSocketTimeout()));
            builder.addIfNotNull(DisplayData.item("connectTimeout", this.getConnectTimeout()));
            builder.addIfNotNull(DisplayData.item("trustSelfSignedCerts", this.isTrustSelfSignedCerts()));
        }

        /**
         * Builds a {@code RestClient} for the configured addresses, applying basic-auth
         * credentials, SSL trust material and timeouts when they are configured.
         *
         * @return a new client; the caller is responsible for closing it
         * @throws IOException if an address is not a valid URL or the keystore cannot be loaded
         */
        @VisibleForTesting
        RestClient createClient() throws IOException {
            List<String> addresses = this.getAddresses();
            HttpHost[] hosts = new HttpHost[addresses.size()];
            int hostIndex = 0;
            for (String address : addresses) {
                URL url = new URL(address);
                hosts[hostIndex++] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
            }
            RestClientBuilder restClientBuilder = RestClient.builder(hosts);

            final CredentialsProvider credentialsProvider = this.createCredentialsProvider();
            final SSLContext sslContext = this.createSslContext();
            final SSLIOSessionStrategy sessionStrategy = sslContext == null ? null : new SSLIOSessionStrategy(sslContext);
            // Register ONE callback that applies both settings. Registering the credentials and
            // the SSL settings through two separate setHttpClientConfigCallback calls would let
            // the second call replace the first, silently dropping basic auth whenever a keystore
            // is also configured.
            if (credentialsProvider != null || sslContext != null) {
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    HttpAsyncClientBuilder configured = httpClientBuilder;
                    if (credentialsProvider != null) {
                        configured = configured.setDefaultCredentialsProvider(credentialsProvider);
                    }
                    if (sslContext != null) {
                        configured = configured.setSSLContext(sslContext).setSSLStrategy((SchemeIOSessionStrategy)sessionStrategy);
                    }
                    return configured;
                });
            }

            // Capture the timeouts in locals: inside the anonymous class, `this` denotes the
            // callback itself, not the enclosing configuration.
            final Integer connectTimeout = this.getConnectTimeout();
            final Integer socketTimeout = this.getSocketTimeout();
            restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback(){

                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    if (connectTimeout != null) {
                        requestConfigBuilder.setConnectTimeout(connectTimeout.intValue());
                    }
                    if (socketTimeout != null) {
                        requestConfigBuilder.setSocketTimeout(socketTimeout.intValue());
                    }
                    return requestConfigBuilder;
                }
            });
            return restClientBuilder.build();
        }

        /** Returns a provider holding the configured username/password, or null when no username is set. */
        private @Nullable CredentialsProvider createCredentialsProvider() {
            if (this.getUsername() == null) {
                return null;
            }
            BasicCredentialsProvider provider = new BasicCredentialsProvider();
            provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.getUsername(), this.getPassword()));
            return provider;
        }

        /**
         * Loads the configured JKS keystore as trust material and returns the resulting SSL
         * context, or null when no keystore path is set. Any failure is rethrown as an
         * {@link IOException} carrying the original cause.
         */
        private @Nullable SSLContext createSslContext() throws IOException {
            String keystorePath = this.getKeystorePath();
            if (keystorePath == null || keystorePath.isEmpty()) {
                return null;
            }
            try {
                KeyStore keyStore = KeyStore.getInstance("jks");
                try (FileInputStream is = new FileInputStream(new File(keystorePath))) {
                    String keystorePassword = this.getKeystorePassword();
                    keyStore.load(is, keystorePassword == null ? null : keystorePassword.toCharArray());
                }
                TrustStrategy trustStrategy = this.isTrustSelfSignedCerts() ? new TrustSelfSignedStrategy() : null;
                return SSLContexts.custom().loadTrustMaterial(keyStore, trustStrategy).build();
            }
            catch (Exception e) {
                throw new IOException("Can't load the client certificate from the keystore", e);
            }
        }

        /** AutoValue builder used by {@link #create} and the {@code with*} methods. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setAddresses(List<String> addresses);

            abstract Builder setUsername(String username);

            abstract Builder setPassword(String password);

            abstract Builder setKeystorePath(String keystorePath);

            abstract Builder setKeystorePassword(String keystorePassword);

            abstract Builder setIndex(String index);

            abstract Builder setType(String type);

            abstract Builder setSocketTimeout(Integer socketTimeout);

            abstract Builder setConnectTimeout(Integer connectTimeout);

            abstract Builder setTrustSelfSignedCerts(boolean trustSelfSignedCerts);

            abstract ConnectionConfiguration build();
        }
    }
}

