/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.MongodbIncrementalSource;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;

@AutoService(value={Factory.class})
public class MongodbIncrementalSourceFactory
implements TableSourceFactory {
    public String factoryIdentifier() {
        return "MongoDB-CDC";
    }

    public OptionRule optionRule() {
        return MongodbSourceOptions.getBaseRule().required(new Option[]{MongodbSourceOptions.HOSTS, MongodbSourceOptions.DATABASE, MongodbSourceOptions.COLLECTION}).exclusive(new Option[]{ConnectorCommonOptions.SCHEMA, ConnectorCommonOptions.TABLE_CONFIGS}).optional(new Option[]{MongodbSourceOptions.USERNAME, MongodbSourceOptions.PASSWORD, MongodbSourceOptions.CONNECTION_OPTIONS, MongodbSourceOptions.BATCH_SIZE, MongodbSourceOptions.POLL_MAX_BATCH_SIZE, MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS, MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS, MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB, MongodbSourceOptions.STARTUP_MODE, MongodbSourceOptions.STOP_MODE}).conditional(MongodbSourceOptions.STARTUP_MODE, (Object)StartupMode.TIMESTAMP, new Option[]{SourceOptions.STARTUP_TIMESTAMP}).build();
    }

    public Class<? extends SeaTunnelSource> getSourceClass() {
        return MongodbIncrementalSource.class;
    }

    public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
        return () -> {
            List<CatalogTable> catalogTables = this.buildWithConfig(context.getOptions());
            List collections = (List)context.getOptions().get(MongodbSourceOptions.COLLECTION);
            this.validateCatalogTablesAndCollections(catalogTables, collections);
            catalogTables = this.updateAndValidateCatalogTableId(catalogTables, collections);
            return new MongodbIncrementalSource(context.getOptions(), catalogTables);
        };
    }

    private List<CatalogTable> updateAndValidateCatalogTableId(List<CatalogTable> catalogTables, List<String> collections) {
        for (int i = 0; i < catalogTables.size(); ++i) {
            CatalogTable catalogTable = catalogTables.get(i);
            String collectionName = collections.get(i);
            String fullName = catalogTable.getTablePath().getFullName();
            if (!fullName.equals(TablePath.DEFAULT.getFullName())) continue;
            if (catalogTables.size() == 1) {
                TableIdentifier updatedIdentifier = TableIdentifier.of((String)catalogTable.getCatalogName(), (TablePath)TablePath.of((String)collectionName));
                return Collections.singletonList(CatalogTable.of((TableIdentifier)updatedIdentifier, (CatalogTable)catalogTable));
            }
            if (fullName.equals(collectionName)) continue;
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, String.format("Inconsistent naming found at index %d: The collection name '%s' must match the schema table name '%s'.", i, collectionName, fullName));
        }
        return catalogTables;
    }

    private void validateCatalogTablesAndCollections(List<CatalogTable> catalogTables, List<String> collections) {
        if (catalogTables.size() != collections.size()) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "The number of collections must be equal to the number of schema tables");
        }
    }

    private List<CatalogTable> buildWithConfig(ReadonlyConfig config) {
        String factoryId = ((String)config.get(ConnectorCommonOptions.PLUGIN_NAME)).replace("-CDC", "");
        Map schemaMap = (Map)config.get(ConnectorCommonOptions.SCHEMA);
        if (schemaMap != null) {
            if (schemaMap.isEmpty()) {
                throw new SeaTunnelException("Schema config can not be empty");
            }
            CatalogTable catalogTable = CatalogTableUtil.buildWithConfig((String)factoryId, (ReadonlyConfig)config);
            return Collections.singletonList(catalogTable);
        }
        List schemaMaps = (List)config.get(ConnectorCommonOptions.TABLE_CONFIGS);
        if (schemaMaps != null) {
            if (schemaMaps.isEmpty()) {
                throw new SeaTunnelException("tables_configs can not be empty");
            }
            return schemaMaps.stream().map(map -> CatalogTableUtil.buildWithConfig((String)factoryId, (ReadonlyConfig)ReadonlyConfig.fromMap((Map)map))).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }
}

