/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scan (snapshot) fetch task for a single MongoDB {@link SnapshotSplit}.
 *
 * <p>The task queries the split's collection between the split's start and end bounds, wraps
 * every returned document in an insert-style envelope, and enqueues it on the context's
 * {@link ChangeEventQueue}. It also emits the LOW / HIGH / END watermark events for the split
 * and delegates the backfill phase to a {@link MongoDBStreamFetchTask}.
 */
public class MongoDBScanFetchTask
extends AbstractScanFetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBScanFetchTask.class);

    /** Topic name attached to every watermark event emitted by this task. */
    private static final String WATERMARK_TOPIC_NAME = "__mongodb_watermarks";

    /** Name of the document identifier field. */
    private static final String ID_FIELD = "_id";

    /**
     * Creates a scan task bound to the given snapshot split.
     *
     * @param snapshotSplit the split this task reads
     */
    public MongoDBScanFetchTask(SnapshotSplit snapshotSplit) {
        super(snapshotSplit);
    }

    /**
     * Reads all documents of the snapshot split and enqueues one {@link DataChangeEvent} per
     * document.
     *
     * <p>The query uses element {@code [1]} of the split start / split end as the {@code min} /
     * {@code max} bounds and element {@code [0]} of the split start as the index hint.
     *
     * @param context fetch context; must be a {@link MongoDBFetchTaskContext}
     * @throws InterruptedException if {@code taskRunning} is observed to be {@code false} while
     *     documents are still being read
     * @throws Exception on any failure while querying MongoDB or enqueueing records
     */
    @Override
    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        ChangeEventQueue<DataChangeEvent> changeEventQueue = context.getQueue();
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext)context;
        MongoDBSourceConfig sourceConfig = taskContext.getSourceConfig();
        TableId collectionId = this.snapshotSplit.getTableId();

        // NOTE(review): the client is not closed here, matching the original code; presumably
        // its lifecycle is owned by MongoUtils - confirm.
        MongoClient mongoClient = MongoUtils.clientFor(sourceConfig);
        MongoCollection<RawBsonDocument> collection = MongoUtils.collectionFor(mongoClient, collectionId, RawBsonDocument.class);

        // The cursor is opened directly in the resource declaration: a try-with-resources
        // variable is implicitly final and cannot be assigned later, and this form guarantees
        // the cursor is closed on every exit path (normal completion, interruption, failure).
        try (MongoCursor<RawBsonDocument> cursor = collection.find()
                .min((BsonDocument)this.snapshotSplit.getSplitStart()[1])
                .max((BsonDocument)this.snapshotSplit.getSplitEnd()[1])
                .hint((BsonDocument)this.snapshotSplit.getSplitStart()[0])
                .batchSize(sourceConfig.getBatchSize())
                .noCursorTimeout(sourceConfig.disableCursorTimeout())
                .cursor()) {
            while (cursor.hasNext()) {
                // Checked before consuming the next document so a stopped task does not enqueue
                // further records.
                if (!this.taskRunning) {
                    throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.identifier());
                }
                BsonDocument valueDocument = this.normalizeSnapshotDocument(collectionId, cursor.next());
                BsonDocument keyDocument = new BsonDocument(ID_FIELD, valueDocument.get(ID_FIELD));
                SourceRecord snapshotRecord = MongoRecordUtils.createSourceRecord(
                        MongoRecordUtils.createPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts(), collectionId.catalog(), collectionId.table()),
                        MongoRecordUtils.createSourceOffsetMap(keyDocument.getDocument(ID_FIELD), true),
                        collectionId.identifier(),
                        keyDocument,
                        valueDocument);
                changeEventQueue.enqueue(new DataChangeEvent(snapshotRecord));
            }
        }
    }

    /**
     * Runs the backfill phase by executing a {@link MongoDBStreamFetchTask} over the given stream
     * split with the same context.
     *
     * @param context fetch context handed to the stream task
     * @param backfillStreamSplit stream split describing the backfill range
     * @throws Exception if the stream task fails
     */
    @Override
    protected void executeBackfillTask(FetchTask.Context context, StreamSplit backfillStreamSplit) throws Exception {
        MongoDBStreamFetchTask backfillStreamTask = new MongoDBStreamFetchTask(backfillStreamSplit);
        backfillStreamTask.execute(context);
    }

    /**
     * Enqueues a {@link WatermarkKind#LOW} watermark event for {@code split}.
     *
     * <p>The split id is taken from the {@code split} argument, consistent with the HIGH and END
     * dispatchers.
     *
     * @param context fetch context providing the event queue
     * @param split split whose id is stamped on the watermark
     * @param lowWatermark offset carried by the watermark
     * @throws InterruptedException if enqueueing is interrupted
     */
    @Override
    protected void dispatchLowWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset lowWatermark) throws InterruptedException {
        this.enqueueWatermark(context, split.splitId(), WatermarkKind.LOW, lowWatermark);
    }

    /**
     * Enqueues a {@link WatermarkKind#HIGH} watermark event for {@code split}.
     *
     * @param context fetch context providing the event queue
     * @param split split whose id is stamped on the watermark
     * @param highWatermark offset carried by the watermark
     * @throws InterruptedException if enqueueing is interrupted
     */
    @Override
    protected void dispatchHighWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset highWatermark) throws InterruptedException {
        this.enqueueWatermark(context, split.splitId(), WatermarkKind.HIGH, highWatermark);
    }

    /**
     * Enqueues a {@link WatermarkKind#END} watermark event for {@code split}.
     *
     * @param context fetch context providing the event queue
     * @param split split whose id is stamped on the watermark
     * @param endWatermark offset carried by the watermark
     * @throws InterruptedException if enqueueing is interrupted
     */
    @Override
    protected void dispatchEndWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset endWatermark) throws InterruptedException {
        this.enqueueWatermark(context, split.splitId(), WatermarkKind.END, endWatermark);
    }

    /**
     * Builds a watermark event of the given kind and puts it on the context's queue.
     *
     * <p>The partition map is always derived from this task's own snapshot split table id; only
     * the split id, kind and offset vary between callers.
     */
    private void enqueueWatermark(FetchTask.Context context, String splitId, WatermarkKind kind, Offset watermark) throws InterruptedException {
        ChangeEventQueue<DataChangeEvent> changeEventQueue = context.getQueue();
        changeEventQueue.enqueue(new DataChangeEvent(WatermarkEvent.create(
                MongoRecordUtils.createWatermarkPartitionMap(this.snapshotSplit.getTableId().identifier()),
                WATERMARK_TOPIC_NAME,
                splitId,
                kind,
                watermark)));
    }

    /**
     * Wraps a scanned document in an envelope tagged with {@link OperationType#INSERT}.
     *
     * <p>Fields are added in a fixed order: {@code _id} (a nested document holding the original
     * {@code _id}), {@code operationType}, {@code ns} ({@code db} / {@code coll}),
     * {@code documentKey}, {@code fullDocument}, {@code ts_ms} (current wall-clock time) and
     * {@code source} ({@code snapshot = "true"}, {@code ts_ms = 0}).
     *
     * @param collectionId collection the document was read from; supplies {@code db} and
     *     {@code coll}
     * @param originalDocument the document as returned by the scan
     * @return the newly built envelope document
     */
    private BsonDocument normalizeSnapshotDocument(TableId collectionId, BsonDocument originalDocument) {
        BsonDocument valueDocument = new BsonDocument();
        BsonDocument id = new BsonDocument();
        id.put(ID_FIELD, originalDocument.get(ID_FIELD));
        valueDocument.put(ID_FIELD, id);
        valueDocument.put("operationType", new BsonString(OperationType.INSERT.getValue()));
        BsonDocument ns = new BsonDocument();
        ns.put("db", new BsonString(collectionId.catalog()));
        ns.put("coll", new BsonString(collectionId.table()));
        valueDocument.put("ns", ns);
        valueDocument.put("documentKey", new BsonDocument(ID_FIELD, originalDocument.get(ID_FIELD)));
        valueDocument.put("fullDocument", originalDocument);
        valueDocument.put("ts_ms", new BsonInt64(System.currentTimeMillis()));
        BsonDocument source = new BsonDocument();
        source.put("snapshot", new BsonString("true"));
        // The source timestamp is fixed at zero for snapshot records.
        source.put("ts_ms", new BsonInt64(0L));
        valueDocument.put("source", source);
        return valueDocument;
    }
}

