/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.LegacyV1MongoDbSourceInfoStructMaker;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.schema.DataCollectionId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Struct;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbConnectorConfig
extends CommonConnectorConfig {
    /** Logger used for deprecation and signal-parsing warnings emitted by this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectorConfig.class);
    // Messages handed to the validation output when an include-style option and an
    // exclude-style option are both present (see the validate*List methods below).
    protected static final String COLLECTION_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG = "\"collection.include.list\" is already specified";
    protected static final String COLLECTION_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG = "\"collection.whitelist\" is already specified";
    protected static final String DATABASE_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.include.list\" is already specified";
    protected static final String DATABASE_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.whitelist\" is already specified";
    /** Splits a comma-separated option value into its entries (the constant name keeps its existing "SPILT" spelling). */
    protected static final Pattern PATTERN_SPILT = Pattern.compile(",");
    /** Shape every entry of "field.exclude.list" must have; applied by validateFieldExcludeList. */
    protected static final Pattern FIELD_EXCLUDE_LIST_PATTERN = Pattern.compile("^[*|\\w|\\s*]+(?:\\.[\\w]+\\.[\\w]+)+(\\.[\\w]+)*\\s*$");
    /** Human-readable form of the expected exclude-list entry, embedded in validation messages. */
    protected static final String QUALIFIED_FIELD_EXCLUDE_LIST_PATTERN = "<databaseName>.<collectionName>.<fieldName>.<nestedFieldName>";
    /** Shape every entry of "field.renames" must have; applied by validateFieldRenamesList. */
    protected static final Pattern FIELD_RENAMES_PATTERN = Pattern.compile("^[*|\\w|\\s*]+(?:\\.[\\w]+\\.[\\w]+)+(\\.[\\w]+)*:(?:[\\w]+)+\\s*$");
    /** Human-readable form of the expected rename entry, embedded in validation messages. */
    protected static final String QUALIFIED_FIELD_RENAMES_PATTERN = "<databaseName>.<collectionName>.<fieldName>.<nestedFieldName>:<newNestedFieldName>";
    // NOTE(review): not referenced by name in this file; presumably the literal 0 passed as the
    // third argument to the superclass constructor is this constant inlined - confirm.
    protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0;
    /** Option "mongodb.hosts"; validateHosts rejects a missing value or one that parses to no replica sets. */
    public static final Field HOSTS = Field.create("mongodb.hosts").withDisplayName("Hosts").withType(ConfigDef.Type.LIST).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withValidation(MongoDbConnectorConfig::validateHosts).withDescription("The hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB server(s) in the replica set.");
    /** Required option "mongodb.name"; its value is passed to the superclass constructor by this class's constructor. */
    public static final Field LOGICAL_NAME = Field.create("mongodb.name").withDisplayName("Namespace").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0)).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.HIGH).required().withDescription("Unique name that identifies the MongoDB replica set or cluster and all recorded offsets, and that is used as a prefix for all schemas and topics. Each distinct MongoDB installation should have a separate namespace and monitored by at most one Debezium connector.");
    /** Option "mongodb.user" (no default, no validator). */
    public static final Field USER = Field.create("mongodb.user").withDisplayName("User").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.HIGH).withDescription("Database user for connecting to MongoDB, if necessary.");
    /** Option "mongodb.password", declared with the PASSWORD config type. */
    public static final Field PASSWORD = Field.create("mongodb.password").withDisplayName("Password").withType(ConfigDef.Type.PASSWORD).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 4)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.HIGH).withDescription("Password to be used when connecting to MongoDB, if necessary.");
    /** Option "mongodb.authsource"; defaults to "admin". */
    public static final Field AUTH_SOURCE = Field.create("mongodb.authsource").withDisplayName("Credentials Database").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 3)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault("admin").withDescription("Database containing user credentials.");
    /** Deprecated option "mongodb.poll.interval.sec" (default 30); its description points to "mongodb.poll.interval.ms". */
    @Deprecated
    public static final Field POLL_INTERVAL_SEC = Field.create("mongodb.poll.interval.sec").withDisplayName("Replica membership poll interval (sec)").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(30).withValidation(Field::isPositiveInteger).withDescription("(Deprecated, use mongodb.poll.interval.ms) Interval for looking for new, removed, or changed replica sets, given in seconds. Defaults to 30 seconds.");
    /** Option "mongodb.poll.interval.ms"; LONG-typed with a default of 30000. */
    public static final Field MONGODB_POLL_INTERVAL_MS = Field.create("mongodb.poll.interval.ms").withDisplayName("Replica membership poll interval (ms)").withType(ConfigDef.Type.LONG).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 5)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(30000L).withValidation(Field::isPositiveInteger).withDescription("Interval for looking for new, removed, or changed replica sets, given in milliseconds.  Defaults to 30 seconds (30,000 ms).");
    /** Option "mongodb.ssl.enabled"; defaults to false. */
    public static final Field SSL_ENABLED = Field.create("mongodb.ssl.enabled").withDisplayName("Enable SSL connection to MongoDB").withType(ConfigDef.Type.BOOLEAN).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 0)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(false).withValidation(Field::isBoolean).withDescription("Should connector use SSL to connect to MongoDB instances");
    /** Option "mongodb.ssl.invalid.hostname.allowed"; defaults to false. */
    public static final Field SSL_ALLOW_INVALID_HOSTNAMES = Field.create("mongodb.ssl.invalid.hostname.allowed").withDisplayName("Allow invalid hostnames for SSL connection").withType(ConfigDef.Type.BOOLEAN).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(false).withValidation(Field::isBoolean).withDescription("Whether invalid host names are allowed when using SSL. If true the connection will not prevent man-in-the-middle attacks");
    /** Deprecated option "initial.sync.max.threads" (default 1); read by resolveSnapshotMaxThreads when SNAPSHOT_MAX_THREADS is not set. */
    @Deprecated
    public static final Field MAX_COPY_THREADS = Field.create("initial.sync.max.threads").withDisplayName("Maximum number of threads for initial sync").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(1).withValidation(Field::isPositiveInteger).withDescription("(Deprecated) Maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1.");
    /** Option "connect.backoff.initial.delay.ms"; defaults to one second expressed in milliseconds. */
    public static final Field CONNECT_BACKOFF_INITIAL_DELAY_MS = Field.create("connect.backoff.initial.delay.ms").withDisplayName("Initial delay before reconnection (ms)").withType(ConfigDef.Type.LONG).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 1)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(TimeUnit.SECONDS.toMillis(1L)).withValidation(Field::isPositiveInteger).withDescription("The initial delay when trying to reconnect to a primary after a connection cannot be made or when no primary is available, given in milliseconds. Defaults to 1 second (1,000 ms).");
    /** Option "connect.backoff.max.delay.ms"; defaults to 120 seconds expressed in milliseconds. */
    public static final Field CONNECT_BACKOFF_MAX_DELAY_MS = Field.create("connect.backoff.max.delay.ms").withDisplayName("Maximum delay before reconnection (ms)").withType(ConfigDef.Type.LONG).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 2)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(TimeUnit.SECONDS.toMillis(120L)).withValidation(Field::isPositiveInteger).withDescription("The maximum delay when trying to reconnect to a primary after a connection cannot be made or when no primary is available, given in milliseconds. Defaults to 120 seconds (120,000 ms).");
    /** Option "connect.max.attempts" (default 16); its description embeds the two backoff fields via string concatenation. */
    public static final Field MAX_FAILED_CONNECTIONS = Field.create("connect.max.attempts").withDisplayName("Connection attempt limit").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 4)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.HIGH).withDefault(16).withValidation(Field::isPositiveInteger).withDescription("Maximum number of failed connection attempts to a replica set primary before an exception occurs and task is aborted. Defaults to 16, which with the defaults for '" + CONNECT_BACKOFF_INITIAL_DELAY_MS + "' and '" + CONNECT_BACKOFF_MAX_DELAY_MS + "' results in just over 20 minutes of attempts before failing.");
    /** Option "mongodb.members.auto.discover"; defaults to true. */
    public static final Field AUTO_DISCOVER_MEMBERS = Field.create("mongodb.members.auto.discover").withDisplayName("Auto-discovery").withType(ConfigDef.Type.BOOLEAN).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDefault(true).withValidation(Field::isBoolean).withDescription("Specifies whether the addresses in 'hosts' are seeds that should be used to discover all members of the cluster or replica set ('true'), or whether the address(es) in 'hosts' should be used as is ('false'). The default is 'true'.");
    /** Option "database.include.list"; each entry must be a valid regular expression. */
    public static final Field DATABASE_INCLUDE_LIST = Field.create("database.include.list").withDisplayName("Include Databases").withType(ConfigDef.Type.LIST).withGroup(Field.createGroupEntry(Field.Group.FILTERS, 0)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withValidation(Field::isListOfRegex).withDescription("A comma-separated list of regular expressions that match the database names for which changes are to be captured");
    /** Deprecated option "database.whitelist"; its description points to "database.include.list". */
    @Deprecated
    public static final Field DATABASE_WHITELIST = Field.create("database.whitelist").withDisplayName("Deprecated: Include Databases").withType(ConfigDef.Type.LIST).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(Field::isListOfRegex).withInvisibleRecommender().withDescription("A comma-separated list of regular expressions that match the database names for which changes are to be captured (deprecated, use \"" + DATABASE_INCLUDE_LIST.name() + "\" instead)");
    /** Option "database.exclude.list"; additionally checked by validateDatabaseExcludeList against the include list. */
    public static final Field DATABASE_EXCLUDE_LIST = Field.create("database.exclude.list").withDisplayName("Exclude Databases").withType(ConfigDef.Type.LIST).withGroup(Field.createGroupEntry(Field.Group.FILTERS, 1)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateDatabaseExcludeList).withDescription("A comma-separated list of regular expressions that match the database names for which changes are to be excluded");
    /** Deprecated option "database.blacklist"; additionally checked by validateDatabaseBlacklist. */
    @Deprecated
    public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist").withDisplayName("Deprecated: Exclude Databases").withType(ConfigDef.Type.LIST).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateDatabaseBlacklist).withInvisibleRecommender().withDescription("A comma-separated list of regular expressions that match the database names for which changes are to be excluded (deprecated, use \"" + DATABASE_EXCLUDE_LIST.name() + "\" instead)");
    /** Option "collection.include.list"; each entry must be a valid regular expression. */
    public static final Field COLLECTION_INCLUDE_LIST = Field.create("collection.include.list").withDisplayName("Include Collections").withType(ConfigDef.Type.LIST).withGroup(Field.createGroupEntry(Field.Group.FILTERS, 2)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withValidation(Field::isListOfRegex).withDescription("A comma-separated list of regular expressions that match the collection names for which changes are to be captured");
    /** Deprecated option "collection.whitelist"; its description points to "collection.include.list". */
    @Deprecated
    public static final Field COLLECTION_WHITELIST = Field.create("collection.whitelist").withDisplayName("Deprecated: Include Collections").withType(ConfigDef.Type.LIST).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(Field::isListOfRegex).withInvisibleRecommender().withDescription("A comma-separated list of regular expressions that match the collection names for which changes are to be captured (deprecated, use \"" + COLLECTION_INCLUDE_LIST.name() + "\" instead)");
    // Unlike the database filters above, the two collection exclude fields declare no display name, type, width or importance.
    /** Option "collection.exclude.list"; additionally checked by validateCollectionExcludeList against the include list. */
    public static final Field COLLECTION_EXCLUDE_LIST = Field.create("collection.exclude.list").withGroup(Field.createGroupEntry(Field.Group.FILTERS, 3)).withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateCollectionExcludeList).withInvisibleRecommender().withDescription("A comma-separated list of regular expressions that match the collection names for which changes are to be excluded");
    /** Deprecated option "collection.blacklist"; additionally checked by validateCollectionBlacklist. */
    @Deprecated
    public static final Field COLLECTION_BLACKLIST = Field.create("collection.blacklist").withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateCollectionBlacklist).withInvisibleRecommender().withDescription("A comma-separated list of regular expressions that match the collection names for which changes are to be excluded (deprecated, use \"" + COLLECTION_EXCLUDE_LIST.name() + "\" instead)");
    /** Option "field.exclude.list"; every comma-separated entry is checked by validateFieldExcludeList. */
    public static final Field FIELD_EXCLUDE_LIST = Field.create("field.exclude.list").withDisplayName("Exclude Fields").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.FILTERS, 5)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.MEDIUM).withValidation(MongoDbConnectorConfig::validateFieldExcludeList).withDescription("A comma-separated list of the fully-qualified names of fields that should be excluded from change event message values");
    /** Deprecated option "field.blacklist"; no validator is attached to it here. */
    @Deprecated
    public static final Field FIELD_BLACKLIST = Field.create("field.blacklist").withDisplayName("Deprecated: Exclude Fields").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withInvisibleRecommender().withDescription("A comma-separated list of the fully-qualified names of fields that should be excluded from change event message values (deprecated, use \"" + FIELD_EXCLUDE_LIST.name() + "\" instead)");
    /** Option "field.renames"; every comma-separated entry is checked by validateFieldRenamesList. */
    public static final Field FIELD_RENAMES = Field.create("field.renames").withDisplayName("Rename Fields").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.MEDIUM).withValidation(MongoDbConnectorConfig::validateFieldRenamesList).withDescription("A comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters, the colon character (:) is used to determine rename mapping of field.");
    /** Option "snapshot.mode"; enum-backed by SnapshotMode with INITIAL as the default. */
    public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode").withDisplayName("Snapshot mode").withEnum(SnapshotMode.class, SnapshotMode.INITIAL).withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should always perform an initial sync when required; 'never' to specify the connector should never perform an initial sync ");
    /** Option "capture.mode"; enum-backed by CaptureMode with CHANGE_STREAMS_UPDATE_FULL as the default. */
    public static final Field CAPTURE_MODE = Field.create("capture.mode").withDisplayName("Capture mode").withEnum(CaptureMode.class, CaptureMode.CHANGE_STREAMS_UPDATE_FULL).withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 1)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDescription("The method used to capture changes from MongoDB server. Options include: 'oplog' to capture changes from the oplog; 'change_streams' to capture changes via MongoDB Change Streams, update events do not contain full documents; 'change_streams_update_full' (the default) to capture changes via MongoDB Change Streams, update events contain full documents");
    /** Option "mongodb.connect.timeout.ms"; defaults to 10000. */
    public static final Field CONNECT_TIMEOUT_MS = Field.create("mongodb.connect.timeout.ms").withDisplayName("Connect Timeout MS").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDefault(10000).withDescription("The connection timeout, given in milliseconds. Defaults to 10 seconds (10,000 ms).");
    /** Option "mongodb.server.selection.timeout.ms"; defaults to 30000 (the description is kept in sync with this default). */
    public static final Field SERVER_SELECTION_TIMEOUT_MS = Field.create("mongodb.server.selection.timeout.ms").withDisplayName("Server selection timeout MS").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 5)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDefault(30000).withDescription("The server selection timeout, given in milliseconds. Defaults to 30 seconds (30,000 ms).");
    /** Option "mongodb.socket.timeout.ms"; defaults to 0. */
    public static final Field SOCKET_TIMEOUT_MS = Field.create("mongodb.socket.timeout.ms").withDisplayName("Socket timeout MS").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 6)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDefault(0).withDescription("The socket timeout, given in milliseconds. Defaults to 0 ms.");
    /** Internal option "mongodb.task.id"; must be an integer. */
    protected static final Field TASK_ID = Field.create("mongodb.task.id").withDescription("Internal use only").withValidation(Field::isInteger).withInvisibleRecommender();
    /**
     * Option "snapshot.collection.filter.overrides": the comma-separated list of collections for which
     * getSnapshotFilterQueryByCollection looks up a per-collection filter under
     * "snapshot.collection.filter.overrides.&lt;dbName&gt;.&lt;collectionName&gt;".
     */
    public static final Field SNAPSHOT_FILTER_QUERY_BY_COLLECTION = Field.create("snapshot.collection.filter.overrides").withDisplayName("Snapshot collection filter overrides").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 1)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.MEDIUM).withDescription("This property contains a comma-separated list of <dbName>.<collectionName>, for which the initial snapshot may be a subset of data present in the data source. The subset would be defined by mongodb filter query specified as value for property snapshot.collection.filter.overrides.<dbName>.<collectionName>");
    /** Option "cursor.max.await.time.ms"; no default is declared here, the constructor falls back to 0. */
    public static final Field CURSOR_MAX_AWAIT_TIME_MS = Field.create("cursor.max.await.time.ms").withDisplayName("Server's oplog streaming cursor max await time").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The maximum processing time in milliseconds to wait for the oplog cursor to process a single poll request");
    /**
     * Full configuration definition: the common connector definition extended with the MongoDB
     * fields above, registered under its "type", "events" and "connector" sections.
     */
    private static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit().name("MongoDB").type(HOSTS, USER, PASSWORD, AUTH_SOURCE, LOGICAL_NAME, CONNECT_BACKOFF_INITIAL_DELAY_MS, CONNECT_BACKOFF_MAX_DELAY_MS, CONNECT_TIMEOUT_MS, SOCKET_TIMEOUT_MS, SERVER_SELECTION_TIMEOUT_MS, POLL_INTERVAL_SEC, MONGODB_POLL_INTERVAL_MS, MAX_FAILED_CONNECTIONS, AUTO_DISCOVER_MEMBERS, SSL_ENABLED, SSL_ALLOW_INVALID_HOSTNAMES, CURSOR_MAX_AWAIT_TIME_MS).events(DATABASE_WHITELIST, DATABASE_INCLUDE_LIST, DATABASE_BLACKLIST, DATABASE_EXCLUDE_LIST, COLLECTION_WHITELIST, COLLECTION_INCLUDE_LIST, COLLECTION_BLACKLIST, COLLECTION_EXCLUDE_LIST, FIELD_BLACKLIST, FIELD_EXCLUDE_LIST, FIELD_RENAMES, SNAPSHOT_FILTER_QUERY_BY_COLLECTION).connector(MAX_COPY_THREADS, SNAPSHOT_MODE, CAPTURE_MODE, SCHEMA_NAME_ADJUSTMENT_MODE).create();
    /** All fields of CONFIG_DEFINITION; assigned in the static initializer. */
    public static Field.Set ALL_FIELDS;
    /** Assigned the same set as ALL_FIELDS in the static initializer. */
    protected static Field.Set EXPOSED_FIELDS;
    // Values resolved once in the constructor; all are immutable afterwards.
    private final SnapshotMode snapshotMode;
    // Made final for consistency with its siblings: it is only ever assigned in the constructor.
    private final CaptureMode captureMode;
    private final int snapshotMaxThreads;
    private final int cursorMaxAwaitTimeMs;

    /**
     * Returns the Kafka {@link ConfigDef} produced by this connector's CONFIG_DEFINITION.
     *
     * @return the result of {@code CONFIG_DEFINITION.configDef()}
     */
    public static ConfigDef configDef() {
        return CONFIG_DEFINITION.configDef();
    }

    /**
     * Builds the connector configuration from the raw configuration.
     * <p>
     * The logical name is handed to the superclass; snapshot mode and capture mode are parsed
     * with a fallback to each field's declared default, the snapshot thread count is resolved
     * via {@code resolveSnapshotMaxThreads}, and the cursor await time falls back to 0.
     *
     * @param config the raw connector configuration
     */
    public MongoDbConnectorConfig(Configuration config) {
        super(config, config.getString(LOGICAL_NAME), 0);
        this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
        this.captureMode = CaptureMode.parse(config.getString(CAPTURE_MODE), CAPTURE_MODE.defaultValueAsString());
        this.snapshotMaxThreads = resolveSnapshotMaxThreads(config);
        this.cursorMaxAwaitTimeMs = config.getInteger(CURSOR_MAX_AWAIT_TIME_MS, 0);
    }

    /**
     * Validates the host specification stored under {@code field}.
     *
     * @param config   configuration holding the value
     * @param field    the field being validated
     * @param problems sink that receives a message for each problem found
     * @return 1 if the value is missing or parses to no replica sets, otherwise 0
     */
    private static int validateHosts(Configuration config, Field field, Field.ValidationOutput problems) {
        final String hostSpec = config.getString(field);
        if (hostSpec == null) {
            problems.accept(field, hostSpec, "Host specification is required");
            return 1;
        }
        if (!ReplicaSets.parse(hostSpec).all().isEmpty()) {
            // At least one replica set was parsed: nothing to report.
            return 0;
        }
        problems.accept(field, hostSpec, "Invalid host specification");
        return 1;
    }

    /**
     * Checks every comma-separated entry of "field.exclude.list" against
     * {@code FIELD_EXCLUDE_LIST_PATTERN}.
     *
     * @param config   configuration holding the option
     * @param field    the field that triggered validation (the value is always read from FIELD_EXCLUDE_LIST)
     * @param problems sink that receives one message per malformed entry
     * @return the number of malformed entries; 0 when the option is not set
     */
    private static int validateFieldExcludeList(Configuration config, Field field, Field.ValidationOutput problems) {
        final String configured = config.getString(FIELD_EXCLUDE_LIST);
        if (configured == null) {
            return 0;
        }
        int invalidEntries = 0;
        for (String entry : PATTERN_SPILT.split(configured)) {
            // Pattern.asPredicate() is defined as matcher(s).find(), so this is the same check.
            final boolean wellFormed = FIELD_EXCLUDE_LIST_PATTERN.matcher(entry).find();
            if (!wellFormed) {
                problems.accept(FIELD_EXCLUDE_LIST, entry, entry + " has invalid format (expecting " + QUALIFIED_FIELD_EXCLUDE_LIST_PATTERN + ")");
                invalidEntries++;
            }
        }
        return invalidEntries;
    }

    /**
     * Checks every comma-separated entry of "field.renames" against {@code FIELD_RENAMES_PATTERN}.
     *
     * @param config   configuration holding the option
     * @param field    the field that triggered validation (the value is always read from FIELD_RENAMES)
     * @param problems sink that receives one message per malformed entry
     * @return the number of malformed entries; 0 when the option is not set
     */
    private static int validateFieldRenamesList(Configuration config, Field field, Field.ValidationOutput problems) {
        String fieldRenamesList = config.getString(FIELD_RENAMES);
        if (fieldRenamesList == null) {
            return 0;
        }
        int problemCount = 0;
        for (String renameField : PATTERN_SPILT.split(fieldRenamesList)) {
            if (!FIELD_RENAMES_PATTERN.asPredicate().test(renameField)) {
                // Report against FIELD_RENAMES (not FIELD_EXCLUDE_LIST) so the error is
                // attached to the option that actually holds the malformed entry.
                problems.accept(FIELD_RENAMES, renameField, renameField + " has invalid format (expecting " + QUALIFIED_FIELD_RENAMES_PATTERN + ")");
                ++problemCount;
            }
        }
        return problemCount;
    }

    /**
     * Reports a problem when both "collection.include.list" and "collection.exclude.list" are set.
     *
     * @param config   configuration holding both options
     * @param field    the field that triggered validation (unused)
     * @param problems sink that receives the conflict message
     * @return 1 when both options are present, otherwise 0
     */
    private static int validateCollectionExcludeList(Configuration config, Field field, Field.ValidationOutput problems) {
        final String included = config.getString(COLLECTION_INCLUDE_LIST);
        final String excluded = config.getString(COLLECTION_EXCLUDE_LIST);
        if (included == null || excluded == null) {
            return 0;
        }
        problems.accept(COLLECTION_EXCLUDE_LIST, excluded, COLLECTION_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG);
        return 1;
    }

    /**
     * Reports a problem when a collection include list (or its deprecated whitelist fallback) and
     * a collection exclude list (or its deprecated blacklist fallback) are both set.
     * <p>
     * The problem is reported against COLLECTION_BLACKLIST with the whitelist message, mirroring
     * what {@code validateDatabaseBlacklist} does for the database options.
     *
     * @param config   configuration holding the options
     * @param field    the field that triggered validation (unused)
     * @param problems sink that receives the conflict message
     * @return 1 when both an include-style and an exclude-style value are present, otherwise 0
     */
    private static int validateCollectionBlacklist(Configuration config, Field field, Field.ValidationOutput problems) {
        String whitelist = config.getFallbackStringPropertyWithWarning(COLLECTION_INCLUDE_LIST, COLLECTION_WHITELIST);
        String blacklist = config.getFallbackStringPropertyWithWarning(COLLECTION_EXCLUDE_LIST, COLLECTION_BLACKLIST);
        if (whitelist != null && blacklist != null) {
            problems.accept(COLLECTION_BLACKLIST, blacklist, COLLECTION_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG);
            return 1;
        }
        return 0;
    }

    /**
     * Reports a problem when both "database.include.list" and "database.exclude.list" are set.
     *
     * @param config   configuration holding both options
     * @param field    the field that triggered validation (unused)
     * @param problems sink that receives the conflict message
     * @return 1 when both options are present, otherwise 0
     */
    private static int validateDatabaseExcludeList(Configuration config, Field field, Field.ValidationOutput problems) {
        final String included = config.getString(DATABASE_INCLUDE_LIST);
        final String excluded = config.getString(DATABASE_EXCLUDE_LIST);
        if (included == null || excluded == null) {
            return 0;
        }
        problems.accept(DATABASE_EXCLUDE_LIST, excluded, DATABASE_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG);
        return 1;
    }

    /**
     * Reports a problem when a database include list (or its deprecated whitelist fallback) and
     * a database exclude list (or its deprecated blacklist fallback) are both set.
     *
     * @param config   configuration holding the options
     * @param field    the field that triggered validation (unused)
     * @param problems sink that receives the conflict message
     * @return 1 when both an include-style and an exclude-style value are present, otherwise 0
     */
    private static int validateDatabaseBlacklist(Configuration config, Field field, Field.ValidationOutput problems) {
        // Both lookups always run, in this order, exactly as before.
        final String included = config.getFallbackStringPropertyWithWarning(DATABASE_INCLUDE_LIST, DATABASE_WHITELIST);
        final String excluded = config.getFallbackStringPropertyWithWarning(DATABASE_EXCLUDE_LIST, DATABASE_BLACKLIST);
        if (included == null || excluded == null) {
            return 0;
        }
        problems.accept(DATABASE_BLACKLIST, excluded, DATABASE_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG);
        return 1;
    }

    /**
     * @return the snapshot mode parsed in the constructor
     */
    public SnapshotMode getSnapshotMode() {
        return snapshotMode;
    }

    /**
     * @return the capture mode parsed in the constructor
     */
    public CaptureMode getCaptureMode() {
        return captureMode;
    }

    /**
     * @return the value of "cursor.max.await.time.ms" read in the constructor (0 when not configured)
     */
    public int getCursorMaxAwaitTime() {
        return cursorMaxAwaitTimeMs;
    }

    /**
     * @return the snapshot thread count resolved in the constructor by {@code resolveSnapshotMaxThreads}
     */
    @Override
    public int getSnapshotMaxThreads() {
        return snapshotMaxThreads;
    }

    /**
     * Selects the source-info struct maker for the requested version.
     *
     * @param version the source-info version
     * @return the legacy V1 maker for {@code V1}, the current maker for every other version
     */
    protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
        final String connector = Module.name();
        final String connectorVersion = Module.version();
        switch (version) {
            case V1:
                return new LegacyV1MongoDbSourceInfoStructMaker(connector, connectorVersion, this);
            default:
                return new MongoDbSourceInfoStructMaker(connector, connectorVersion, this);
        }
    }

    /**
     * Looks up the snapshot filter query configured for one collection.
     *
     * @param collectionId the collection to look up, keyed as {@code <dbName>.<name>}
     * @return the configured query, or an empty Optional when the map holds no value for the key
     */
    public Optional<String> getSnapshotFilterQueryForCollection(CollectionId collectionId) {
        final Map<String, String> queriesByCollection = getSnapshotFilterQueryByCollection();
        final String qualifiedName = collectionId.dbName() + "." + collectionId.name();
        return Optional.ofNullable(queriesByCollection.get(qualifiedName));
    }

    /**
     * Builds the map from each collection listed in "snapshot.collection.filter.overrides" to the
     * value configured under that option name suffixed with {@code "." + collection}.
     * <p>
     * Entries are used exactly as produced by splitting on commas (no trimming); a collection with
     * no configured value is mapped to whatever the configuration lookup returns for it.
     *
     * @return an unmodifiable map, empty when the option is not set
     */
    public Map<String, String> getSnapshotFilterQueryByCollection() {
        final Configuration configuration = getConfig();
        final String overrides = configuration.getString(SNAPSHOT_FILTER_QUERY_BY_COLLECTION);
        if (overrides == null) {
            return Collections.emptyMap();
        }
        final Map<String, String> queriesByCollection = new HashMap<>();
        final String[] collections = overrides.split(",");
        for (int i = 0; i < collections.length; i++) {
            final String collection = collections[i];
            final String queryKey = SNAPSHOT_FILTER_QUERY_BY_COLLECTION + "." + collection;
            queriesByCollection.put(collection, configuration.getString(queryKey));
        }
        return Collections.unmodifiableMap(queriesByCollection);
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean supportsOperationFiltering() {
        return true;
    }

    /**
     * @return the value of {@code Module.contextName()}
     */
    @Override
    public String getContextName() {
        return Module.contextName();
    }

    /**
     * @return the value of {@code Module.name()}
     */
    @Override
    public String getConnectorName() {
        return Module.name();
    }

    /**
     * Resolves the number of snapshot threads.
     * <p>
     * The SNAPSHOT_MAX_THREADS option wins when its key is present. Otherwise the deprecated
     * "initial.sync.max.threads" option (MAX_COPY_THREADS) is read, and a deprecation warning
     * is logged if that key was explicitly configured.
     *
     * @param config the raw connector configuration
     * @return the configured thread count
     */
    private static int resolveSnapshotMaxThreads(Configuration config) {
        if (config.hasKey(SNAPSHOT_MAX_THREADS.name())) {
            return config.getInteger(SNAPSHOT_MAX_THREADS);
        }
        if (config.hasKey(MAX_COPY_THREADS.name())) {
            // Name the option that is actually deprecated (previously this logged "connect.max.attempts").
            LOGGER.warn("The option '{}' is deprecated.  Use '{}' instead.", MAX_COPY_THREADS.name(), SNAPSHOT_MAX_THREADS.name());
        }
        return config.getInteger(MAX_COPY_THREADS);
    }

    /**
     * Extracts the three fields of a signal event from the "after" part of {@code value}.
     * <p>
     * The "after" string is parsed as a document that must hold exactly three fields. Nested
     * documents are rendered with {@code toJson()}, every other value with {@code toString()}
     * (so a null field value raises a NullPointerException).
     *
     * @param value the change event value carrying the signal
     * @return the three field values in document order, or an empty Optional (after logging a
     *         warning) when "after" is missing or does not hold exactly three fields
     */
    @Override
    public Optional<String[]> parseSignallingMessage(Struct value) {
        final String after = value.getString("after");
        if (after == null) {
            LOGGER.warn("After part of signal '{}' is missing", value);
            return Optional.empty();
        }
        final Document fields = Document.parse(after);
        final int fieldCount = fields.size();
        if (fieldCount != 3) {
            LOGGER.warn("The signal event '{}' should have 3 fields but has {}", after, fieldCount);
            return Optional.empty();
        }
        final String[] parts = new String[3];
        int next = 0;
        for (Object fieldValue : fields.values()) {
            parts[next++] = fieldValue instanceof Document
                    ? ((Document) fieldValue).toJson()
                    : fieldValue.toString();
        }
        return Optional.of(parts);
    }

    /**
     * Tells whether the given collection is the configured signaling collection.
     *
     * @param dataCollectionId the collection to test; cast to {@link CollectionId}
     * @return {@code true} when a signaling collection id is configured and equals
     *         {@code <dbName>.<name>} of the given collection
     */
    @Override
    public boolean isSignalDataCollection(DataCollectionId dataCollectionId) {
        // Cast first, as before, so a wrong id type fails even when no signaling collection is set.
        final CollectionId collectionId = (CollectionId) dataCollectionId;
        if (getSignalingDataCollectionId() == null) {
            return false;
        }
        final String qualifiedName = collectionId.dbName() + "." + collectionId.name();
        return getSignalingDataCollectionId().equals(qualifiedName);
    }

    // Both field sets refer to the same Field.Set built from every field of CONFIG_DEFINITION.
    static {
        ALL_FIELDS = Field.setOf(CONFIG_DEFINITION.all());
        EXPOSED_FIELDS = ALL_FIELDS;
    }

    /**
     * Values accepted by the "snapshot.mode" option.
     */
    public enum SnapshotMode implements EnumeratedValue {
        INITIAL("initial", true),
        NEVER("never", false);

        /** Textual form of the mode as written in the configuration. */
        private final String value;
        /** Flag supplied per constant; it is stored but not read anywhere in this enum. */
        private final boolean includeData;

        SnapshotMode(String value, boolean includeData) {
            this.value = value;
            this.includeData = includeData;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the mode whose textual form equals the trimmed input, ignoring case.
         *
         * @param value the configured text; may be {@code null}
         * @return the matching mode, or {@code null} when the input is {@code null} or matches nothing
         */
        public static SnapshotMode parse(String value) {
            if (value != null) {
                final String candidate = value.trim();
                for (SnapshotMode mode : values()) {
                    if (mode.getValue().equalsIgnoreCase(candidate)) {
                        return mode;
                    }
                }
            }
            return null;
        }

        /**
         * Same as {@link #parse(String)}, falling back to parsing {@code defaultValue} when the
         * first lookup yields nothing.
         *
         * @param value        the configured text; may be {@code null}
         * @param defaultValue the fallback text; may be {@code null}
         * @return the resolved mode, or {@code null} when neither input matches
         */
        public static SnapshotMode parse(String value, String defaultValue) {
            final SnapshotMode parsed = parse(value);
            if (parsed != null || defaultValue == null) {
                return parsed;
            }
            return parse(defaultValue);
        }
    }

    /**
     * Values accepted by the "capture.mode" option.
     */
    public enum CaptureMode implements EnumeratedValue {
        OPLOG("oplog", false, false),
        CHANGE_STREAMS("change_streams", true, false),
        CHANGE_STREAMS_UPDATE_FULL("change_streams_update_full", true, true);

        /** Textual form of the mode as written in the configuration. */
        private final String value;
        /** Value returned by {@link #isChangeStreams()}. */
        private final boolean changeStreams;
        /** Value returned by {@link #isFullUpdate()}. */
        private final boolean fullUpdate;

        CaptureMode(String value, boolean changeStreams, boolean fullUpdate) {
            this.value = value;
            this.changeStreams = changeStreams;
            this.fullUpdate = fullUpdate;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the mode whose textual form equals the trimmed input, ignoring case.
         *
         * @param value the configured text; may be {@code null}
         * @return the matching mode, or {@code null} when the input is {@code null} or matches nothing
         */
        public static CaptureMode parse(String value) {
            if (value != null) {
                final String candidate = value.trim();
                for (CaptureMode mode : values()) {
                    if (mode.getValue().equalsIgnoreCase(candidate)) {
                        return mode;
                    }
                }
            }
            return null;
        }

        /**
         * Same as {@link #parse(String)}, falling back to parsing {@code defaultValue} when the
         * first lookup yields nothing.
         *
         * @param value        the configured text; may be {@code null}
         * @param defaultValue the fallback text; may be {@code null}
         * @return the resolved mode, or {@code null} when neither input matches
         */
        public static CaptureMode parse(String value, String defaultValue) {
            final CaptureMode parsed = parse(value);
            if (parsed != null || defaultValue == null) {
                return parsed;
            }
            return parse(defaultValue);
        }

        /**
         * @return the change-streams flag of this constant (false only for OPLOG)
         */
        public boolean isChangeStreams() {
            return changeStreams;
        }

        /**
         * @return the full-update flag of this constant (true only for CHANGE_STREAMS_UPDATE_FULL)
         */
        public boolean isFullUpdate() {
            return fullUpdate;
        }
    }
}

