/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;

public class MongodbIncrementalSource<T>
extends IncrementalSource<T, MongodbSourceConfig>
implements SupportParallelism {
    static final String IDENTIFIER = "MongoDB-CDC";

    public MongodbIncrementalSource(ReadonlyConfig options, List<CatalogTable> catalogTables) {
        super(options, catalogTables);
    }

    @Override
    public Option<StartupMode> getStartupModeOption() {
        return MongodbSourceOptions.STARTUP_MODE;
    }

    @Override
    public Option<StopMode> getStopModeOption() {
        return MongodbSourceOptions.STOP_MODE;
    }

    public String getPluginName() {
        return IDENTIFIER;
    }

    @Override
    public SourceConfig.Factory<MongodbSourceConfig> createSourceConfigFactory(@Nonnull ReadonlyConfig config) {
        MongodbSourceConfigProvider.Builder builder = MongodbSourceConfigProvider.newBuilder().hosts((String)config.get(MongodbSourceOptions.HOSTS)).validate();
        Optional.ofNullable((List)config.get(MongodbSourceOptions.DATABASE)).ifPresent(builder::databaseList);
        Optional.ofNullable((List)config.get(MongodbSourceOptions.COLLECTION)).ifPresent(builder::collectionList);
        Optional.ofNullable((String)config.get(MongodbSourceOptions.USERNAME)).ifPresent(builder::username);
        Optional.ofNullable((String)config.get(MongodbSourceOptions.PASSWORD)).ifPresent(builder::password);
        Optional.ofNullable((String)config.get(MongodbSourceOptions.CONNECTION_OPTIONS)).ifPresent(builder::connectionOptions);
        Optional.ofNullable((Integer)config.get(MongodbSourceOptions.BATCH_SIZE)).ifPresent(builder::batchSize);
        Optional.ofNullable((Integer)config.get(MongodbSourceOptions.POLL_MAX_BATCH_SIZE)).ifPresent(builder::pollMaxBatchSize);
        Optional.ofNullable((Integer)config.get(MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS)).ifPresent(builder::pollAwaitTimeMillis);
        Optional.ofNullable((Integer)config.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS)).ifPresent(builder::heartbeatIntervalMillis);
        Optional.ofNullable((Integer)config.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS)).ifPresent(builder::splitMetaGroupSize);
        Optional.ofNullable((Integer)config.get(MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB)).ifPresent(builder::splitSizeMB);
        Optional.ofNullable(this.startupConfig).ifPresent(builder::startupOptions);
        Optional.ofNullable(this.stopConfig).ifPresent(builder::stopOptions);
        return builder;
    }

    @Override
    public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig config) {
        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(config.get(JdbcSourceOptions.FORMAT))) {
            return new DebeziumJsonDeserializeSchema((Map)config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
        }
        return new MongoDBConnectorDeserializationSchema(this.catalogTables);
    }

    @Override
    public DataSourceDialect<MongodbSourceConfig> createDataSourceDialect(ReadonlyConfig config) {
        return new MongodbDialect();
    }

    @Override
    public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
        return new ChangeStreamOffsetFactory();
    }

    @Override
    protected RecordEmitter<SourceRecords, T, SourceSplitStateBase> createRecordEmitter(SourceConfig sourceConfig, SourceReader.Context context) {
        return new MongoDBRecordEmitter(this.deserializationSchema, this.offsetFactory, context);
    }
}

