/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mongodb;

import com.google.auto.value.AutoValue;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.mongodb.AutoValue_MongoDbIO_Read;
import org.apache.beam.sdk.io.mongodb.AutoValue_MongoDbIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdks.java.io.mongodb.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdks.java.io.mongodb.repackaged.com.google.common.base.Preconditions;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class MongoDbIO {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbIO.class);

    /**
     * Creates an unconfigured {@link Read} transform with default settings: keep-alive enabled,
     * a maximum connection idle time of 60000 and {@code numSplits} of 0.
     *
     * @return a new {@link Read} carrying only the defaults above
     */
    public static Read read() {
        Read.Builder defaults = new AutoValue_MongoDbIO_Read.Builder()
            .setKeepAlive(true)
            .setMaxConnectionIdleTime(60000)
            .setNumSplits(0);
        return defaults.build();
    }

    /**
     * Creates an unconfigured {@link Write} transform with default settings: keep-alive enabled,
     * a maximum connection idle time of 60000 and a batch size of 1024 documents.
     *
     * @return a new {@link Write} carrying only the defaults above
     */
    public static Write write() {
        Write.Builder defaults = new AutoValue_MongoDbIO_Write.Builder()
            .setKeepAlive(true)
            .setMaxConnectionIdleTime(60000)
            .setBatchSize(1024L);
        return defaults.build();
    }

    /** Private constructor: instances are never created, only the static factories are used. */
    private MongoDbIO() {}

    /**
     * A {@link PTransform} that writes a {@link PCollection} of {@link Document}s to a MongoDB
     * collection, inserting them in batches.
     *
     * <p>Each {@code withXxx} method returns a modified copy built through {@link #builder()}.
     * A URI, a database and a collection must all be set before the transform is expanded.
     */
    @AutoValue
    public static abstract class Write
    extends PTransform<PCollection<Document>, PDone> {
        @Nullable
        abstract String uri();

        abstract boolean keepAlive();

        abstract int maxConnectionIdleTime();

        @Nullable
        abstract String database();

        @Nullable
        abstract String collection();

        abstract long batchSize();

        abstract Builder builder();

        /**
         * Sets the MongoDB connection URI.
         *
         * @param uri the connection URI; must not be {@code null}
         * @return a copy of this transform using {@code uri}
         * @throws IllegalArgumentException if {@code uri} is {@code null}
         */
        public Write withUri(String uri) {
            Preconditions.checkArgument(uri != null, "uri can not be null");
            return this.builder().setUri(uri).build();
        }

        /**
         * Sets the value passed to the client's socket keep-alive option.
         *
         * @param keepAlive the keep-alive flag
         * @return a copy of this transform using {@code keepAlive}
         */
        public Write withKeepAlive(boolean keepAlive) {
            return this.builder().setKeepAlive(keepAlive).build();
        }

        /**
         * Sets the value passed to the client's maximum connection idle time option.
         *
         * @param maxConnectionIdleTime the maximum idle time, as understood by the MongoDB client
         * @return a copy of this transform using {@code maxConnectionIdleTime}
         */
        public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) {
            return this.builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
        }

        /**
         * Sets the database to write to.
         *
         * @param database the database name; must not be {@code null}
         * @return a copy of this transform using {@code database}
         * @throws IllegalArgumentException if {@code database} is {@code null}
         */
        public Write withDatabase(String database) {
            Preconditions.checkArgument(database != null, "database can not be null");
            return this.builder().setDatabase(database).build();
        }

        /**
         * Sets the collection to write to.
         *
         * @param collection the collection name; must not be {@code null}
         * @return a copy of this transform using {@code collection}
         * @throws IllegalArgumentException if {@code collection} is {@code null}
         */
        public Write withCollection(String collection) {
            Preconditions.checkArgument(collection != null, "collection can not be null");
            return this.builder().setCollection(collection).build();
        }

        /**
         * Sets the number of buffered documents that triggers an insert.
         *
         * @param batchSize the batch size; must be {@code >= 0}
         * @return a copy of this transform using {@code batchSize}
         * @throws IllegalArgumentException if {@code batchSize} is negative
         */
        public Write withBatchSize(long batchSize) {
            // Guava's Preconditions templates only substitute %s placeholders, so %s (not %d)
            // is required for the offending value to appear inside the message.
            Preconditions.checkArgument(batchSize >= 0L, "Batch size must be >= 0, but was %s", batchSize);
            return this.builder().setBatchSize(batchSize).build();
        }

        /**
         * Validates the configuration and applies a {@link ParDo} running {@link WriteFn}.
         *
         * @param input the documents to write
         * @return a {@link PDone} in the pipeline of {@code input}
         * @throws IllegalArgumentException if the URI, database or collection is not set
         */
        public PDone expand(PCollection<Document> input) {
            Preconditions.checkArgument(this.uri() != null, "withUri() is required");
            Preconditions.checkArgument(this.database() != null, "withDatabase() is required");
            Preconditions.checkArgument(this.collection() != null, "withCollection() is required");
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        /**
         * Registers the configuration of this transform as display data.
         *
         * @param builder the display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            // uri, database and collection are @Nullable, so they are only added once set.
            builder.addIfNotNull(DisplayData.item("uri", this.uri()));
            builder.add(DisplayData.item("keepAlive", this.keepAlive()));
            builder.add(DisplayData.item("maxConnectionIdleTime", this.maxConnectionIdleTime()));
            builder.addIfNotNull(DisplayData.item("database", this.database()));
            builder.addIfNotNull(DisplayData.item("collection", this.collection()));
            builder.add(DisplayData.item("batchSize", this.batchSize()));
        }

        /**
         * {@link DoFn} that buffers incoming documents and inserts them into the configured
         * collection once the buffer reaches the batch size, and again at the end of each bundle.
         */
        static class WriteFn
        extends DoFn<Document, Void> {
            private final Write spec;
            private transient MongoClient client;
            private List<Document> batch;

            /**
             * @param spec the write configuration to use
             */
            public WriteFn(Write spec) {
                this.spec = spec;
            }

            /** Opens the MongoDB client using the keep-alive and idle-time options of the spec. */
            @DoFn.Setup
            public void createMongoClient() throws Exception {
                MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
                builder.socketKeepAlive(this.spec.keepAlive());
                builder.maxConnectionIdleTime(this.spec.maxConnectionIdleTime());
                this.client = new MongoClient(new MongoClientURI(this.spec.uri(), builder));
            }

            /** Starts every bundle with a fresh, empty buffer. */
            @DoFn.StartBundle
            public void startBundle() throws Exception {
                this.batch = new ArrayList<>();
            }

            /**
             * Buffers a copy of the current element and flushes once the batch size is reached.
             *
             * @param ctx the process context providing the element
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<Document, Void>.ProcessContext ctx) throws Exception {
                this.batch.add(new Document(ctx.element()));
                if ((long)this.batch.size() >= this.spec.batchSize()) {
                    this.flush();
                }
            }

            /** Writes whatever is still buffered at the end of the bundle. */
            @DoFn.FinishBundle
            public void finishBundle() throws Exception {
                this.flush();
            }

            /** Inserts the buffered documents into the target collection and empties the buffer. */
            private void flush() {
                if (this.batch.isEmpty()) {
                    return;
                }
                MongoDatabase mongoDatabase = this.client.getDatabase(this.spec.database());
                MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(this.spec.collection());
                mongoCollection.insertMany(this.batch);
                this.batch.clear();
            }

            /** Closes the MongoDB client, if one was created. */
            @DoFn.Teardown
            public void closeMongoClient() throws Exception {
                // The client is null when it was never created (or was already closed).
                if (this.client != null) {
                    this.client.close();
                    this.client = null;
                }
            }
        }

        /** Builder for {@link Write}; the concrete implementation is generated. */
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setUri(String uri);

            abstract Builder setKeepAlive(boolean keepAlive);

            abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);

            abstract Builder setDatabase(String database);

            abstract Builder setCollection(String collection);

            abstract Builder setBatchSize(long batchSize);

            abstract Write build();
        }
    }

    /**
     * Reader that iterates over the documents of the collection described by a
     * {@link BoundedMongoDbSource}, restricted by the source's filter when one is set.
     */
    private static class BoundedMongoDbReader
    extends BoundedSource.BoundedReader<Document> {
        private final BoundedMongoDbSource source;
        private MongoClient client;
        private MongoCursor<Document> cursor;
        private Document current;

        /**
         * @param source the source whose spec describes what to read
         */
        public BoundedMongoDbReader(BoundedMongoDbSource source) {
            this.source = source;
        }

        /**
         * Opens the client and the cursor, then moves to the first document.
         *
         * @return {@code true} if a first document is available
         */
        public boolean start() {
            Read spec = this.source.spec;
            MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
            optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
            optionsBuilder.socketKeepAlive(spec.keepAlive());
            this.client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder));
            MongoDatabase mongoDatabase = this.client.getDatabase(spec.database());
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
            if (spec.filter() == null) {
                this.cursor = mongoCollection.find().iterator();
            } else {
                // The filter is stored as a JSON string and parsed into a query document here.
                Bson filter = Document.parse(spec.filter());
                this.cursor = mongoCollection.find(filter).iterator();
            }
            return this.advance();
        }

        /**
         * Moves to the next document of the cursor.
         *
         * @return {@code true} if a document was read, {@code false} once the cursor is exhausted
         */
        public boolean advance() {
            if (this.cursor.hasNext()) {
                this.current = this.cursor.next();
                return true;
            }
            return false;
        }

        /**
         * @return the source this reader was created from
         */
        public BoundedMongoDbSource getCurrentSource() {
            return this.source;
        }

        /**
         * @return the document read by the last successful {@link #start()} or {@link #advance()}
         */
        public Document getCurrent() {
            return this.current;
        }

        /**
         * Closes the cursor and the client. Failures are logged as warnings and not rethrown,
         * so a failure closing the cursor does not prevent the client from being closed.
         */
        public void close() {
            try {
                if (this.cursor != null) {
                    this.cursor.close();
                }
            }
            catch (Exception e) {
                LOG.warn("Error closing MongoDB cursor", (Throwable)e);
            }
            try {
                // The client is only created in start(); skip it if the reader never started.
                if (this.client != null) {
                    this.client.close();
                }
            }
            catch (Exception e) {
                LOG.warn("Error closing MongoDB client", (Throwable)e);
            }
        }
    }

    /**
     * A {@link BoundedSource} over a MongoDB collection described by a {@link Read} spec.
     *
     * <p>The source can be split into several sources, each reading a distinct range of
     * {@code _id} values computed from the result of the {@code splitVector} command.
     */
    @VisibleForTesting
    static class BoundedMongoDbSource
    extends BoundedSource<Document> {
        private Read spec;

        private BoundedMongoDbSource(Read spec) {
            this.spec = spec;
        }

        /**
         * @return a {@link SerializableCoder} for {@link Document}
         */
        public Coder<Document> getOutputCoder() {
            return SerializableCoder.of(Document.class);
        }

        /**
         * Delegates display data to the underlying {@link Read} spec.
         *
         * @param builder the display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
        }

        /**
         * @param options the pipeline options (unused)
         * @return a new reader over this source
         */
        public BoundedSource.BoundedReader<Document> createReader(PipelineOptions options) {
            return new BoundedMongoDbReader(this);
        }

        /**
         * Opens a short-lived client and returns the {@code size} reported by {@code collStats}
         * for the configured collection.
         *
         * @param pipelineOptions the pipeline options (unused)
         * @return the collection size reported by the server
         */
        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
            try (MongoClient mongoClient = new MongoClient(new MongoClientURI(this.spec.uri()))) {
                return this.getEstimatedSizeBytes(mongoClient, this.spec.database(), this.spec.collection());
            }
        }

        /** Runs {@code collStats} on the collection and returns its {@code size} field. */
        private long getEstimatedSizeBytes(MongoClient mongoClient, String database, String collection) {
            MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
            BasicDBObject stat = new BasicDBObject();
            stat.append("collStats", collection);
            Document stats = mongoDatabase.runCommand(stat);
            return stats.get("size", Number.class).longValue();
        }

        /**
         * Splits this source using the split keys returned by the {@code splitVector} command.
         *
         * <p>When {@code numSplits} is set on the spec, the bundle size is derived from the
         * estimated collection size instead of {@code desiredBundleSizeBytes}. The bundle size
         * is never lower than 1 MiB. When no split key is returned, this source is returned
         * as the only element.
         *
         * @param desiredBundleSizeBytes the bundle size requested by the runner
         * @param options the pipeline options (unused)
         * @return the sources covering the collection
         */
        public List<BoundedSource<Document>> split(long desiredBundleSizeBytes, PipelineOptions options) {
            try (MongoClient mongoClient = new MongoClient(new MongoClientURI(this.spec.uri()))) {
                MongoDatabase mongoDatabase = mongoClient.getDatabase(this.spec.database());

                long bundleSizeBytes = desiredBundleSizeBytes;
                if (this.spec.numSplits() > 0) {
                    long estimatedSizeBytes = this.getEstimatedSizeBytes(mongoClient, this.spec.database(), this.spec.collection());
                    bundleSizeBytes = estimatedSizeBytes / (long)this.spec.numSplits();
                }
                // Enforce a minimum of 1 MiB so that maxChunkSize (in MB) is at least 1.
                bundleSizeBytes = Math.max(bundleSizeBytes, 0x100000L);
                long chunkSizeMb = bundleSizeBytes / 1024L / 1024L;

                BasicDBObject splitVectorCommand = new BasicDBObject();
                splitVectorCommand.append("splitVector", this.spec.database() + "." + this.spec.collection());
                splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
                splitVectorCommand.append("force", false);
                LOG.debug("Splitting in chunk of {} MB", (Object)chunkSizeMb);
                splitVectorCommand.append("maxChunkSize", chunkSizeMb);
                Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);

                @SuppressWarnings("unchecked")
                List<Document> splitKeys = (List<Document>)splitVectorCommandResult.get("splitKeys");

                List<BoundedSource<Document>> sources = new ArrayList<>();
                // A missing or empty key list means the collection cannot be split.
                if (splitKeys == null || splitKeys.isEmpty()) {
                    LOG.debug("Split keys is low, using an unique source");
                    sources.add(this);
                    return sources;
                }
                LOG.debug("Number of splits is {}", (Object)splitKeys.size());
                for (String shardFilter : BoundedMongoDbSource.splitKeysToFilters(splitKeys, this.spec.filter())) {
                    sources.add(new BoundedMongoDbSource(this.spec.withFilter(shardFilter)));
                }
                return sources;
            }
        }

        /**
         * Turns a list of split keys into a list of range filters on {@code _id}.
         *
         * <p>For {@code n >= 1} keys, {@code n + 1} filters are produced: one up to and including
         * the first key, one per pair of consecutive keys, and one for everything above the last
         * key. An empty key list produces no filter.
         *
         * @param splitKeys the documents holding the split {@code _id} values, in order
         * @param additionalFilter an extra filter combined with each range, or {@code null}
         * @return the filters, ordered by ascending range
         */
        @VisibleForTesting
        static List<String> splitKeysToFilters(List<Document> splitKeys, String additionalFilter) {
            List<String> filters = new ArrayList<>();
            String lowestBound = null;
            for (int i = 0; i < splitKeys.size(); ++i) {
                String splitKey = splitKeys.get(i).get("_id").toString();
                String rangeFilter;
                if (i == 0) {
                    // First range: everything up to and including the first split key.
                    rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}", splitKey);
                } else {
                    // Intermediate range: between the previous split key and this one.
                    rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),$lte:ObjectId(\"%s\")}}", lowestBound, splitKey);
                }
                filters.add(BoundedMongoDbSource.formatFilter(rangeFilter, additionalFilter));
                lowestBound = splitKey;
            }
            if (lowestBound != null) {
                // Last range: everything above the last split key. Emitting it after the loop
                // also covers the single-key case, which would otherwise lose this range.
                String tailFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}", lowestBound);
                filters.add(BoundedMongoDbSource.formatFilter(tailFilter, additionalFilter));
            }
            return filters;
        }

        /** Closes the {@code $and} array of a range filter, appending the extra filter if any. */
        private static String formatFilter(String filter, @Nullable String additionalFilter) {
            if (additionalFilter != null && !additionalFilter.isEmpty()) {
                return String.format("%s,%s ]}", filter, additionalFilter);
            }
            return String.format("%s ]}", filter);
        }
    }

    /**
     * A {@link PTransform} that reads the {@link Document}s of a MongoDB collection.
     *
     * <p>Each {@code withXxx} method returns a modified copy built through {@link #builder()}.
     * A URI, a database and a collection must all be set before the transform is expanded.
     */
    @AutoValue
    public static abstract class Read
    extends PTransform<PBegin, PCollection<Document>> {
        @Nullable
        abstract String uri();

        abstract boolean keepAlive();

        abstract int maxConnectionIdleTime();

        @Nullable
        abstract String database();

        @Nullable
        abstract String collection();

        @Nullable
        abstract String filter();

        abstract int numSplits();

        abstract Builder builder();

        /**
         * Sets the MongoDB connection URI.
         *
         * @param uri the connection URI; must not be {@code null}
         * @return a copy of this transform using {@code uri}
         * @throws IllegalArgumentException if {@code uri} is {@code null}
         */
        public Read withUri(String uri) {
            Preconditions.checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri");
            return this.builder().setUri(uri).build();
        }

        /**
         * Sets the value passed to the client's socket keep-alive option.
         *
         * @param keepAlive the keep-alive flag
         * @return a copy of this transform using {@code keepAlive}
         */
        public Read withKeepAlive(boolean keepAlive) {
            return this.builder().setKeepAlive(keepAlive).build();
        }

        /**
         * Sets the value passed to the client's maximum connection idle time option.
         *
         * @param maxConnectionIdleTime the maximum idle time, as understood by the MongoDB client
         * @return a copy of this transform using {@code maxConnectionIdleTime}
         */
        public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
            return this.builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
        }

        /**
         * Sets the database to read from.
         *
         * @param database the database name; must not be {@code null}
         * @return a copy of this transform using {@code database}
         * @throws IllegalArgumentException if {@code database} is {@code null}
         */
        public Read withDatabase(String database) {
            Preconditions.checkArgument(database != null, "database can not be null");
            return this.builder().setDatabase(database).build();
        }

        /**
         * Sets the collection to read from.
         *
         * @param collection the collection name; must not be {@code null}
         * @return a copy of this transform using {@code collection}
         * @throws IllegalArgumentException if {@code collection} is {@code null}
         */
        public Read withCollection(String collection) {
            Preconditions.checkArgument(collection != null, "collection can not be null");
            return this.builder().setCollection(collection).build();
        }

        /**
         * Sets the query filter restricting the documents that are read.
         *
         * @param filter the filter, as a JSON string; must not be {@code null}
         * @return a copy of this transform using {@code filter}
         * @throws IllegalArgumentException if {@code filter} is {@code null}
         */
        public Read withFilter(String filter) {
            Preconditions.checkArgument(filter != null, "filter can not be null");
            return this.builder().setFilter(filter).build();
        }

        /**
         * Sets the number of splits used to derive the bundle size; 0 leaves the bundle size
         * requested by the runner in effect.
         *
         * @param numSplits the number of splits; must be {@code >= 0}
         * @return a copy of this transform using {@code numSplits}
         * @throws IllegalArgumentException if {@code numSplits} is negative
         */
        public Read withNumSplits(int numSplits) {
            // Guava's Preconditions templates only substitute %s placeholders, so %s (not %d)
            // is required for the offending value to appear inside the message.
            Preconditions.checkArgument(numSplits >= 0, "invalid num_splits: must be >= 0, but was %s", numSplits);
            return this.builder().setNumSplits(numSplits).build();
        }

        /**
         * Validates the configuration and reads from a {@link BoundedMongoDbSource} built on it.
         *
         * @param input the beginning of the pipeline
         * @return the documents read from the collection
         * @throws IllegalArgumentException if the URI, database or collection is not set
         */
        public PCollection<Document> expand(PBegin input) {
            Preconditions.checkArgument(this.uri() != null, "withUri() is required");
            Preconditions.checkArgument(this.database() != null, "withDatabase() is required");
            Preconditions.checkArgument(this.collection() != null, "withCollection() is required");
            return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
        }

        /**
         * Registers the configuration of this transform as display data.
         *
         * @param builder the display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            // uri, database, collection and filter are @Nullable, so they are only added once set.
            builder.addIfNotNull(DisplayData.item("uri", this.uri()));
            builder.add(DisplayData.item("keepAlive", this.keepAlive()));
            builder.add(DisplayData.item("maxConnectionIdleTime", this.maxConnectionIdleTime()));
            builder.addIfNotNull(DisplayData.item("database", this.database()));
            builder.addIfNotNull(DisplayData.item("collection", this.collection()));
            builder.addIfNotNull(DisplayData.item("filter", this.filter()));
            builder.add(DisplayData.item("numSplit", this.numSplits()));
        }

        /** Builder for {@link Read}; the concrete implementation is generated. */
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setUri(String uri);

            abstract Builder setKeepAlive(boolean keepAlive);

            abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);

            abstract Builder setDatabase(String database);

            abstract Builder setCollection(String collection);

            abstract Builder setFilter(String filter);

            abstract Builder setNumSplits(int numSplits);

            abstract Read build();
        }
    }
}

