/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.MongoChangeStreamException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbIncrementalSnapshotContext;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.ReplicaSetOffsetContext;
import io.debezium.connector.mongodb.ReplicaSetPartition;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Snapshot change event source for the MongoDB connector.
 *
 * <p>{@code doExecute} starts one worker thread per replica set that has to be snapshotted. For each
 * replica set the collections are copied either sequentially or, when
 * {@code connectorConfig.getSnapshotMaxThreads() > 1}, by a second pool of copy threads. Every
 * document read is dispatched through the {@link EventDispatcher} as a snapshot event.
 */
public class MongoDbSnapshotChangeEventSource
extends AbstractSnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSnapshotChangeEventSource.class);
    // Connector configuration: snapshot mode, max snapshot threads and per-collection filter queries are read from it.
    private final MongoDbConnectorConfig connectorConfig;
    // Supplies the server name used for thread naming, the logging context and the snapshot fetch size.
    private final MongoDbTaskContext taskContext;
    // Factory for per-replica-set connections; every connection obtained from it is closed by this class.
    private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
    // Taken from the task context in the constructor; not read anywhere else in this class.
    private final ConnectionContext connectionContext;
    // All replica sets known to the connector; candidates for snapshotting.
    private final ReplicaSets replicaSets;
    // Receives the snapshot events and provides the snapshot receiver.
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    // Time source for export durations and event timestamps.
    private final Clock clock;
    // Notified about the collections to snapshot and about each completed collection.
    private final SnapshotProgressListener<MongoDbPartition> snapshotProgressListener;
    // Receives failures raised by the replica-set worker threads.
    private final ErrorHandler errorHandler;
    // Set when the snapshot could not run to completion; when set, doExecute reports SnapshotResult.aborted().
    private AtomicBoolean aborted = new AtomicBoolean(false);

    /**
     * Creates the snapshot source.
     *
     * @param connectorConfig connector configuration
     * @param taskContext task context; its connection context is kept as well
     * @param connections factory for per-replica-set connections
     * @param replicaSets replica sets known to the connector
     * @param dispatcher dispatcher that receives the snapshot events
     * @param clock time source
     * @param snapshotProgressListener listener informed about snapshot progress
     * @param errorHandler handler that receives failures of snapshot worker threads
     */
    public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext, MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSets replicaSets, EventDispatcher<MongoDbPartition, CollectionId> dispatcher, Clock clock, SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, ErrorHandler errorHandler) {
        super((CommonConnectorConfig) connectorConfig, snapshotProgressListener);

        // Configuration and context.
        this.connectorConfig = connectorConfig;
        this.taskContext = taskContext;
        this.connectionContext = taskContext.getConnectionContext();

        // Where the data comes from.
        this.connections = connections;
        this.replicaSets = replicaSets;

        // Where events, progress and failures go.
        this.dispatcher = dispatcher;
        this.snapshotProgressListener = snapshotProgressListener;
        this.errorHandler = errorHandler;
        this.clock = clock;
    }

    /**
     * Runs the snapshot: one worker thread per replica set, then waits for all of them.
     *
     * <p>A worker that throws (including an {@link InterruptedException} raised because the source is no
     * longer running) hands the throwable to the error handler and marks the snapshot as aborted, so a
     * partially copied snapshot is never reported as completed. The executor is always shut down; on
     * abort the remaining workers are interrupted rather than left running in the background.
     *
     * @param context source context used by the workers to detect that the connector is stopping
     * @param prevOffsetCtx offsets of a previous run, may be {@code null}
     * @param snapshotContext must be the {@code MongoDbSnapshotContext} created by {@code prepare}
     * @param snapshottingTask must be the {@code MongoDbSnapshottingTask} created by {@code getSnapshottingTask}
     * @return {@code SnapshotResult.aborted()} if the wait was interrupted or any worker failed,
     *         otherwise {@code SnapshotResult.completed(...)} with the snapshot offset
     */
    @Override
    protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext context, MongoDbOffsetContext prevOffsetCtx, AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) {
        MongoDbSnapshottingTask mongoDbSnapshottingTask = (MongoDbSnapshottingTask) snapshottingTask;
        MongoDbSnapshotContext mongoDbSnapshotContext = (MongoDbSnapshotContext) snapshotContext;

        LOGGER.info("Snapshot step 1 - Preparing");
        if (prevOffsetCtx != null && prevOffsetCtx.isSnapshotRunning()) {
            LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
        }

        LOGGER.info("Snapshot step 2 - Determining snapshot offsets");
        List<ReplicaSet> replicaSetsToSnapshot = mongoDbSnapshottingTask.getReplicaSetsToSnapshot();
        this.initSnapshotStartOffsets(mongoDbSnapshotContext);

        int threads = replicaSetsToSnapshot.size();
        LOGGER.info("Starting {} thread(s) to snapshot replica sets: {}", threads, replicaSetsToSnapshot);
        ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "replicator-snapshot", threads);
        CountDownLatch latch = new CountDownLatch(threads);

        LOGGER.info("Snapshot step 3 - Snapshotting data");
        try {
            replicaSetsToSnapshot.forEach(replicaSet -> executor.submit(() -> {
                try {
                    this.taskContext.configureLoggingContext(replicaSet.replicaSetName());
                    this.snapshotReplicaSet(context, mongoDbSnapshotContext, replicaSet);
                }
                catch (Throwable t) {
                    LOGGER.error("Snapshot for replica set {} failed", replicaSet.replicaSetName(), t);
                    this.errorHandler.setProducerThrowable(t);
                    // A failed replica set means the snapshot as a whole did not complete.
                    this.aborted.set(true);
                }
                finally {
                    latch.countDown();
                }
            }));
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.aborted.set(true);
        }
        finally {
            if (this.aborted.get()) {
                // Do not leave workers copying data after the snapshot has been given up.
                executor.shutdownNow();
            }
            else {
                executor.shutdown();
            }
        }

        if (this.aborted.get()) {
            return SnapshotResult.aborted();
        }
        return SnapshotResult.completed(snapshotContext.offset);
    }

    /**
     * Decides which replica sets have to be snapshotted.
     *
     * <ul>
     * <li>snapshot mode {@code NEVER}: none;</li>
     * <li>no previous offset: all replica sets;</li>
     * <li>otherwise: those for which {@code isSnapshotExpected} returns {@code true}.</li>
     * </ul>
     *
     * @param partition partition the snapshot is taken for
     * @param offsetContext offsets of a previous run, or {@code null} if there are none
     * @return a {@code MongoDbSnapshottingTask} holding the selected replica sets
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext offsetContext) {
        final List<ReplicaSet> selected;
        if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
            LOGGER.info("According to the connector configuration, no snapshot will occur.");
            selected = Collections.emptyList();
        }
        else if (offsetContext == null) {
            LOGGER.info("No previous offset has been found");
            selected = this.replicaSets.all();
        }
        else {
            selected = this.replicaSets.all()
                    .stream()
                    .filter(candidate -> this.isSnapshotExpected(partition, candidate, offsetContext))
                    .collect(Collectors.toList());
        }
        return new MongoDbSnapshottingTask(selected);
    }

    /**
     * Creates the snapshot context for one snapshot run.
     *
     * @param partition partition the snapshot is taken for
     * @return a new {@code MongoDbSnapshotContext} bound to {@code partition}
     */
    @Override
    protected AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoDbPartition partition) {
        MongoDbSnapshotContext freshContext = new MongoDbSnapshotContext(partition);
        return freshContext;
    }

    /**
     * Snapshots a single replica set over a connection that is opened for this call and closed afterwards.
     *
     * @param sourceCtx source context used to detect that the connector is stopping
     * @param snapshotCtx snapshot context providing the partition and the offsets
     * @param replicaSet replica set to snapshot
     * @throws InterruptedException if the snapshot is interrupted
     */
    private void snapshotReplicaSet(ChangeEventSource.ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet) throws InterruptedException {
        MongoDbPartition partition = snapshotCtx.partition;
        try (MongoDbConnection connection = this.connections.get(replicaSet, partition)) {
            createDataEvents(sourceCtx, snapshotCtx, replicaSet, connection);
        }
    }

    /**
     * Tells whether a replica set has to be snapshotted given the offsets of a previous run.
     *
     * @param partition partition used to open a connection for the resume-token check
     * @param replicaSet replica set in question
     * @param offsetContext offsets of the previous run
     * @return {@code true} if there is no offset for the replica set, if its previous snapshot was still
     *         ongoing, or if {@code isValidResumeToken} reports that a snapshot is required
     */
    private boolean isSnapshotExpected(MongoDbPartition partition, ReplicaSet replicaSet, MongoDbOffsetContext offsetContext) {
        ReplicaSetOffsetContext rsOffsets = offsetContext.getReplicaSetOffsetContext(replicaSet);
        boolean snapshotNeeded;

        if (!rsOffsets.hasOffset()) {
            LOGGER.info("No existing offset found for replica set '{}', starting snapshot", rsOffsets.getReplicaSetName());
            snapshotNeeded = true;
        }
        else if (rsOffsets.isSnapshotOngoing()) {
            LOGGER.info("The previous snapshot was incomplete for '{}', so restarting the snapshot", rsOffsets.getReplicaSetName());
            snapshotNeeded = true;
        }
        else {
            LOGGER.info("Found existing offset for replica set '{}' at {}", rsOffsets.getReplicaSetName(), rsOffsets.getOffset());
            BsonDocument resumeToken = rsOffsets.lastResumeTokenDoc();
            snapshotNeeded = this.isValidResumeToken(partition, replicaSet, resumeToken);
        }
        return snapshotNeeded;
    }

    /**
     * Checks whether a change stream can be resumed from the stored resume token.
     *
     * <p>NOTE: despite its name, the method returns {@code true} when the token is <em>not</em> usable,
     * i.e. when a snapshot has to be performed; this is how {@code isSnapshotExpected} consumes the result.
     *
     * <p>The cursor is opened inside the guarded block so that a {@link MongoChangeStreamException} raised
     * while opening it is treated as "invalid token" instead of escaping to the caller. Both the cursor and
     * the connection are closed on every path.
     *
     * @param partition partition used to obtain the connection
     * @param replicaSet replica set whose change stream is probed
     * @param token resume token taken from the stored offset
     * @return {@code false} if a change stream cursor could be opened with the token (no snapshot),
     *         {@code true} if opening it raised a {@link MongoChangeStreamException} (snapshot required)
     * @throws DebeziumException if the check is interrupted; the interrupt flag is restored first
     */
    private boolean isValidResumeToken(MongoDbPartition partition, ReplicaSet replicaSet, BsonDocument token) {
        try (MongoDbConnection mongo = this.connections.get(replicaSet, partition)) {
            return mongo.execute("Checking change stream", client -> {
                ChangeStreamIterable<BsonDocument> stream = client.watch(BsonDocument.class);
                stream.resumeAfter(token);

                try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> ignored = stream.cursor()) {
                    LOGGER.info("Valid resume token present for replica set '{}, so no snapshot will be performed'", replicaSet.replicaSetName());
                    return false;
                }
                catch (MongoChangeStreamException e) {
                    LOGGER.info("Invalid resume token present for replica set '{}, snapshot will be performed'", replicaSet.replicaSetName());
                    return true;
                }
            });
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to the caller even though it is wrapped in an unchecked exception.
            Thread.currentThread().interrupt();
            throw new DebeziumException("Interrupted while creating snapshotting task", e);
        }
    }

    /**
     * Replaces the offset of the snapshot context with a new, empty {@code MongoDbOffsetContext}.
     *
     * @param snapshotCtx snapshot context whose {@code offset} field is overwritten
     */
    private void initSnapshotStartOffsets(MongoDbSnapshotContext snapshotCtx) {
        LOGGER.info("Initializing empty Offset context");

        SourceInfo sourceInfo = new SourceInfo(this.connectorConfig);
        TransactionContext transactionContext = new TransactionContext();
        MongoDbIncrementalSnapshotContext<CollectionId> incrementalContext = new MongoDbIncrementalSnapshotContext<CollectionId>(false);

        snapshotCtx.offset = new MongoDbOffsetContext(sourceInfo, transactionContext, incrementalContext);
    }

    /**
     * Initializes the offset of one replica set from a freshly opened change stream cursor.
     *
     * @param snapshotCtx snapshot context holding the offsets
     * @param replicaSet replica set whose offset is initialized
     * @param mongo connection to the replica set
     * @throws InterruptedException if the operation is interrupted
     */
    private void initReplicaSetSnapshotStartOffsets(MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet, MongoDbConnection mongo) throws InterruptedException {
        LOGGER.info("Determine Snapshot start offset for replica-set {}", replicaSet.replicaSetName());
        ReplicaSetOffsetContext replicaSetOffsets = snapshotCtx.offset.getReplicaSetOffsetContext(replicaSet);

        BlockingConsumer<MongoClient> captureStartPosition = client -> {
            ChangeStreamIterable<BsonDocument> changeStream = client.watch(BsonDocument.class);
            // The cursor is only needed to initialize the offset and is closed right away.
            try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> changeCursor = changeStream.cursor()) {
                replicaSetOffsets.initEvent(changeCursor);
            }
        };
        mongo.execute("Setting resume token", captureStartPosition);
    }

    /**
     * Produces all snapshot events of one replica set and drives the snapshot life cycle of the offset
     * context around it: start offsets, {@code preSnapshotStart}, data events, {@code preSnapshotCompletion},
     * completion of the snapshot receiver and {@code postSnapshotCompletion}, in that order.
     *
     * @param sourceCtx source context used to detect that the connector is stopping
     * @param snapshotCtx snapshot context holding the offsets
     * @param replicaSet replica set to snapshot
     * @param mongo connection to the replica set
     * @throws InterruptedException if the snapshot is interrupted
     */
    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet, MongoDbConnection mongo) throws InterruptedException {
        this.initReplicaSetSnapshotStartOffsets(snapshotCtx, replicaSet, mongo);

        EventDispatcher.SnapshotReceiver<MongoDbPartition> receiver = this.dispatcher.getSnapshotChangeEventReceiver();
        MongoDbOffsetContext offsets = snapshotCtx.offset;

        offsets.preSnapshotStart();
        this.createDataEventsForReplicaSet(sourceCtx, snapshotCtx, receiver, replicaSet, mongo);

        offsets.preSnapshotCompletion();
        receiver.completeSnapshot();
        offsets.postSnapshotCompletion();
    }

    /**
     * Snapshots every collection of one replica set.
     *
     * <p>The collections are copied sequentially, or by a pool of copy threads when
     * {@code connectorConfig.getSnapshotMaxThreads() > 1}. Both paths now behave the same way on failure:
     * an interruption results in an {@link InterruptedException} and the replica set is not marked as
     * finished; any other failure of a copy thread is rethrown wrapped in a {@link DebeziumException}.
     *
     * @param sourceContext source context used to detect that the connector is stopping
     * @param snapshotContext snapshot context holding offsets and the last-collection flag
     * @param snapshotReceiver receiver the snapshot events are dispatched to
     * @param replicaSet replica set to snapshot
     * @param mongo connection to the replica set
     * @throws InterruptedException if the snapshot is interrupted or the source stops running
     * @throws DebeziumException if a copy thread fails with anything but an interruption
     */
    private void createDataEventsForReplicaSet(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, MongoDbConnection mongo) throws InterruptedException {
        String rsName = replicaSet.replicaSetName();
        MongoDbOffsetContext offsetContext = snapshotContext.offset;
        ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);

        snapshotContext.lastCollection = false;
        offsetContext.startReplicaSetSnapshot(rsName);
        LOGGER.info("Beginning snapshot of '{}' at {}", rsName, rsOffsetContext.getOffset());

        List<CollectionId> collections = this.determineDataCollectionsToBeSnapshotted(mongo.collections()).collect(Collectors.toList());
        this.snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, collections);

        int maxThreads = this.connectorConfig.getSnapshotMaxThreads();
        if (maxThreads > 1) {
            this.snapshotCollectionsInParallel(sourceContext, snapshotContext, snapshotReceiver, replicaSet, mongo, collections, maxThreads);
        }
        else {
            Iterator<CollectionId> it = collections.iterator();
            while (it.hasNext()) {
                CollectionId collectionId = it.next();
                if (!sourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while snapshotting replica set " + rsName);
                }
                if (!it.hasNext()) {
                    snapshotContext.lastCollection = true;
                }
                this.createDataEventsForCollection(sourceContext, snapshotContext, snapshotReceiver, replicaSet, collectionId, mongo);
            }
        }

        // Only reached when every collection has been copied.
        offsetContext.stopReplicaSetSnapshot(rsName);
    }

    /**
     * Copies the given collections with up to {@code maxThreads} threads and waits for all of them.
     *
     * @throws InterruptedException if a copy thread or the waiting thread was interrupted
     * @throws DebeziumException if a copy thread failed with anything but an interruption
     */
    private void snapshotCollectionsInParallel(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, MongoDbConnection mongo, List<CollectionId> collections, int maxThreads) throws InterruptedException {
        String rsName = replicaSet.replicaSetName();
        int numThreads = Math.min(collections.size(), maxThreads);
        ConcurrentLinkedQueue<CollectionId> collectionsToCopy = new ConcurrentLinkedQueue<>(collections);
        // Failures of copy threads; without this they would vanish in the unchecked Future of submit().
        ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();
        String snapshotThreadName = "snapshot-" + (replicaSet.hasReplicaSetName() ? rsName : "main");
        ExecutorService snapshotThreads = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), snapshotThreadName, maxThreads);
        CountDownLatch latch = new CountDownLatch(numThreads);
        AtomicBoolean aborted = new AtomicBoolean(false);
        AtomicInteger threadCounter = new AtomicInteger(0);
        LOGGER.info("Preparing to use {} thread(s) to snapshot {} collection(s): {}", numThreads, collections.size(), Strings.join(", ", collections));

        try {
            for (int i = 0; i < numThreads; ++i) {
                snapshotThreads.submit(() -> {
                    try {
                        // Inside the try so that the latch is counted down even if this call fails.
                        this.taskContext.configureLoggingContext(rsName + "-snapshot" + threadCounter.incrementAndGet());
                        CollectionId id;
                        while (!aborted.get() && (id = collectionsToCopy.poll()) != null) {
                            if (!sourceContext.isRunning()) {
                                throw new InterruptedException("Interrupted while snapshotting replica set " + rsName);
                            }
                            if (collectionsToCopy.isEmpty()) {
                                // NOTE(review): other threads may still be copying earlier collections at this point.
                                snapshotContext.lastCollection = true;
                            }
                            this.createDataEventsForCollection(sourceContext, snapshotContext, snapshotReceiver, replicaSet, id, mongo);
                        }
                    }
                    catch (InterruptedException e) {
                        aborted.set(true);
                    }
                    catch (Throwable t) {
                        failures.add(t);
                        // Stop the other copy threads after their current collection.
                        aborted.set(true);
                    }
                    finally {
                        latch.countDown();
                    }
                });
            }
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                aborted.set(true);
            }
        }
        finally {
            if (aborted.get()) {
                snapshotThreads.shutdownNow();
            }
            else {
                snapshotThreads.shutdown();
            }
        }

        Throwable failure = failures.peek();
        if (failure != null) {
            throw new DebeziumException("Snapshot of replica set '" + rsName + "' failed", failure);
        }
        if (aborted.get()) {
            // Same outcome as the sequential path: the replica set snapshot is not reported as finished.
            throw new InterruptedException("Interrupted while snapshotting replica set " + rsName);
        }
    }

    /**
     * Reads one collection and dispatches a snapshot event for every document.
     *
     * <p>The documents are read with the configured snapshot fetch size and, if configured, the snapshot
     * filter query of the collection (otherwise {@code {}}). When the context says this is the last
     * collection, the offset is marked with {@code SnapshotRecord.LAST} before the last document is
     * dispatched, or right away if the collection yields no documents.
     *
     * @param sourceContext source context checked before each document is read
     * @param snapshotContext snapshot context holding partition, offsets and progress flags
     * @param snapshotReceiver receiver the events are dispatched to
     * @param replicaSet replica set the collection belongs to
     * @param collectionId collection to export
     * @param mongo connection to the replica set
     * @throws InterruptedException if the source stops running or the operation is interrupted
     */
    private void createDataEventsForCollection(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo) throws InterruptedException {
        long exportStart = this.clock.currentTimeInMillis();
        LOGGER.info("\t Exporting data for collection '{}'", collectionId);

        BlockingConsumer<MongoClient> exportCollection = client -> {
            MongoDatabase database = client.getDatabase(collectionId.dbName());
            MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
            int batchSize = this.taskContext.getConnectorConfig().getSnapshotFetchSize();
            long exported = 0L;
            Document filterQuery = Document.parse(this.connectorConfig.getSnapshotFilterQueryForCollection(collectionId).orElseGet(() -> "{}"));

            try (MongoCursor<BsonDocument> cursor = collection.find(filterQuery).batchSize(batchSize).iterator()) {
                snapshotContext.lastRecordInCollection = false;

                if (!cursor.hasNext()) {
                    // Nothing to export; an empty last collection still has to close the snapshot.
                    if (snapshotContext.lastCollection) {
                        snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
                    }
                }
                else {
                    while (cursor.hasNext()) {
                        if (!sourceContext.isRunning()) {
                            throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.name());
                        }
                        BsonDocument document = cursor.next();
                        ++exported;

                        snapshotContext.lastRecordInCollection = !cursor.hasNext();
                        if (snapshotContext.lastCollection && snapshotContext.lastRecordInCollection) {
                            snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
                        }
                        this.dispatcher.dispatchSnapshotEvent(snapshotContext.partition, collectionId, this.getChangeRecordEmitter(snapshotContext, collectionId, document, replicaSet), snapshotReceiver);
                    }
                }

                LOGGER.info("\t Finished snapshotting {} records for collection '{}'; total duration '{}'", exported, collectionId, Strings.duration(this.clock.currentTimeInMillis() - exportStart));
                this.snapshotProgressListener.dataCollectionSnapshotCompleted(snapshotContext.partition, collectionId, exported);
            }
        };
        mongo.execute("sync '" + collectionId + "'", exportCollection);
    }

    /**
     * Builds the record emitter for one snapshotted document and records the read in the replica set offset.
     *
     * @param snapshotContext snapshot context holding the offsets
     * @param collectionId collection the document was read from
     * @param document the document
     * @param replicaSet replica set the collection belongs to
     * @return a {@code MongoDbSnapshotRecordEmitter} for the document
     */
    private ChangeRecordEmitter<MongoDbPartition> getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext, CollectionId collectionId, BsonDocument document, ReplicaSet replicaSet) {
        MongoDbOffsetContext offsets = snapshotContext.offset;
        ReplicaSetPartition rsPartition = offsets.getReplicaSetPartition(replicaSet);
        ReplicaSetOffsetContext rsOffsets = offsets.getReplicaSetOffsetContext(replicaSet);

        // Register the read in the offset before the emitter is created.
        rsOffsets.readEvent(collectionId, this.getClock().currentTime());

        return new MongoDbSnapshotRecordEmitter(rsPartition, rsOffsets, this.getClock(), document);
    }

    /**
     * @return the clock this source was created with
     */
    private Clock getClock() {
        return clock;
    }

    public static class MongoDbSnapshottingTask
    extends AbstractSnapshotChangeEventSource.SnapshottingTask {
        private final List<ReplicaSet> replicaSetsToSnapshot;

        public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot) {
            super(false, !replicaSetsToSnapshot.isEmpty());
            this.replicaSetsToSnapshot = replicaSetsToSnapshot;
        }

        public List<ReplicaSet> getReplicaSetsToSnapshot() {
            return Collections.unmodifiableList(this.replicaSetsToSnapshot);
        }

        public boolean shouldSkipSnapshot() {
            return !this.snapshotData();
        }

        public String toString() {
            return "SnapshottingTask [replicaSetsToSnapshot=" + this.replicaSetsToSnapshot + "]";
        }
    }

    /**
     * Snapshot context with two progress flags used to decide when the offset is marked with
     * {@code SnapshotRecord.LAST}.
     *
     * <p>The flags are written and read by different snapshot worker threads, so they are
     * {@code volatile} to make an update by one thread visible to the others. This gives visibility
     * only; it does not make check-then-act sequences on the flags atomic.
     */
    private static class MongoDbSnapshotContext
    extends AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> {
        /** {@code true} once the last collection of a replica set has been picked up for copying. */
        public volatile boolean lastCollection;
        /** {@code true} while the document being dispatched is the last one of its collection. */
        public volatile boolean lastRecordInCollection;

        MongoDbSnapshotContext(MongoDbPartition partition) {
            super(partition);
        }
    }
}

