/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.datastore;

import com.google.api.client.http.HttpRequestInitializer;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.Filter;
import com.google.datastore.v1.GqlQuery;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Mutation;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.PropertyFilter;
import com.google.datastore.v1.PropertyOrder;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreFactory;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.datastore.v1.client.DatastoreOptions;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
import com.google.rpc.Code;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.datastore.AutoValue_DatastoreV1_Read;
import org.apache.beam.sdk.io.gcp.datastore.MovingAverage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatastoreV1 {
    // Number of entities per write batch at start-up. Same value (200) that
    // WriteBatcherImpl.nextBatchSize returns while it has no latency data.
    @VisibleForTesting
    static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200;
    // Upper bound on entities per write batch. Same value (500) that
    // WriteBatcherImpl.nextBatchSize uses as its maximum.
    @VisibleForTesting
    static final int DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT = 500;
    // Lower bound on entities per write batch. Same value (10) that
    // WriteBatcherImpl.nextBatchSize uses as its minimum.
    @VisibleForTesting
    static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10;
    // Byte budget for one write batch. Same value (9000000) that
    // DatastoreWriterFn.processElement compares the accumulated serialized size against.
    @VisibleForTesting
    static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9000000;

    /** Package-private constructor: only code in this package can create an instance. */
    DatastoreV1() {
    }

    /**
     * Creates a {@link Read} transform whose number of query splits is set to 0.
     * No other property is set here; they are supplied through the {@code with...}
     * methods on {@link Read}.
     *
     * @return a new {@link Read} instance
     */
    public Read read() {
        return new AutoValue_DatastoreV1_Read.Builder().setNumQuerySplits(0).build();
    }

    /**
     * Creates a {@link Write} transform with neither project id nor localhost set.
     * A project id must be supplied with {@code withProjectId} before the transform is
     * validated, because validation rejects a null project id.
     *
     * @return a new {@link Write} instance
     */
    public Write write() {
        // Arguments are (projectId, localhost).
        return new Write(null, null);
    }

    /**
     * Creates a {@link DeleteEntity} transform with neither project id nor localhost set.
     * A project id must be supplied with {@code withProjectId} before the transform is
     * validated, because validation rejects a null project id.
     *
     * @return a new {@link DeleteEntity} instance
     */
    public DeleteEntity deleteEntity() {
        // Arguments are (projectId, localhost).
        return new DeleteEntity(null, null);
    }

    /**
     * Creates a {@link DeleteKey} transform with neither project id nor localhost set.
     * A project id must be supplied with {@code withProjectId} before the transform is
     * validated, because validation rejects a null project id.
     *
     * @return a new {@link DeleteKey} instance
     */
    public DeleteKey deleteKey() {
        // Arguments are (projectId, localhost).
        return new DeleteKey(null, null);
    }

    /**
     * Tells whether {@code key} is complete, i.e. its path is non-empty and the last
     * path element carries either a non-zero id or a non-empty name.
     *
     * @param key the key to inspect
     * @return {@code true} when the last path element has an id or a name
     */
    static boolean isValidKey(Key key) {
        List<Key.PathElement> path = key.getPathList();
        int depth = path.size();
        if (depth == 0) {
            // A key without any path element cannot identify an entity.
            return false;
        }
        Key.PathElement leaf = path.get(depth - 1);
        if (leaf.getId() != 0L) {
            return true;
        }
        return !leaf.getName().isEmpty();
    }

    /**
     * Serializable factory for the Cloud Datastore client objects used by the transforms
     * in this file: the {@link Datastore} client itself and the {@link QuerySplitter}.
     */
    @VisibleForTesting
    static class V1DatastoreFactory
    implements Serializable {
        V1DatastoreFactory() {
        }

        /**
         * Builds a {@link Datastore} client for {@code projectId} without a localhost
         * override; equivalent to the three-argument overload with a null localhost.
         */
        public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
            return getDatastore(pipelineOptions, projectId, null);
        }

        /**
         * Builds a {@link Datastore} client for {@code projectId}.
         *
         * @param pipelineOptions options from which the GCP credential is read
         * @param projectId project the client is bound to
         * @param localhost when non-null, passed to the options builder as the local host;
         *     otherwise the host is set to {@code batch-datastore.googleapis.com}
         * @return the client created from the assembled options
         */
        public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, @Nullable String localhost) {
            Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();

            // Requests always go through the retrying initializer; when a credential is
            // available it is chained in front of it.
            HttpRequestInitializer initializer;
            if (credential == null) {
                initializer = new RetryHttpRequestInitializer();
            } else {
                initializer = new ChainingHttpRequestInitializer(new HttpRequestInitializer[]{
                    new HttpCredentialsAdapter(credential),
                    new RetryHttpRequestInitializer()});
            }

            DatastoreOptions.Builder optionsBuilder = new DatastoreOptions.Builder()
                .projectId(projectId)
                .initializer(initializer);
            if (localhost == null) {
                optionsBuilder.host("batch-datastore.googleapis.com");
            } else {
                optionsBuilder.localHost(localhost);
            }
            return DatastoreFactory.get().create(optionsBuilder.build());
        }

        /** Returns the query splitter obtained from {@link DatastoreHelper#getQuerySplitter()}. */
        public QuerySplitter getQuerySplitter() {
            return DatastoreHelper.getQuerySplitter();
        }
    }

    /**
     * Converts a {@link Key} into a delete {@link Mutation}. Incomplete keys (see
     * {@link DatastoreV1#isValidKey}) are rejected with an {@link IllegalArgumentException}.
     */
    @VisibleForTesting
    static class DeleteKeyFn
    extends SimpleFunction<Key, Mutation> {
        DeleteKeyFn() {
        }

        /**
         * @param key key of the entity to delete; must be complete
         * @return a delete mutation for {@code key}
         */
        public Mutation apply(Key key) {
            boolean complete = DatastoreV1.isValidKey(key);
            Preconditions.checkArgument(
                complete,
                "Keys to be deleted from the Cloud Datastore must be complete:\n%s",
                key);
            return DatastoreHelper.makeDelete(key).build();
        }

        /** Registers this function's class under the {@code deleteKeyFn} display-data key. */
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                DisplayData.item("deleteKeyFn", this.getClass())
                    .withLabel("Create Delete Mutation"));
        }
    }

    /**
     * Converts an {@link Entity} into a delete {@link Mutation} for the entity's key.
     * Entities whose key is incomplete (see {@link DatastoreV1#isValidKey}) are rejected
     * with an {@link IllegalArgumentException}.
     */
    @VisibleForTesting
    static class DeleteEntityFn
    extends SimpleFunction<Entity, Mutation> {
        DeleteEntityFn() {
        }

        /**
         * @param entity entity to delete; its key must be complete
         * @return a delete mutation for the entity's key
         */
        public Mutation apply(Entity entity) {
            boolean complete = DatastoreV1.isValidKey(entity.getKey());
            Preconditions.checkArgument(
                complete,
                "Entities to be deleted from the Cloud Datastore must have complete keys:\n%s",
                entity);
            return DatastoreHelper.makeDelete(entity.getKey()).build();
        }

        /** Registers this function's class under the {@code deleteEntityFn} display-data key. */
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                DisplayData.item("deleteEntityFn", this.getClass())
                    .withLabel("Create Delete Mutation"));
        }
    }

    /**
     * Converts an {@link Entity} into an upsert {@link Mutation}. Entities whose key is
     * incomplete (see {@link DatastoreV1#isValidKey}) are rejected with an
     * {@link IllegalArgumentException}.
     */
    @VisibleForTesting
    static class UpsertFn
    extends SimpleFunction<Entity, Mutation> {
        UpsertFn() {
        }

        /**
         * @param entity entity to write; its key must be complete
         * @return an upsert mutation for {@code entity}
         */
        public Mutation apply(Entity entity) {
            boolean complete = DatastoreV1.isValidKey(entity.getKey());
            Preconditions.checkArgument(
                complete,
                "Entities to be written to the Cloud Datastore must have complete keys:\n%s",
                entity);
            return DatastoreHelper.makeUpsert(entity).build();
        }

        /** Registers this function's class under the {@code upsertFn} display-data key. */
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                DisplayData.item("upsertFn", this.getClass())
                    .withLabel("Create Upsert Mutation"));
        }
    }

    @VisibleForTesting
    static class DatastoreWriterFn
    extends DoFn<Mutation, Void> {
        private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
        private final ValueProvider<String> projectId;
        @Nullable
        private final String localhost;
        private transient Datastore datastore;
        private final V1DatastoreFactory datastoreFactory;
        private final List<Mutation> mutations = new ArrayList<Mutation>();
        private int mutationsSize = 0;
        private WriteBatcher writeBatcher;
        private static final int MAX_RETRIES = 5;
        private static final FluentBackoff BUNDLE_WRITE_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(5).withInitialBackoff(Duration.standardSeconds((long)5L));

        DatastoreWriterFn(String projectId, @Nullable String localhost) {
            this((ValueProvider<String>)ValueProvider.StaticValueProvider.of((Object)projectId), localhost, new V1DatastoreFactory(), new WriteBatcherImpl());
        }

        DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost) {
            this(projectId, localhost, new V1DatastoreFactory(), new WriteBatcherImpl());
        }

        @VisibleForTesting
        DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost, V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) {
            this.projectId = (ValueProvider)Preconditions.checkNotNull(projectId, (Object)"projectId");
            this.localhost = localhost;
            this.datastoreFactory = datastoreFactory;
            this.writeBatcher = writeBatcher;
        }

        @DoFn.StartBundle
        public void startBundle(DoFn.StartBundleContext c) {
            this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), (String)this.projectId.get(), this.localhost);
            this.writeBatcher.start();
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            Mutation write = (Mutation)c.element();
            int size = write.getSerializedSize();
            if (this.mutations.size() > 0 && this.mutationsSize + size >= 9000000) {
                this.flushBatch();
            }
            this.mutations.add((Mutation)c.element());
            this.mutationsSize += size;
            if (this.mutations.size() >= this.writeBatcher.nextBatchSize(System.currentTimeMillis())) {
                this.flushBatch();
            }
        }

        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            if (!this.mutations.isEmpty()) {
                this.flushBatch();
            }
        }

        private void flushBatch() throws DatastoreException, IOException, InterruptedException {
            LOG.debug("Writing batch of {} mutations", (Object)this.mutations.size());
            Sleeper sleeper = Sleeper.DEFAULT;
            BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
            while (true) {
                long endTime;
                CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
                commitRequest.addAllMutations(this.mutations);
                commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
                long startTime = System.currentTimeMillis();
                try {
                    this.datastore.commit(commitRequest.build());
                    endTime = System.currentTimeMillis();
                    this.writeBatcher.addRequestLatency(endTime, endTime - startTime, this.mutations.size());
                }
                catch (DatastoreException exception) {
                    if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
                        endTime = System.currentTimeMillis();
                        this.writeBatcher.addRequestLatency(endTime, endTime - startTime, this.mutations.size());
                    }
                    LOG.error("Error writing batch of {} mutations to Datastore ({}): {}", new Object[]{this.mutations.size(), exception.getCode(), exception.getMessage()});
                    if (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff)) continue;
                    LOG.error("Aborting after {} retries.", (Object)5);
                    throw exception;
                }
                break;
            }
            LOG.debug("Successfully wrote {} mutations", (Object)this.mutations.size());
            this.mutations.clear();
            this.mutationsSize = 0;
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item((String)"projectId", this.projectId).withLabel("Output Project"));
        }
    }

    /**
     * {@link WriteBatcher} that sizes write batches from a moving average of the observed
     * commit latency per entity, aiming at {@link #DATASTORE_BATCH_TARGET_LATENCY_MS}
     * per commit.
     */
    @VisibleForTesting
    static class WriteBatcherImpl
    implements WriteBatcher,
    Serializable {
        /** Commit latency, in milliseconds, that batch sizing aims for. */
        static final int DATASTORE_BATCH_TARGET_LATENCY_MS = 5000;
        // Recreated by start(); not serialized with the batcher.
        private transient MovingAverage meanLatencyPerEntityMs;

        WriteBatcherImpl() {
        }

        /** Discards any earlier samples by creating a fresh moving average. */
        @Override
        public void start() {
            this.meanLatencyPerEntityMs = new MovingAverage(120000L, 10000L, 1, 1);
        }

        /**
         * Records the per-entity latency of one commit.
         *
         * @param timeSinceEpochMillis time at which the request finished
         * @param latencyMillis duration of the request
         * @param numMutations number of mutations in the request; a sample with no
         *     mutations is ignored because no per-entity latency can be derived from it
         */
        @Override
        public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
            if (numMutations <= 0) {
                // Dividing by zero would throw ArithmeticException; skip the sample instead.
                return;
            }
            this.meanLatencyPerEntityMs.add(timeSinceEpochMillis, latencyMillis / (long) numMutations);
        }

        /**
         * Suggests how many mutations the next batch should hold.
         *
         * @param timeSinceEpochMillis current time, used to query the moving average
         * @return 200 while no latency data is available; otherwise the target latency
         *     divided by the recent mean per-entity latency, clamped to the range 10..500
         */
        @Override
        public int nextBatchSize(long timeSinceEpochMillis) {
            if (!this.meanLatencyPerEntityMs.hasValue(timeSinceEpochMillis)) {
                return 200;
            }
            // Treat sub-millisecond averages as 1 ms so the division below is defined.
            long recentMeanLatency = Math.max(this.meanLatencyPerEntityMs.get(timeSinceEpochMillis), 1L);
            long entitiesWithinTarget = DATASTORE_BATCH_TARGET_LATENCY_MS / recentMeanLatency;
            return (int) Math.max(10L, Math.min(500L, entitiesWithinTarget));
        }
    }

    @VisibleForTesting
    static interface WriteBatcher {
        public void start();

        public void addRequestLatency(long var1, long var3, int var5);

        public int nextBatchSize(long var1);
    }

    /**
     * Base transform for Datastore writes and deletes: converts each input element to a
     * {@link Mutation} with {@code mutationFn} and hands the mutations to a
     * {@link DatastoreWriterFn}.
     *
     * @param <T> element type of the input collection
     */
    private static abstract class Mutate<T>
    extends PTransform<PCollection<T>, PDone> {
        protected ValueProvider<String> projectId;
        @Nullable
        protected String localhost;
        private final SimpleFunction<T, Mutation> mutationFn;

        /**
         * @param projectId provider of the target project id; may be null until validation
         * @param localhost optional localhost override
         * @param mutationFn function that turns an element into a mutation; must not be null
         */
        Mutate(@Nullable ValueProvider<String> projectId, @Nullable String localhost, SimpleFunction<T, Mutation> mutationFn) {
            this.projectId = projectId;
            this.localhost = localhost;
            this.mutationFn = Preconditions.checkNotNull(mutationFn);
        }

        /** Maps the input to mutations and writes them; the transform produces no output. */
        public PDone expand(PCollection<T> input) {
            input
                .apply("Convert to Mutation", MapElements.via(this.mutationFn))
                .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(this.projectId, this.localhost)));
            return PDone.in(input.getPipeline());
        }

        /**
         * Checks that a project id provider is set (and, when its value is already
         * accessible, that the value is non-null) and that a mutation function is set.
         */
        public void validate(PipelineOptions options) {
            Preconditions.checkNotNull(this.projectId, "projectId ValueProvider");
            boolean valueKnownNow = this.projectId.isAccessible();
            if (valueKnownNow) {
                Preconditions.checkNotNull(this.projectId.get(), "projectId");
            }
            Preconditions.checkNotNull(this.mutationFn, "mutationFn");
        }

        public String toString() {
            String mutationFnName = this.mutationFn.getClass().getName();
            return MoreObjects.toStringHelper(this.getClass())
                .add("projectId", this.projectId)
                .add("mutationFn", mutationFnName)
                .toString();
        }

        /** Adds the output project id (when present) and the mutation function's display data. */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                .addIfNotNull(DisplayData.item("projectId", this.projectId).withLabel("Output Project"))
                .include("mutationFn", this.mutationFn);
        }

        /** Returns the current value of the project id provider. */
        public String getProjectId() {
            return this.projectId.get();
        }
    }

    /**
     * Transform that deletes the entities identified by the input {@link Key}s, using
     * {@link DeleteKeyFn} to build the mutations. Each {@code with...} method returns a
     * new instance and leaves this one untouched.
     */
    public static class DeleteKey
    extends Mutate<Key> {
        DeleteKey(@Nullable ValueProvider<String> projectId, @Nullable String localhost) {
            super(projectId, localhost, new DeleteKeyFn());
        }

        /**
         * @param projectId target project id; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public DeleteKey withProjectId(String projectId) {
            Preconditions.checkNotNull(projectId, "projectId");
            return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
        }

        /**
         * @param projectId provider of the target project id; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public DeleteKey withProjectId(ValueProvider<String> projectId) {
            Preconditions.checkNotNull(projectId, "projectId ValueProvider");
            return new DeleteKey(projectId, this.localhost);
        }

        /**
         * @param localhost localhost override; must not be null
         * @return a copy of this transform that uses {@code localhost}
         */
        public DeleteKey withLocalhost(String localhost) {
            Preconditions.checkNotNull(localhost, "localhost");
            return new DeleteKey(this.projectId, localhost);
        }
    }

    /**
     * Transform that deletes the input {@link Entity}s by key, using
     * {@link DeleteEntityFn} to build the mutations. Each {@code with...} method returns
     * a new instance and leaves this one untouched.
     */
    public static class DeleteEntity
    extends Mutate<Entity> {
        DeleteEntity(@Nullable ValueProvider<String> projectId, @Nullable String localhost) {
            super(projectId, localhost, new DeleteEntityFn());
        }

        /**
         * @param projectId target project id; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public DeleteEntity withProjectId(String projectId) {
            Preconditions.checkNotNull(projectId, "projectId");
            return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
        }

        /**
         * @param projectId provider of the target project id; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public DeleteEntity withProjectId(ValueProvider<String> projectId) {
            Preconditions.checkNotNull(projectId, "projectId ValueProvider");
            return new DeleteEntity(projectId, this.localhost);
        }

        /**
         * @param localhost localhost override; must not be null
         * @return a copy of this transform that uses {@code localhost}
         */
        public DeleteEntity withLocalhost(String localhost) {
            Preconditions.checkNotNull(localhost, "localhost");
            return new DeleteEntity(this.projectId, localhost);
        }
    }

    /**
     * Transform that writes the input {@link Entity}s, using {@link UpsertFn} to build
     * the mutations. Each {@code with...} method returns a new instance and leaves this
     * one untouched.
     */
    public static class Write
    extends Mutate<Entity> {
        Write(@Nullable ValueProvider<String> projectId, @Nullable String localhost) {
            super(projectId, localhost, new UpsertFn());
        }

        /**
         * @param projectId target project id; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public Write withProjectId(String projectId) {
            Preconditions.checkNotNull(projectId, "projectId");
            return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
        }

        /**
         * @param projectId provider of the target project id; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public Write withProjectId(ValueProvider<String> projectId) {
            Preconditions.checkNotNull(projectId, "projectId ValueProvider");
            return new Write(projectId, this.localhost);
        }

        /**
         * @param localhost localhost override; must not be null
         * @return a copy of this transform that uses {@code localhost}
         */
        public Write withLocalhost(String localhost) {
            Preconditions.checkNotNull(localhost, "localhost");
            return new Write(this.projectId, localhost);
        }
    }

    public static abstract class Read
    extends PTransform<PBegin, PCollection<Entity>> {
        private static final Logger LOG = LoggerFactory.getLogger(Read.class);
        // Largest accepted number of query splits; same value (50000) that
        // withNumQuerySplits and getEstimatedNumSplits clamp to.
        public static final int NUM_QUERY_SPLITS_MAX = 50000;
        // Smallest number of splits getEstimatedNumSplits returns (same value, 12).
        static final int NUM_QUERY_SPLITS_MIN = 12;
        // 0x4000000 = 67108864 bytes (64 MiB); same value getEstimatedNumSplits divides
        // the estimated query size by.
        static final long DEFAULT_BUNDLE_SIZE_BYTES = 0x4000000L;
        // Same value (500) that ReadFn uses as the per-request result limit.
        static final int QUERY_BATCH_LIMIT = 500;

        /** Provider of the project id to read from; null until {@code withProjectId} is called. */
        @Nullable
        public abstract ValueProvider<String> getProjectId();

        /** The query to run; null when none was set (validate() then requires a GQL query). */
        @Nullable
        public abstract Query getQuery();

        /** Provider of the GQL query string; null when none was set (validate() then requires a query). */
        @Nullable
        public abstract ValueProvider<String> getLiteralGqlQuery();

        /** Provider of the namespace to read from; may be null. */
        @Nullable
        public abstract ValueProvider<String> getNamespace();

        /** Requested number of query splits, as set by {@code withNumQuerySplits} (0 from {@code read()}). */
        public abstract int getNumQuerySplits();

        /** Localhost override passed on to the Datastore client factory; may be null. */
        @Nullable
        public abstract String getLocalhost();

        // Declared abstract so the concrete subclass must supply toString().
        public abstract String toString();

        /** Returns a builder for this instance; the {@code with...} methods use it to derive modified copies. */
        abstract Builder toBuilder();

        /**
         * Estimates how many splits a query should be divided into, from the estimated
         * size of its results.
         *
         * <p>The estimate is the size in bytes divided by 67108864 (64 MiB), rounded,
         * capped at 50000 and never less than 12. If the size cannot be determined for
         * any reason, the failure is logged and 12 is returned.
         *
         * @param datastore client used to query the size statistics
         * @param query query whose result size is estimated
         * @param namespace namespace of the query; may be null
         * @return the number of splits, between 12 and 50000
         */
        static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
            int numSplits;
            try {
                long estimatedSizeBytes = Read.getEstimatedSizeBytes(datastore, query, namespace);
                LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
                long roundedBundles = Math.round((double) estimatedSizeBytes / 6.7108864E7);
                numSplits = (int) Math.min(50000L, roundedBundles);
            } catch (Exception e) {
                // Best effort: statistics may be missing or the lookup may fail; fall back
                // to the minimum. The exception is passed last so it is logged with its trace.
                LOG.warn("Failed to fetch estimatedSizeBytes for query: {}", query, e);
                numSplits = 12;
            }
            return Math.max(numSplits, 12);
        }

        /**
         * Fetches the timestamp of the most recent total-statistics entity.
         *
         * <p>Queries kind {@code __Stat_Total__} (or {@code __Stat_Ns_Total__} when a
         * namespace is given), ordered by {@code timestamp} descending with a limit of 1.
         *
         * @param datastore client used to run the query
         * @param namespace namespace to query; may be null or empty
         * @return the entity's timestamp seconds multiplied by 1000000
         * @throws NoSuchElementException if the query returns no entity
         */
        private static long queryLatestStatisticsTimestamp(Datastore datastore, @Nullable String namespace) throws DatastoreException {
            Query.Builder statsQuery = Query.newBuilder();
            String statsKind = Strings.isNullOrEmpty(namespace) ? "__Stat_Total__" : "__Stat_Ns_Total__";
            statsQuery.addKindBuilder().setName(statsKind);
            statsQuery.addOrder(DatastoreHelper.makeOrder("timestamp", PropertyOrder.Direction.DESCENDING));
            statsQuery.setLimit(Int32Value.newBuilder().setValue(1));

            RunQueryRequest request = Read.makeRequest(statsQuery.build(), namespace);
            QueryResultBatch batch = datastore.runQuery(request).getBatch();
            if (batch.getEntityResultsCount() == 0) {
                throw new NoSuchElementException("Datastore total statistics unavailable");
            }

            Entity latest = batch.getEntityResults(0).getEntity();
            Value timestamp = latest.getProperties().get("timestamp");
            long seconds = timestamp.getTimestampValue().getSeconds();
            return seconds * 1000000L;
        }

        /**
         * Looks up the {@code entity_bytes} statistic for the first kind of {@code query}.
         *
         * <p>Queries kind {@code __Stat_Kind__} (or {@code __Stat_Ns_Kind__} when a
         * namespace is given), filtered on the query's kind name and on the latest
         * statistics timestamp.
         *
         * @param datastore client used to run the statistics queries
         * @param query query whose first kind is looked up
         * @param namespace namespace to query; may be null or empty
         * @return the {@code entity_bytes} value of the first matching statistics entity
         * @throws NoSuchElementException if no statistics entity matches
         */
        static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) throws DatastoreException {
            String ourKind = query.getKind(0).getName();
            long latestTimestamp = Read.queryLatestStatisticsTimestamp(datastore, namespace);
            LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);

            Query.Builder statsQuery = Query.newBuilder();
            String statsKind = Strings.isNullOrEmpty(namespace) ? "__Stat_Kind__" : "__Stat_Ns_Kind__";
            statsQuery.addKindBuilder().setName(statsKind);

            Filter kindFilter = DatastoreHelper.makeFilter(
                "kind_name", PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(ourKind).build()).build();
            Filter timestampFilter = DatastoreHelper.makeFilter(
                "timestamp", PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(latestTimestamp).build()).build();
            statsQuery.setFilter(DatastoreHelper.makeAndFilter(new Filter[]{kindFilter, timestampFilter}));

            RunQueryRequest request = Read.makeRequest(statsQuery.build(), namespace);
            long startedAt = System.currentTimeMillis();
            RunQueryResponse response = datastore.runQuery(request);
            LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - startedAt);

            QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultsCount() == 0) {
                throw new NoSuchElementException("Datastore statistics for kind " + ourKind + " unavailable");
            }
            Entity stats = batch.getEntityResults(0).getEntity();
            Value entityBytes = stats.getProperties().get("entity_bytes");
            return entityBytes.getIntegerValue();
        }

        /**
         * Creates a partition id builder for {@code namespace}.
         *
         * @param namespace namespace to set; when null or empty the builder is returned
         *     without a namespace id
         * @return a new {@link PartitionId.Builder}
         */
        private static PartitionId.Builder forNamespace(@Nullable String namespace) {
            PartitionId.Builder partition = PartitionId.newBuilder();
            if (Strings.isNullOrEmpty(namespace)) {
                return partition;
            }
            partition.setNamespaceId(namespace);
            return partition;
        }

        /**
         * Wraps {@code query} in a {@link RunQueryRequest} whose partition id is built
         * from {@code namespace}.
         *
         * @param query the query to run
         * @param namespace namespace for the partition id; may be null
         */
        static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
            return RunQueryRequest.newBuilder()
                .setPartitionId(Read.forNamespace(namespace))
                .setQuery(query)
                .build();
        }

        /**
         * Wraps {@code gqlQuery} in a {@link RunQueryRequest} whose partition id is built
         * from {@code namespace}.
         *
         * @param gqlQuery the GQL query to run
         * @param namespace namespace for the partition id; may be null
         */
        @VisibleForTesting
        static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) {
            return RunQueryRequest.newBuilder()
                .setPartitionId(Read.forNamespace(namespace))
                .setGqlQuery(gqlQuery)
                .build();
        }

        /**
         * Delegates to {@code querySplitter.getSplits} to split {@code query} within the
         * partition built from {@code namespace}.
         *
         * @param query the query to split
         * @param namespace namespace for the partition id; may be null
         * @param datastore client handed to the splitter
         * @param querySplitter splitter that performs the work
         * @param numSplits number of splits requested from the splitter
         * @return the queries returned by the splitter
         */
        private static List<Query> splitQuery(Query query, @Nullable String namespace, Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException {
            return querySplitter.getSplits(query, Read.forNamespace(namespace).build(), numSplits, datastore);
        }

        /**
         * Translates a GQL string into a {@link Query}.
         *
         * <p>The string is first translated with {@code " LIMIT 0"} appended and the limit
         * is then cleared from the result. If that attempt fails with
         * {@code INVALID_ARGUMENT}, the original string is translated unchanged, on the
         * assumption that it already carries a limit. Any other failure is rethrown.
         *
         * @param gql the GQL query string
         * @param datastore client used for the translation
         * @param namespace namespace of the request
         * @return the translated query
         */
        @VisibleForTesting
        static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, String namespace) throws DatastoreException {
            String gqlQueryWithZeroLimit = gql + " LIMIT 0";
            Query translatedQuery;
            try {
                translatedQuery = Read.translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace);
            } catch (DatastoreException e) {
                if (e.getCode() != Code.INVALID_ARGUMENT) {
                    throw e;
                }
                LOG.warn("Failed to translate Gql query '{}': {}", gqlQueryWithZeroLimit, e.getMessage());
                LOG.warn("User query might have a limit already set, so trying without zero limit");
                return Read.translateGqlQuery(gql, datastore, namespace);
            }
            // Drop the artificial zero limit that was only added for the translation.
            return translatedQuery.toBuilder().clearLimit().build();
        }

        /**
         * Runs {@code gql} (with literals allowed) against the datastore and returns the
         * {@link Query} reported in the response.
         *
         * @param gql the GQL query string
         * @param datastore client used to run the request
         * @param namespace namespace of the request
         */
        private static Query translateGqlQuery(String gql, Datastore datastore, String namespace) throws DatastoreException {
            GqlQuery gqlQuery = GqlQuery.newBuilder()
                .setAllowLiterals(true)
                .setQueryString(gql)
                .build();
            return datastore.runQuery(Read.makeRequest(gqlQuery, namespace)).getQuery();
        }

        /**
         * @param projectId project to read from; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public Read withProjectId(String projectId) {
            Preconditions.checkNotNull(projectId, "projectId");
            ValueProvider<String> provider = ValueProvider.StaticValueProvider.of(projectId);
            return this.toBuilder().setProjectId(provider).build();
        }

        /**
         * @param projectId provider of the project to read from; must not be null
         * @return a copy of this transform bound to {@code projectId}
         */
        public Read withProjectId(ValueProvider<String> projectId) {
            ValueProvider<String> provider = Preconditions.checkNotNull(projectId, "projectId");
            return this.toBuilder().setProjectId(provider).build();
        }

        /**
         * @param query query to run; must not be null, and any limit it sets must be positive
         * @return a copy of this transform that runs {@code query}
         * @throws IllegalArgumentException if the query has a limit that is not positive
         */
        public Read withQuery(Query query) {
            Preconditions.checkNotNull(query, "query");
            boolean limitAcceptable = !query.hasLimit() || query.getLimit().getValue() > 0;
            Preconditions.checkArgument(
                limitAcceptable,
                "Invalid query limit %s: must be positive",
                query.getLimit().getValue());
            return this.toBuilder().setQuery(query).build();
        }

        /**
         * @param gqlQuery GQL query string to run; must not be null
         * @return a copy of this transform that runs {@code gqlQuery}
         */
        @Experimental(value=Experimental.Kind.SOURCE_SINK)
        public Read withLiteralGqlQuery(String gqlQuery) {
            Preconditions.checkNotNull(gqlQuery, "gqlQuery");
            ValueProvider<String> provider = ValueProvider.StaticValueProvider.of(gqlQuery);
            return this.toBuilder().setLiteralGqlQuery(provider).build();
        }

        /**
         * @param gqlQuery provider of the GQL query string to run; must not be null
         * @return a copy of this transform that runs the provided query
         */
        @Experimental(value=Experimental.Kind.SOURCE_SINK)
        public Read withLiteralGqlQuery(ValueProvider<String> gqlQuery) {
            ValueProvider<String> provider = Preconditions.checkNotNull(gqlQuery, "gqlQuery");
            return this.toBuilder().setLiteralGqlQuery(provider).build();
        }

        /**
         * @param namespace namespace to read from (not checked for null here)
         * @return a copy of this transform that reads from {@code namespace}
         */
        public Read withNamespace(String namespace) {
            ValueProvider<String> provider = ValueProvider.StaticValueProvider.of(namespace);
            return this.toBuilder().setNamespace(provider).build();
        }

        /**
         * @param namespace provider of the namespace to read from (not checked for null here)
         * @return a copy of this transform that reads from the provided namespace
         */
        public Read withNamespace(ValueProvider<String> namespace) {
            return this.toBuilder()
                .setNamespace(namespace)
                .build();
        }

        /**
         * @param numQuerySplits requested number of query splits; values below 0 are
         *     treated as 0 and values above 50000 as 50000
         * @return a copy of this transform with the clamped split count
         */
        public Read withNumQuerySplits(int numQuerySplits) {
            int clamped;
            if (numQuerySplits < 0) {
                clamped = 0;
            } else if (numQuerySplits > 50000) {
                clamped = 50000;
            } else {
                clamped = numQuerySplits;
            }
            return this.toBuilder().setNumQuerySplits(clamped).build();
        }

        /**
         * @param localhost localhost override handed to the Datastore client factory
         * @return a copy of this transform that uses {@code localhost}
         */
        public Read withLocalhost(String localhost) {
            return this.toBuilder()
                .setLocalhost(localhost)
                .build();
        }

        /**
         * Builds the read pipeline: obtain the query (directly, or by translating the GQL
         * string), split it, regroup the splits, and read each split.
         *
         * @param input the pipeline start
         * @return the entities emitted by {@link ReadFn}
         */
        public PCollection<Entity> expand(PBegin input) {
            V1Options v1Options = V1Options.from(this.getProjectId(), this.getNamespace(), this.getLocalhost());

            // Step 1: a single-element collection holding the query to run.
            PCollection inputQuery;
            if (this.getQuery() != null) {
                inputQuery = (PCollection)input.apply((PTransform)Create.of((Object)this.getQuery(), (Object[])new Query[0]));
            } else {
                PCollection gqlQueries = (PCollection)input.apply((PTransform)Create.of(this.getLiteralGqlQuery(), (Object[])new ValueProvider[0]).withCoder((Coder)SerializableCoder.of((TypeDescriptor)new TypeDescriptor<ValueProvider<String>>(){})));
                inputQuery = (PCollection)gqlQueries.apply((PTransform)ParDo.of((DoFn)new GqlQueryTranslateFn(v1Options)));
            }

            // Step 2: split the query into keyed sub-queries.
            PCollection splitQueries = (PCollection)inputQuery.apply((PTransform)ParDo.of((DoFn)new SplitQueryFn(v1Options, this.getNumQuerySplits())));

            // Step 3: group by key, drop the keys and flatten the grouped sub-queries.
            PCollection grouped = (PCollection)splitQueries.apply((PTransform)GroupByKey.create());
            PCollection groupedValues = (PCollection)grouped.apply((PTransform)Values.create());
            PCollection shardedQueries = (PCollection)groupedValues.apply((PTransform)Flatten.iterables());

            // Step 4: run every sub-query and emit its entities.
            PCollection entities = (PCollection)shardedQueries.apply((PTransform)ParDo.of((DoFn)new ReadFn(v1Options)));
            return entities;
        }

        /**
         * Checks the configuration: a project id provider must be set (and, if its value
         * is already accessible, non-null); exactly one of query and GQL query must be
         * set; and an accessible GQL query value must be non-null.
         *
         * @throws IllegalArgumentException if the project id value is null, or if neither
         *     or both of query and GQL query are set
         */
        public void validate(PipelineOptions options) {
            Preconditions.checkNotNull(this.getProjectId(), "projectId");
            boolean projectIdKnownToBeNull = this.getProjectId().isAccessible() && this.getProjectId().get() == null;
            if (projectIdKnownToBeNull) {
                throw new IllegalArgumentException("Project id cannot be null");
            }

            boolean hasQuery = this.getQuery() != null;
            boolean hasGqlQuery = this.getLiteralGqlQuery() != null;
            if (!hasQuery && !hasGqlQuery) {
                throw new IllegalArgumentException("Either query or gql query ValueProvider should be provided");
            }
            if (hasQuery && hasGqlQuery) {
                throw new IllegalArgumentException("Only one of query or gql query ValueProvider should be provided");
            }

            if (hasGqlQuery && this.getLiteralGqlQuery().isAccessible()) {
                Preconditions.checkNotNull(this.getLiteralGqlQuery().get(), "gqlQuery");
            }
        }

        /** Adds project id, namespace, query text and GQL query to the display data, skipping absent values. */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);

            String query = null;
            if (this.getQuery() != null) {
                query = this.getQuery().toString();
            }

            builder
                .addIfNotNull(DisplayData.item("projectId", this.getProjectId()).withLabel("ProjectId"))
                .addIfNotNull(DisplayData.item("namespace", this.getNamespace()).withLabel("Namespace"))
                .addIfNotNull(DisplayData.item("query", query).withLabel("Query"))
                .addIfNotNull(DisplayData.item("gqlQuery", this.getLiteralGqlQuery()).withLabel("GqlQuery"));
        }

        /**
         * {@link DoFn} that runs each input {@link Query} against Cloud Datastore and
         * emits the resulting entities, fetching at most 500 results per request and
         * continuing from the previous batch's end cursor.
         */
        @VisibleForTesting
        static class ReadFn
        extends DoFn<Query, Entity> {
            private final V1Options options;
            private final V1DatastoreFactory datastoreFactory;
            // Created in startBundle; not serialized with the function.
            private transient Datastore datastore;

            public ReadFn(V1Options options) {
                this(options, new V1DatastoreFactory());
            }

            @VisibleForTesting
            ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
                this.options = options;
                this.datastoreFactory = datastoreFactory;
            }

            /** Creates the Datastore client for this bundle. */
            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext c) throws Exception {
                String projectId = this.options.getProjectId();
                String localhost = this.options.getLocalhost();
                this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), projectId, localhost);
            }

            /**
             * Pages through the results of the input query and outputs every entity.
             * When the query has a limit, no more than that many entities are requested
             * in total.
             */
            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) throws Exception {
                Query query = (Query) context.element();
                String namespace = this.options.getNamespace();
                // Results still allowed by the user's limit; unbounded when no limit is set.
                int remaining = query.hasLimit() ? query.getLimit().getValue() : Integer.MAX_VALUE;
                QueryResultBatch lastBatch = null;
                boolean keepFetching;
                do {
                    Query.Builder pageQuery = query.toBuilder().clone();
                    pageQuery.setLimit(Int32Value.newBuilder().setValue(Math.min(remaining, 500)));
                    // Resume where the previous page stopped, when a cursor is available.
                    if (lastBatch != null && !lastBatch.getEndCursor().isEmpty()) {
                        pageQuery.setStartCursor(lastBatch.getEndCursor());
                    }

                    RunQueryRequest request = Read.makeRequest(pageQuery.build(), namespace);
                    RunQueryResponse response = this.datastore.runQuery(request);
                    lastBatch = response.getBatch();

                    int fetched = lastBatch.getEntityResultsCount();
                    if (query.hasLimit()) {
                        Verify.verify(remaining >= fetched, "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit", remaining, fetched, query.getLimit());
                        remaining -= fetched;
                    }

                    for (EntityResult result : lastBatch.getEntityResultsList()) {
                        context.output(result.getEntity());
                    }

                    // Continue only while the limit allows it and either the page came back
                    // full or the service reports that the query is not finished.
                    boolean pageWasFull = fetched == 500;
                    keepFetching = remaining > 0
                        && (pageWasFull || lastBatch.getMoreResults() == QueryResultBatch.MoreResultsType.NOT_FINISHED);
                } while (keepFetching);
            }

            /** Includes the options' display data under the {@code options} key. */
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.include("options", (HasDisplayData) this.options);
            }
        }

        /**
         * A {@link DoFn} that splits each incoming {@link Query} into sub-queries and emits
         * them as {@code KV<Integer, Query>} pairs keyed by a running counter starting at 1.
         */
        @VisibleForTesting
        static class SplitQueryFn
        extends DoFn<Query, KV<Integer, Query>> {
            private final V1Options options;
            private final int numSplits;
            private final V1DatastoreFactory datastoreFactory;
            private transient Datastore datastore;
            private transient QuerySplitter querySplitter;

            /**
             * Creates a splitter that obtains its client from a freshly constructed
             * {@code V1DatastoreFactory}.
             *
             * @param options connection settings (project, namespace, localhost)
             * @param numSplits requested number of splits; a value {@code <= 0} means the
             *     number is estimated per query
             */
            public SplitQueryFn(V1Options options, int numSplits) {
                this(options, numSplits, new V1DatastoreFactory());
            }

            /**
             * Creates a splitter with an explicit factory.
             *
             * @param options connection settings (project, namespace, localhost)
             * @param numSplits requested number of splits; a value {@code <= 0} means the
             *     number is estimated per query
             * @param datastoreFactory factory used in {@link #startBundle} to obtain the client
             *     and the query splitter
             */
            @VisibleForTesting
            SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) {
                this.options = options;
                this.numSplits = numSplits;
                this.datastoreFactory = datastoreFactory;
            }

            /** Obtains the Datastore client and the query splitter for this bundle. */
            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext c) throws Exception {
                this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), this.options.getProjectId(), this.options.getLocalhost());
                this.querySplitter = this.datastoreFactory.getQuerySplitter();
            }

            /**
             * Emits the sub-queries for the element's query.
             *
             * <p>A query that carries a limit is emitted unchanged under key 1. Otherwise the
             * query is split into {@code numSplits} parts, or into an estimated number of parts
             * when {@code numSplits <= 0}. If splitting throws, the failure is logged and the
             * original query is emitted as the only split.
             */
            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws Exception {
                int key = 1;
                Query query = (Query)c.element();
                if (query.hasLimit()) {
                    // Queries with a limit are passed through without splitting.
                    c.output(KV.of(key, query));
                    return;
                }

                int estimatedNumSplits = this.numSplits <= 0 ? Read.getEstimatedNumSplits(this.datastore, query, this.options.getNamespace()) : this.numSplits;
                LOG.info("Splitting the query into {} splits", estimatedNumSplits);

                // Typed as Iterable<Query> (rather than Object) so the splits can be iterated;
                // both the splitter result and the single-query fallback fit this type.
                Iterable<Query> querySplits;
                try {
                    querySplits = Read.splitQuery(query, this.options.getNamespace(), this.datastore, this.querySplitter, estimatedNumSplits);
                }
                catch (Exception e) {
                    // Best effort: fall back to reading the query as one unsplit unit.
                    LOG.warn("Unable to parallelize the given query: {}", query, e);
                    querySplits = ImmutableList.of(query);
                }

                for (Query subquery : querySplits) {
                    c.output(KV.of(key++, subquery));
                }
            }

            /**
             * Adds the wrapped options' display data and, when a positive split count was
             * requested, the {@code numQuerySplits} item.
             */
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.include("options", this.options);
                if (this.numSplits > 0) {
                    builder.add(DisplayData.item("numQuerySplits", (Integer)this.numSplits).withLabel("Requested number of Query splits"));
                }
            }
        }

        /**
         * A {@link DoFn} that turns a GQL query string, supplied as a {@code ValueProvider},
         * into a {@link Query} by calling {@code Read.translateGqlQueryWithLimitCheck}.
         */
        static class GqlQueryTranslateFn
        extends DoFn<ValueProvider<String>, Query> {
            private final V1Options v1Options;
            private transient Datastore datastore;
            private final V1DatastoreFactory datastoreFactory;

            /**
             * Creates a translator that obtains its client from a freshly constructed
             * {@code V1DatastoreFactory}.
             *
             * @param options connection settings (project, namespace, localhost)
             */
            GqlQueryTranslateFn(V1Options options) {
                this(options, new V1DatastoreFactory());
            }

            /**
             * Creates a translator with an explicit factory.
             *
             * @param options connection settings (project, namespace, localhost)
             * @param datastoreFactory factory used in {@link #startBundle} to obtain the client
             */
            GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) {
                this.v1Options = options;
                this.datastoreFactory = datastoreFactory;
            }

            /**
             * Obtains the Datastore client for this bundle.
             *
             * <p>The configured localhost is passed through, exactly as {@code ReadFn} and
             * {@code SplitQueryFn} do, so that query translation talks to the same endpoint
             * as the subsequent split and read steps.
             */
            @DoFn.StartBundle
            public void startBundle(DoFn.StartBundleContext c) throws Exception {
                this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), this.v1Options.getProjectId(), this.v1Options.getLocalhost());
            }

            /**
             * Resolves the GQL string from the element, translates it using the configured
             * namespace, logs both the input and the result, and outputs the translated query.
             */
            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws Exception {
                ValueProvider<?> gqlQuery = (ValueProvider<?>)c.element();
                String gql = (String)gqlQuery.get();
                LOG.info("User query: '{}'", gql);
                Query query = Read.translateGqlQueryWithLimitCheck(gql, this.datastore, this.v1Options.getNamespace());
                LOG.info("User gql query translated to Query({})", query);
                c.output(query);
            }
        }

        /**
         * Serializable holder for the settings the read functions share: the project id, an
         * optional namespace and an optional localhost value. Project and namespace are kept
         * as {@code ValueProvider}s and resolved only when their string getters are called.
         */
        @VisibleForTesting
        static class V1Options
        implements HasDisplayData,
        Serializable {
            private final ValueProvider<String> project;
            @Nullable
            private final ValueProvider<String> namespace;
            @Nullable
            private final String localhost;

            private V1Options(ValueProvider<String> projectProvider, ValueProvider<String> namespaceProvider, String localhostValue) {
                this.project = projectProvider;
                this.namespace = namespaceProvider;
                this.localhost = localhostValue;
            }

            /**
             * Builds options from plain strings by wrapping the project id and namespace in
             * static value providers.
             */
            public static V1Options from(String projectId, String namespace, String localhost) {
                ValueProvider<String> projectProvider = ValueProvider.StaticValueProvider.of(projectId);
                ValueProvider<String> namespaceProvider = ValueProvider.StaticValueProvider.of(namespace);
                return V1Options.from(projectProvider, namespaceProvider, localhost);
            }

            /** Builds options directly from the given value providers. */
            public static V1Options from(ValueProvider<String> project, ValueProvider<String> namespace, String localhost) {
                return new V1Options(project, namespace, localhost);
            }

            /** Returns the project id obtained from the project value provider. */
            public String getProjectId() {
                return this.project.get();
            }

            /**
             * Returns the namespace obtained from its value provider, or {@code null} when no
             * namespace provider was supplied.
             */
            @Nullable
            public String getNamespace() {
                if (this.namespace == null) {
                    return null;
                }
                return this.namespace.get();
            }

            /** Returns the unresolved project value provider. */
            public ValueProvider<String> getProjectValueProvider() {
                return this.project;
            }

            /** Returns the unresolved namespace value provider, which may be {@code null}. */
            @Nullable
            public ValueProvider<String> getNamespaceValueProvider() {
                return this.namespace;
            }

            /** Returns the localhost value, which may be {@code null}. */
            @Nullable
            public String getLocalhost() {
                return this.localhost;
            }

            /**
             * Adds the project and namespace value providers as display data items, each
             * through {@code addIfNotNull}.
             */
            public void populateDisplayData(DisplayData.Builder builder) {
                builder
                    .addIfNotNull(DisplayData.item("projectId", this.getProjectValueProvider()).withLabel("ProjectId"))
                    .addIfNotNull(DisplayData.item("namespace", this.getNamespaceValueProvider()).withLabel("Namespace"));
            }
        }

        /**
         * Abstract builder for {@code Read}. Each setter returns a {@code Builder} so calls can
         * be chained; the concrete implementation is not part of this excerpt.
         */
        abstract static class Builder {
            /** Sets the provider of the project id. */
            abstract Builder setProjectId(ValueProvider<String> projectId);

            /** Sets the structured query. */
            abstract Builder setQuery(Query query);

            /** Sets the provider of the literal GQL query string. */
            abstract Builder setLiteralGqlQuery(ValueProvider<String> literalGqlQuery);

            /** Sets the provider of the namespace. */
            abstract Builder setNamespace(ValueProvider<String> namespace);

            /** Sets the requested number of query splits. */
            abstract Builder setNumQuerySplits(int numQuerySplits);

            /** Sets the localhost value. */
            abstract Builder setLocalhost(String localhost);

            /** Produces the configured {@code Read}. */
            abstract Read build();
        }
    }
}

