/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import java.time.Instant;
import java.util.Optional;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDBStreamFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBStreamFetchTask.class);
    private final StreamSplit streamSplit;
    private volatile boolean taskRunning = false;
    private MongoDBSourceConfig sourceConfig;
    private final Time time = new SystemTime();
    private boolean supportsStartAtOperationTime = true;
    private boolean supportsStartAfter = true;

    public MongoDBStreamFetchTask(StreamSplit streamSplit) {
        this.streamSplit = streamSplit;
    }

    public void execute(FetchTask.Context context) throws Exception {
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext)context;
        this.sourceConfig = taskContext.getSourceConfig();
        ChangeStreamDescriptor descriptor = taskContext.getChangeStreamDescriptor();
        ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();
        MongoClient mongoClient = MongoUtils.clientFor(this.sourceConfig);
        MongoChangeStreamCursor<BsonDocument> changeStreamCursor = this.openChangeStreamCursor(descriptor);
        HeartbeatManager heartbeatManager = this.openHeartbeatManagerIfNeeded(changeStreamCursor);
        long startPoll = this.time.milliseconds();
        long nextUpdate = startPoll + (long)this.sourceConfig.getPollAwaitTimeMillis();
        this.taskRunning = true;
        try {
            while (this.taskRunning) {
                ChangeStreamOffset currentOffset;
                Optional<Object> next;
                try {
                    next = Optional.ofNullable(changeStreamCursor.tryNext());
                }
                catch (MongoCommandException e) {
                    if (MongoUtils.checkIfChangeStreamCursorExpires(e)) {
                        LOG.warn("Change stream cursor has expired, trying to recreate cursor");
                        boolean resumeTokenExpires = MongoUtils.checkIfResumeTokenExpires(e);
                        if (resumeTokenExpires) {
                            LOG.warn("Resume token has expired, fallback to timestamp restart mode");
                        }
                        changeStreamCursor = this.openChangeStreamCursor(descriptor, resumeTokenExpires);
                        next = Optional.ofNullable(changeStreamCursor.tryNext());
                    }
                    throw e;
                }
                SourceRecord changeRecord = null;
                if (!next.isPresent()) {
                    long untilNext = nextUpdate - this.time.milliseconds();
                    if (untilNext > 0L) {
                        LOG.debug("Waiting {} ms to poll change records", (Object)untilNext);
                        this.time.sleep(untilNext);
                        continue;
                    }
                    if (heartbeatManager != null) {
                        changeRecord = heartbeatManager.heartbeat().map(this::normalizeHeartbeatRecord).orElse(null);
                    }
                    nextUpdate = this.time.milliseconds() + (long)this.sourceConfig.getPollAwaitTimeMillis();
                } else {
                    BsonDocument changeStreamDocument = (BsonDocument)next.get();
                    OperationType operationType = this.getOperationType(changeStreamDocument);
                    switch (operationType) {
                        case INSERT: 
                        case UPDATE: 
                        case REPLACE: 
                        case DELETE: {
                            MongoNamespace namespace = this.getMongoNamespace(changeStreamDocument);
                            BsonDocument resumeToken = changeStreamDocument.getDocument((Object)"_id");
                            BsonDocument valueDocument = this.normalizeChangeStreamDocument(changeStreamDocument);
                            LOG.trace("Adding {} to {}", (Object)valueDocument, (Object)namespace.getFullName());
                            changeRecord = MongoRecordUtils.createSourceRecord(MongoRecordUtils.createPartitionMap(this.sourceConfig.getScheme(), this.sourceConfig.getHosts(), namespace.getDatabaseName(), namespace.getCollectionName()), MongoRecordUtils.createSourceOffsetMap(resumeToken, false), namespace.getFullName(), changeStreamDocument.getDocument((Object)"_id"), valueDocument);
                            break;
                        }
                        default: {
                            LOG.info("Ignored {} record: {}", (Object)operationType, (Object)changeStreamDocument);
                        }
                    }
                }
                if (changeRecord != null && !this.isBoundedRead()) {
                    queue.enqueue((Object)new DataChangeEvent(changeRecord));
                }
                if (!this.isBoundedRead()) continue;
                if (changeRecord != null) {
                    currentOffset = new ChangeStreamOffset(MongoRecordUtils.getResumeToken(changeRecord));
                    if (currentOffset.isAtOrBefore(this.streamSplit.getEndingOffset())) {
                        queue.enqueue((Object)new DataChangeEvent(changeRecord));
                    }
                } else {
                    currentOffset = new ChangeStreamOffset(MongoUtils.getCurrentClusterTime(mongoClient));
                }
                if (!currentOffset.isAtOrAfter(this.streamSplit.getEndingOffset())) continue;
                SourceRecord watermark = WatermarkEvent.create(MongoRecordUtils.createWatermarkPartitionMap(descriptor.toString()), (String)"__mongodb_watermarks", (String)this.streamSplit.splitId(), (WatermarkKind)WatermarkKind.END, (Offset)currentOffset);
                queue.enqueue((Object)new DataChangeEvent(watermark));
                break;
            }
        }
        catch (Exception e) {
            LOG.error("Poll change stream records failed ", (Throwable)e);
            throw e;
        }
        finally {
            this.taskRunning = false;
            if (changeStreamCursor != null) {
                changeStreamCursor.close();
            }
        }
    }

    public boolean isRunning() {
        return this.taskRunning;
    }

    public StreamSplit getSplit() {
        return this.streamSplit;
    }

    public void close() {
        this.taskRunning = false;
    }

    private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(ChangeStreamDescriptor changeStreamDescriptor) {
        return this.openChangeStreamCursor(changeStreamDescriptor, false);
    }

    private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(ChangeStreamDescriptor changeStreamDescriptor, boolean forceTimestampStartup) {
        ChangeStreamOffset offset = new ChangeStreamOffset(this.streamSplit.getStartingOffset().getOffset());
        ChangeStreamIterable<Document> changeStreamIterable = MongoUtils.getChangeStreamIterable(this.sourceConfig, changeStreamDescriptor);
        BsonDocument resumeToken = offset.getResumeToken();
        BsonTimestamp timestamp = offset.getTimestamp();
        if (resumeToken != null && !forceTimestampStartup) {
            if (this.supportsStartAfter) {
                LOG.info("Open the change stream after the previous offset: {}", (Object)resumeToken);
                changeStreamIterable.startAfter(resumeToken);
            } else {
                LOG.info("Open the change stream after the previous offset using resumeAfter: {}", (Object)resumeToken);
                changeStreamIterable.resumeAfter(resumeToken);
            }
        } else if (this.supportsStartAtOperationTime) {
            LOG.info("Open the change stream at the timestamp: {}", (Object)timestamp);
            changeStreamIterable.startAtOperationTime(timestamp);
        } else {
            if (forceTimestampStartup) {
                LOG.error("Open change stream failed. Unable to resume from timestamp");
                throw new FlinkRuntimeException("Open change stream failed. Unable to resume from timestamp");
            }
            LOG.warn("Open the change stream of the latest offset");
        }
        try {
            return (MongoChangeStreamCursor)changeStreamIterable.withDocumentClass(BsonDocument.class).cursor();
        }
        catch (MongoCommandException e) {
            if (e.getErrorCode() == 9 || e.getErrorCode() == 40415) {
                if (e.getErrorMessage().contains("startAtOperationTime")) {
                    this.supportsStartAtOperationTime = false;
                    return this.openChangeStreamCursor(changeStreamDescriptor);
                }
                if (e.getErrorMessage().contains("startAfter")) {
                    this.supportsStartAfter = false;
                    return this.openChangeStreamCursor(changeStreamDescriptor);
                }
                LOG.error("Open change stream failed ", (Throwable)e);
                throw new FlinkRuntimeException("Open change stream failed", (Throwable)e);
            }
            if (e.getErrorCode() == 20) {
                LOG.error("Illegal $changeStream operation: {} {}", (Object)e.getErrorMessage(), (Object)e.getErrorCode());
                throw new FlinkRuntimeException("Illegal $changeStream operation", (Throwable)e);
            }
            if (e.getErrorCode() == 13) {
                LOG.error("Unauthorized $changeStream operation: {} {}", (Object)e.getErrorMessage(), (Object)e.getErrorCode());
                throw new FlinkRuntimeException("Unauthorized $changeStream operation", (Throwable)e);
            }
            if (!forceTimestampStartup && MongoUtils.checkIfResumeTokenExpires(e)) {
                LOG.info("Failed to open cursor with resume token, fallback to timestamp startup");
                return this.openChangeStreamCursor(changeStreamDescriptor, true);
            }
            LOG.error("Open change stream failed ", (Throwable)e);
            throw new FlinkRuntimeException("Open change stream failed", (Throwable)e);
        }
    }

    private HeartbeatManager openHeartbeatManagerIfNeeded(MongoChangeStreamCursor<BsonDocument> changeStreamCursor) {
        if (this.sourceConfig.getHeartbeatIntervalMillis() > 0) {
            return new HeartbeatManager(this.time, changeStreamCursor, (long)this.sourceConfig.getHeartbeatIntervalMillis(), "__mongodb_heartbeats", MongoRecordUtils.createHeartbeatPartitionMap(this.sourceConfig.getScheme(), this.sourceConfig.getHosts()));
        }
        return null;
    }

    private BsonDocument normalizeChangeStreamDocument(BsonDocument changeStreamDocument) {
        changeStreamDocument.put("_id", (BsonValue)this.normalizeKeyDocument(changeStreamDocument));
        changeStreamDocument.put("ts_ms", (BsonValue)new BsonInt64(System.currentTimeMillis()));
        BsonDocument source = new BsonDocument();
        source.put("snapshot", (BsonValue)new BsonString("false"));
        if (!changeStreamDocument.containsKey((Object)"clusterTime")) {
            if (changeStreamDocument.containsKey((Object)"ts_ms")) {
                long timestampValue = changeStreamDocument.getInt64((Object)"ts_ms").getValue();
                BsonTimestamp legacyTimestamp = MongoRecordUtils.bsonTimestampFromEpochMillis(timestampValue);
                changeStreamDocument.put("clusterTime", (BsonValue)legacyTimestamp);
            } else {
                LOG.warn("Cannot extract clusterTime from change stream event, fallback to current timestamp.");
                changeStreamDocument.put("clusterTime", (BsonValue)MongoRecordUtils.currentBsonTimestamp());
            }
        }
        BsonTimestamp clusterTime = changeStreamDocument.getTimestamp((Object)"clusterTime");
        Instant clusterInstant = Instant.ofEpochSecond(clusterTime.getTime());
        source.put("ts_ms", (BsonValue)new BsonInt64(clusterInstant.toEpochMilli()));
        changeStreamDocument.put("source", (BsonValue)source);
        return changeStreamDocument;
    }

    private BsonDocument normalizeKeyDocument(BsonDocument changeStreamDocument) {
        BsonDocument documentKey = changeStreamDocument.getDocument((Object)"documentKey");
        BsonDocument primaryKey = new BsonDocument("_id", documentKey.get((Object)"_id"));
        return new BsonDocument("_id", (BsonValue)primaryKey);
    }

    private SourceRecord normalizeHeartbeatRecord(SourceRecord heartbeatRecord) {
        Struct heartbeatValue = new Struct(MongoDBEnvelope.HEARTBEAT_VALUE_SCHEMA);
        heartbeatValue.put("ts_ms", (Object)Instant.now().toEpochMilli());
        return new SourceRecord(heartbeatRecord.sourcePartition(), heartbeatRecord.sourceOffset(), heartbeatRecord.topic(), heartbeatRecord.keySchema(), heartbeatRecord.key(), MongoDBEnvelope.HEARTBEAT_VALUE_SCHEMA, (Object)heartbeatValue);
    }

    private MongoNamespace getMongoNamespace(BsonDocument changeStreamDocument) {
        BsonDocument ns = changeStreamDocument.getDocument((Object)"ns");
        return new MongoNamespace(ns.getString((Object)"db").getValue(), ns.getString((Object)"coll").getValue());
    }

    private OperationType getOperationType(BsonDocument changeStreamDocument) {
        return OperationType.fromString((String)changeStreamDocument.getString((Object)"operationType").getValue());
    }

    private boolean isBoundedRead() {
        return !ChangeStreamOffset.NO_STOPPING_OFFSET.equals(this.streamSplit.getEndingOffset());
    }
}

