/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import org.apache.flink.cdc.connectors.mongodb.source.utils.BsonUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonType;
import org.bson.BsonValue;

public class MongoDBFetchTaskContext
implements FetchTask.Context {
    private final MongoDBDialect dialect;
    private final MongoDBSourceConfig sourceConfig;
    private final ChangeStreamDescriptor changeStreamDescriptor;
    private ChangeEventQueue<DataChangeEvent> changeEventQueue;

    public MongoDBFetchTaskContext(MongoDBDialect dialect, MongoDBSourceConfig sourceConfig, ChangeStreamDescriptor changeStreamDescriptor) {
        this.dialect = dialect;
        this.sourceConfig = sourceConfig;
        this.changeStreamDescriptor = changeStreamDescriptor;
    }

    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        int queueSize = this.sourceConfig.getBatchSize();
        this.changeEventQueue = new ChangeEventQueue.Builder().pollInterval(Duration.ofMillis(this.sourceConfig.getPollAwaitTimeMillis())).maxBatchSize(this.sourceConfig.getPollMaxBatchSize()).maxQueueSize(queueSize).loggingContextSupplier(() -> LoggingContext.forConnector("mongodb-cdc", "mongodb-cdc-connector", "mongodb-cdc-connector-task")).build();
    }

    @Override
    public MongoDBSourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    @Override
    public MongoDBDialect getDataSourceDialect() {
        return this.dialect;
    }

    public ChangeStreamDescriptor getChangeStreamDescriptor() {
        return this.changeStreamDescriptor;
    }

    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.changeEventQueue;
    }

    @Override
    public TableId getTableId(SourceRecord record) {
        return MongoRecordUtils.getTableId(record);
    }

    @Override
    public Tables.TableFilter getTableFilter() {
        return Tables.TableFilter.includeAll();
    }

    @Override
    public Offset getStreamOffset(SourceRecord record) {
        return new ChangeStreamOffset(MongoRecordUtils.getResumeToken(record));
    }

    @Override
    public boolean isDataChangeRecord(SourceRecord record) {
        return MongoRecordUtils.isDataChangeRecord(record);
    }

    @Override
    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
        BsonDocument documentKey = MongoRecordUtils.getDocumentKey(record);
        BsonDocument splitKeys = (BsonDocument)splitStart[0];
        String firstKey = splitKeys.getFirstKey();
        BsonValue keyValue = documentKey.get(firstKey);
        BsonValue lowerBound = ((BsonDocument)splitStart[1]).get(firstKey);
        BsonValue upperBound = ((BsonDocument)splitEnd[1]).get(firstKey);
        if (lowerBound.getBsonType() == BsonType.MIN_KEY && upperBound.getBsonType() == BsonType.MAX_KEY) {
            return true;
        }
        return BsonUtils.compareBsonValue(lowerBound, keyValue) <= 0 && BsonUtils.compareBsonValue(keyValue, upperBound) < 0;
    }

    @Override
    public void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
        Struct key = (Struct)changeRecord.key();
        Struct value = (Struct)changeRecord.value();
        if (value != null) {
            OperationType operation = OperationType.fromString(value.getString("operationType"));
            switch (operation) {
                case INSERT: 
                case UPDATE: 
                case REPLACE: {
                    value.put("operationType", (Object)OperationType.INSERT.getValue());
                    outputBuffer.put(key, changeRecord);
                    break;
                }
                case DELETE: {
                    outputBuffer.remove(key);
                    break;
                }
                default: {
                    throw new IllegalStateException(String.format("Data change record meet UNKNOWN operation, the the record is %s.", changeRecord));
                }
            }
        }
    }

    @Override
    public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
        return snapshotRecords.stream().peek(record -> {
            Struct value = (Struct)record.value();
            Struct source = new Struct(value.schema().field("source").schema());
            source.put("ts_ms", (Object)0L);
            source.put("snapshot", (Object)"true");
            value.put("source", (Object)source);
        }).collect(Collectors.toList());
    }

    @Override
    public void close() throws Exception {
    }
}

