/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mongodb;

import com.google.common.base.Preconditions;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.mongodb.AutoValue_MongoDbIO_Read;
import org.apache.beam.sdk.io.mongodb.AutoValue_MongoDbIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbIO {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbIO.class);

    public static Read read() {
        return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
    }

    public static Write write() {
        return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
    }

    private MongoDbIO() {
    }

    public static abstract class Write
    extends PTransform<PCollection<Document>, PDone> {
        @Nullable
        abstract String uri();

        @Nullable
        abstract String database();

        @Nullable
        abstract String collection();

        abstract long batchSize();

        abstract Builder toBuilder();

        public Write withUri(String uri) {
            return this.toBuilder().setUri(uri).build();
        }

        public Write withDatabase(String database) {
            return this.toBuilder().setDatabase(database).build();
        }

        public Write withCollection(String collection) {
            return this.toBuilder().setCollection(collection).build();
        }

        public Write withBatchSize(long batchSize) {
            return this.toBuilder().setBatchSize(batchSize).build();
        }

        public PDone expand(PCollection<Document> input) {
            input.apply((PTransform)ParDo.of((DoFn)new WriteFn(this)));
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void validate(PCollection<Document> input) {
            Preconditions.checkNotNull((Object)this.uri(), (Object)"uri");
            Preconditions.checkNotNull((Object)this.database(), (Object)"database");
            Preconditions.checkNotNull((Object)this.collection(), (Object)"collection");
            Preconditions.checkNotNull((Object)this.batchSize(), (Object)"batchSize");
        }

        private static class WriteFn
        extends DoFn<Document, Void> {
            private final Write spec;
            private transient MongoClient client;
            private List<Document> batch;

            public WriteFn(Write spec) {
                this.spec = spec;
            }

            @DoFn.Setup
            public void createMongoClient() throws Exception {
                this.client = new MongoClient(new MongoClientURI(this.spec.uri()));
            }

            @DoFn.StartBundle
            public void startBundle(DoFn.Context ctx) throws Exception {
                this.batch = new ArrayList<Document>();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext ctx) throws Exception {
                this.batch.add(new Document((Map)ctx.element()));
                if ((long)this.batch.size() >= this.spec.batchSize()) {
                    this.finishBundle((DoFn.Context)ctx);
                }
            }

            @DoFn.FinishBundle
            public void finishBundle(DoFn.Context ctx) throws Exception {
                MongoDatabase mongoDatabase = this.client.getDatabase(this.spec.database());
                MongoCollection mongoCollection = mongoDatabase.getCollection(this.spec.collection());
                mongoCollection.insertMany(this.batch);
                this.batch.clear();
            }

            @DoFn.Teardown
            public void closeMongoClient() throws Exception {
                this.client.close();
                this.client = null;
            }
        }

        static abstract class Builder {
            Builder() {
            }

            abstract Builder setUri(String var1);

            abstract Builder setDatabase(String var1);

            abstract Builder setCollection(String var1);

            abstract Builder setBatchSize(long var1);

            abstract Write build();
        }
    }

    private static class BoundedMongoDbReader
    extends BoundedSource.BoundedReader<Document> {
        private final BoundedMongoDbSource source;
        private MongoClient client;
        private MongoCursor<Document> cursor;
        private Document current;

        public BoundedMongoDbReader(BoundedMongoDbSource source) {
            this.source = source;
        }

        public boolean start() {
            Read spec = this.source.spec;
            this.client = new MongoClient(new MongoClientURI(spec.uri()));
            MongoDatabase mongoDatabase = this.client.getDatabase(spec.database());
            MongoCollection mongoCollection = mongoDatabase.getCollection(spec.collection());
            if (spec.filter() == null) {
                this.cursor = mongoCollection.find().iterator();
            } else {
                Document bson = Document.parse((String)spec.filter());
                this.cursor = mongoCollection.find((Bson)bson).iterator();
            }
            return this.advance();
        }

        public boolean advance() {
            if (this.cursor.hasNext()) {
                this.current = (Document)this.cursor.next();
                return true;
            }
            return false;
        }

        public BoundedMongoDbSource getCurrentSource() {
            return this.source;
        }

        public Document getCurrent() {
            return this.current;
        }

        public void close() {
            try {
                if (this.cursor != null) {
                    this.cursor.close();
                }
            }
            catch (Exception e) {
                LOG.warn("Error closing MongoDB cursor", (Throwable)e);
            }
            try {
                this.client.close();
            }
            catch (Exception e) {
                LOG.warn("Error closing MongoDB client", (Throwable)e);
            }
        }
    }

    private static class BoundedMongoDbSource
    extends BoundedSource<Document> {
        private Read spec;

        private BoundedMongoDbSource(Read spec) {
            this.spec = spec;
        }

        public Coder<Document> getDefaultOutputCoder() {
            return SerializableCoder.of(Document.class);
        }

        public void validate() {
            this.spec.validate((PBegin)null);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
        }

        public BoundedSource.BoundedReader<Document> createReader(PipelineOptions options) {
            return new BoundedMongoDbReader(this);
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
            MongoClient mongoClient = new MongoClient(new MongoClientURI(this.spec.uri()));
            MongoDatabase mongoDatabase = mongoClient.getDatabase(this.spec.database());
            BasicDBObject stat = new BasicDBObject();
            stat.append("collStats", (Object)this.spec.collection());
            Document stats = mongoDatabase.runCommand((Bson)stat);
            return ((Number)stats.get((Object)"size", Number.class)).longValue();
        }

        public List<BoundedSource<Document>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) {
            MongoClient mongoClient = new MongoClient(new MongoClientURI(this.spec.uri()));
            MongoDatabase mongoDatabase = mongoClient.getDatabase(this.spec.database());
            if (this.spec.numSplits() > 0) {
                long estimatedSizeBytes = this.getEstimatedSizeBytes(options);
                desiredBundleSizeBytes = estimatedSizeBytes / (long)this.spec.numSplits();
            }
            if (desiredBundleSizeBytes < 0x100000L) {
                desiredBundleSizeBytes = 0x100000L;
            }
            BasicDBObject splitVectorCommand = new BasicDBObject();
            splitVectorCommand.append("splitVector", (Object)(this.spec.database() + "." + this.spec.collection()));
            splitVectorCommand.append("keyPattern", (Object)new BasicDBObject().append("_id", (Object)1));
            splitVectorCommand.append("force", (Object)false);
            LOG.debug("Splitting in chunk of {} MB", (Object)(desiredBundleSizeBytes / 1024L / 1024L));
            splitVectorCommand.append("maxChunkSize", (Object)(desiredBundleSizeBytes / 1024L / 1024L));
            Document splitVectorCommandResult = mongoDatabase.runCommand((Bson)splitVectorCommand);
            List splitKeys = (List)splitVectorCommandResult.get((Object)"splitKeys");
            ArrayList<BoundedSource<Document>> sources = new ArrayList<BoundedSource<Document>>();
            if (splitKeys.size() < 1) {
                LOG.debug("Split keys is low, using an unique source");
                sources.add(this);
                return sources;
            }
            LOG.debug("Number of splits is {}", (Object)splitKeys.size());
            for (String shardFilter : BoundedMongoDbSource.splitKeysToFilters(splitKeys, this.spec.filter())) {
                sources.add(new BoundedMongoDbSource(this.spec.withFilter(shardFilter)));
            }
            return sources;
        }

        private static List<String> splitKeysToFilters(List<Document> splitKeys, String additionalFilter) {
            ArrayList<String> filters = new ArrayList<String>();
            String lowestBound = null;
            for (int i = 0; i < splitKeys.size(); ++i) {
                String splitKey = splitKeys.get(i).get((Object)"_id").toString();
                String rangeFilter = i == 0 ? String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}", splitKey) : (i == splitKeys.size() - 1 ? String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}", splitKey) : String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\"),$lte:ObjectId(\"%s\")}}", lowestBound, splitKey));
                rangeFilter = additionalFilter != null && !additionalFilter.isEmpty() ? String.format("%s,%s ]}", rangeFilter, additionalFilter) : String.format("%s ]}", rangeFilter);
                filters.add(rangeFilter);
                lowestBound = splitKey;
            }
            return filters;
        }
    }

    public static abstract class Read
    extends PTransform<PBegin, PCollection<Document>> {
        @Nullable
        abstract String uri();

        @Nullable
        abstract String database();

        @Nullable
        abstract String collection();

        @Nullable
        abstract String filter();

        abstract int numSplits();

        abstract Builder toBuilder();

        public Read withUri(String uri) {
            Preconditions.checkNotNull((Object)uri);
            return this.toBuilder().setUri(uri).build();
        }

        public Read withDatabase(String database) {
            Preconditions.checkNotNull((Object)database);
            return this.toBuilder().setDatabase(database).build();
        }

        public Read withCollection(String collection) {
            Preconditions.checkNotNull((Object)collection);
            return this.toBuilder().setCollection(collection).build();
        }

        public Read withFilter(String filter) {
            Preconditions.checkNotNull((Object)filter);
            return this.toBuilder().setFilter(filter).build();
        }

        public Read withNumSplits(int numSplits) {
            Preconditions.checkArgument((numSplits >= 0 ? 1 : 0) != 0);
            return this.toBuilder().setNumSplits(numSplits).build();
        }

        public PCollection<Document> expand(PBegin input) {
            return (PCollection)input.apply((PTransform)org.apache.beam.sdk.io.Read.from((BoundedSource)new BoundedMongoDbSource(this)));
        }

        public void validate(PBegin input) {
            Preconditions.checkNotNull((Object)this.uri(), (Object)"uri");
            Preconditions.checkNotNull((Object)this.database(), (Object)"database");
            Preconditions.checkNotNull((Object)this.collection(), (Object)"collection");
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item((String)"uri", (String)this.uri()));
            builder.add(DisplayData.item((String)"database", (String)this.database()));
            builder.add(DisplayData.item((String)"collection", (String)this.collection()));
            builder.addIfNotNull(DisplayData.item((String)"filter", (String)this.filter()));
            builder.add(DisplayData.item((String)"numSplit", (Integer)this.numSplits()));
        }

        static abstract class Builder {
            Builder() {
            }

            abstract Builder setUri(String var1);

            abstract Builder setDatabase(String var1);

            abstract Builder setCollection(String var1);

            abstract Builder setFilter(String var1);

            abstract Builder setNumSplits(int var1);

            abstract Read build();
        }
    }
}

