/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect;

import com.mongodb.client.MongoClient;
import io.debezium.relational.TableId;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbScanFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbStreamFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.splitters.MongodbChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.CollectionDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongodbDialect
implements DataSourceDialect<MongodbSourceConfig> {
    private static final Logger log = LoggerFactory.getLogger(MongodbDialect.class);
    private final Map<MongodbSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache = new ConcurrentHashMap<MongodbSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo>();

    @Override
    public String getName() {
        return "MongoDB";
    }

    @Override
    public List<TableId> discoverDataCollections(MongodbSourceConfig sourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo = this.discoverAndCacheDataCollections(sourceConfig);
        return discoveryInfo.getDiscoveredCollections().stream().map(TableId::parse).collect(Collectors.toList());
    }

    @Override
    public boolean isDataCollectionIdCaseSensitive(MongodbSourceConfig sourceConfig) {
        return true;
    }

    @Override
    public ChunkSplitter createChunkSplitter(MongodbSourceConfig sourceConfig) {
        return new MongodbChunkSplitter(sourceConfig);
    }

    @Override
    public FetchTask<SourceSplitBase> createFetchTask(@Nonnull SourceSplitBase sourceSplitBase) {
        if (sourceSplitBase.isSnapshotSplit()) {
            return new MongodbScanFetchTask(sourceSplitBase.asSnapshotSplit());
        }
        return new MongodbStreamFetchTask(sourceSplitBase.asIncrementalSplit());
    }

    @Override
    public FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, MongodbSourceConfig sourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo = this.discoverAndCacheDataCollections(sourceConfig);
        ChangeStreamDescriptor changeStreamDescriptor = MongodbUtils.getChangeStreamDescriptor(sourceConfig, discoveryInfo.getDiscoveredDatabases(), discoveryInfo.getDiscoveredCollections());
        return new MongodbFetchTaskContext(this, sourceConfig, changeStreamDescriptor);
    }

    private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections(MongodbSourceConfig sourceConfig) {
        return this.cache.computeIfAbsent(sourceConfig, config -> {
            MongoClient mongoClient = MongodbUtils.createMongoClient(sourceConfig);
            List<String> discoveredDatabases = CollectionDiscoveryUtils.databaseNames(mongoClient, CollectionDiscoveryUtils.databaseFilter(sourceConfig.getDatabaseList()));
            List<String> discoveredCollections = CollectionDiscoveryUtils.collectionNames(mongoClient, discoveredDatabases, CollectionDiscoveryUtils.collectionsFilter(sourceConfig.getCollectionList()));
            return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(discoveredDatabases, discoveredCollections);
        });
    }

    public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig sourceConfig) {
        ChangeStreamOffset changeStreamOffset;
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo;
        ChangeStreamDescriptor changeStreamDescriptor;
        MongoClient mongoClient = MongodbUtils.createMongoClient(sourceConfig);
        BsonDocument startupResumeToken = MongodbUtils.getLatestResumeToken(mongoClient, changeStreamDescriptor = MongodbUtils.getChangeStreamDescriptor(sourceConfig, (discoveryInfo = this.discoverAndCacheDataCollections(sourceConfig)).getDiscoveredDatabases(), discoveryInfo.getDiscoveredCollections()));
        if (startupResumeToken != null) {
            changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
            log.info("startup resume token={},change stream offset={}", (Object)startupResumeToken, (Object)changeStreamOffset);
        } else {
            changeStreamOffset = new ChangeStreamOffset(MongodbUtils.getCurrentClusterTime(mongoClient));
        }
        return changeStreamOffset;
    }
}

