/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader.MongodbReader;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSplitStrategy;
import org.bson.BsonDocument;

public class MongodbSource
implements SeaTunnelSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>>,
SupportColumnProjection {
    private static final long serialVersionUID = 1L;
    private final CatalogTable catalogTable;
    private final ReadonlyConfig options;

    public MongodbSource(CatalogTable catalogTable, ReadonlyConfig options) {
        this.catalogTable = catalogTable;
        this.options = options;
    }

    public String getPluginName() {
        return "MongoDB";
    }

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public List<CatalogTable> getProducedCatalogTables() {
        return Collections.singletonList(this.catalogTable);
    }

    public SourceReader<SeaTunnelRow, MongoSplit> createReader(SourceReader.Context readerContext) {
        return new MongodbReader(readerContext, this.crateClientProvider(this.options), this.createDeserializer(this.options, this.catalogTable.getSeaTunnelRowType()), this.createMongodbReadOptions(this.options));
    }

    public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> createEnumerator(SourceSplitEnumerator.Context<MongoSplit> enumeratorContext) {
        MongodbClientProvider clientProvider = this.crateClientProvider(this.options);
        return new MongodbSplitEnumerator(enumeratorContext, clientProvider, this.createSplitStrategy(this.options, clientProvider));
    }

    public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> restoreEnumerator(SourceSplitEnumerator.Context<MongoSplit> enumeratorContext, ArrayList<MongoSplit> checkpointState) {
        MongodbClientProvider clientProvider = this.crateClientProvider(this.options);
        return new MongodbSplitEnumerator(enumeratorContext, clientProvider, this.createSplitStrategy(this.options, clientProvider), checkpointState);
    }

    private MongodbClientProvider crateClientProvider(ReadonlyConfig config) {
        return MongodbCollectionProvider.builder().connectionString((String)config.get(MongodbConfig.URI)).database((String)config.get(MongodbConfig.DATABASE)).collection((String)config.get(MongodbConfig.COLLECTION)).build();
    }

    private DocumentRowDataDeserializer createDeserializer(ReadonlyConfig config, SeaTunnelRowType rowType) {
        return new DocumentRowDataDeserializer(rowType.getFieldNames(), (SeaTunnelDataType<?>)rowType, (Boolean)config.get(MongodbConfig.FLAT_SYNC_STRING));
    }

    private MongoSplitStrategy createSplitStrategy(ReadonlyConfig config, MongodbClientProvider clientProvider) {
        SamplingSplitStrategy.Builder splitStrategyBuilder = SamplingSplitStrategy.builder();
        splitStrategyBuilder.setSplitKey((String)config.get(MongodbConfig.SPLIT_KEY));
        splitStrategyBuilder.setSizePerSplit((Long)config.get(MongodbConfig.SPLIT_SIZE));
        config.getOptional(MongodbConfig.MATCH_QUERY).ifPresent(s -> splitStrategyBuilder.setMatchQuery(BsonDocument.parse(s)));
        config.getOptional(MongodbConfig.PROJECTION).ifPresent(s -> splitStrategyBuilder.setProjection(BsonDocument.parse(s)));
        return splitStrategyBuilder.setClientProvider(clientProvider).build();
    }

    private MongodbReadOptions createMongodbReadOptions(ReadonlyConfig config) {
        MongodbReadOptions.MongoReadOptionsBuilder mongoReadOptionsBuilder = MongodbReadOptions.builder();
        mongoReadOptionsBuilder.setMaxTimeMS((Long)config.get(MongodbConfig.MAX_TIME_MIN));
        mongoReadOptionsBuilder.setFetchSize((Integer)config.get(MongodbConfig.FETCH_SIZE));
        mongoReadOptionsBuilder.setNoCursorTimeout((Boolean)config.get(MongodbConfig.CURSOR_NO_TIMEOUT));
        return mongoReadOptionsBuilder.build();
    }
}

