/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SingleSplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SplitContext;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import org.apache.flink.cdc.connectors.mongodb.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.table.types.logical.RowType;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SampleBucketSplitStrategy
implements SplitStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(SampleBucketSplitStrategy.class);
    public static final SampleBucketSplitStrategy INSTANCE = new SampleBucketSplitStrategy();
    private static final int DEFAULT_SAMPLING_THRESHOLD = 102400;

    private SampleBucketSplitStrategy() {
    }

    @Override
    public Collection<SnapshotSplit> split(SplitContext splitContext) {
        long chunkSizeInBytes = splitContext.getChunkSizeMB() * 1024 * 1024;
        long sizeInBytes = splitContext.getSizeInBytes();
        long count = splitContext.getDocumentCount();
        if (sizeInBytes < chunkSizeInBytes) {
            return SingleSplitStrategy.INSTANCE.split(splitContext);
        }
        int numChunks = (int)(sizeInBytes / chunkSizeInBytes) + 1;
        int numberOfSamples = count < 102400L ? (int)count : Math.min(numChunks * splitContext.getSamplesPerChunk(), (int)count);
        TableId collectionId = splitContext.getCollectionId();
        MongoCollection<BsonDocument> collection = MongoUtils.collectionFor(splitContext.getMongoClient(), collectionId, BsonDocument.class);
        ArrayList<Bson> pipeline = new ArrayList<Bson>();
        if ((long)numberOfSamples != count) {
            pipeline.add(Aggregates.sample(numberOfSamples));
        }
        pipeline.add(Aggregates.bucketAuto("$_id", numChunks));
        LOG.info("Collection {} going to sample {} records into {} chunks", new Object[]{collectionId, numberOfSamples, numChunks});
        List chunks = collection.aggregate(pipeline).allowDiskUse(true).into(new ArrayList());
        LOG.info("Collection {} got {} chunks by auto bucket and sample", (Object)collectionId, (Object)chunks.size());
        RowType rowType = this.shardKeysToRowType(Collections.singleton("_id"));
        ArrayList<SnapshotSplit> snapshotSplits = new ArrayList<SnapshotSplit>(chunks.size() + 2);
        HashMap<TableId, TableChanges.TableChange> schema = new HashMap<TableId, TableChanges.TableChange>();
        schema.put(collectionId, MongoDBDialect.collectionSchema(collectionId));
        SnapshotSplit firstSplit = new SnapshotSplit(collectionId, this.splitId(collectionId, 0), rowType, ChunkUtils.minLowerBoundOfId(), ChunkUtils.boundOfId(this.lowerBoundOfBucket((BsonDocument)chunks.get(0))), null, schema);
        snapshotSplits.add(firstSplit);
        for (int i = 0; i < chunks.size(); ++i) {
            BsonDocument bucket = (BsonDocument)chunks.get(i);
            snapshotSplits.add(new SnapshotSplit(collectionId, this.splitId(collectionId, i + 1), rowType, ChunkUtils.boundOfId(this.lowerBoundOfBucket(bucket)), ChunkUtils.boundOfId(this.upperBoundOfBucket(bucket)), null, schema));
        }
        SnapshotSplit lastSplit = new SnapshotSplit(collectionId, this.splitId(collectionId, chunks.size() + 1), rowType, ChunkUtils.boundOfId(this.upperBoundOfBucket((BsonDocument)chunks.get(chunks.size() - 1))), ChunkUtils.maxUpperBoundOfId(), null, schema);
        snapshotSplits.add(lastSplit);
        return snapshotSplits;
    }

    private BsonDocument bucketBounds(BsonDocument bucket) {
        return bucket.getDocument("_id");
    }

    private BsonValue lowerBoundOfBucket(BsonDocument bucket) {
        return this.bucketBounds(bucket).get("min");
    }

    private BsonValue upperBoundOfBucket(BsonDocument bucket) {
        return this.bucketBounds(bucket).get("max");
    }
}

