/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source;

import io.debezium.relational.TableId;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.shade.com.google.common.collect.Sets;

/**
 * Base class for incremental (CDC) sources.
 *
 * <p>Subclasses supply the connector-specific pieces through the abstract factory methods
 * ({@link #createSourceConfigFactory}, {@link #createDataSourceDialect}, {@link
 * #createDebeziumDeserializationSchema}, {@link #createOffsetFactory}) and the startup/stop mode
 * options. This class wires those pieces into the source reader and the split enumerator, and
 * reconciles a restored checkpoint with the tables that are currently discovered.
 *
 * @param <T> type of the records handled by the deserialization schema and the reader
 * @param <C> concrete {@link SourceConfig} type produced by the config factory
 */
public abstract class IncrementalSource<T, C extends SourceConfig>
        implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {

    /** Options this source was constructed with. */
    protected ReadonlyConfig readonlyConfig;

    /** Factory producing a {@link SourceConfig} for a given subtask index. */
    protected SourceConfig.Factory<C> configFactory;

    /** Offset factory handed to the split assigners and the record emitter. */
    protected OffsetFactory offsetFactory;

    /** Dialect used to discover tables and handed to readers and split assigners. */
    protected DataSourceDialect<C> dataSourceDialect;

    /** Startup settings read from {@link #readonlyConfig}. */
    protected StartupConfig startupConfig;

    /** Value of {@link SourceOptions#INCREMENTAL_PARALLELISM}; passed to the split assigners. */
    protected int incrementalParallelism;

    /** Stop settings read from {@link #readonlyConfig}. */
    protected StopConfig stopConfig;

    /** Catalog tables passed in at construction time. */
    protected List<CatalogTable> catalogTables;

    /** Stop mode taken from {@link #stopConfig}; decides the boundedness of this source. */
    protected StopMode stopMode;

    /** Deserialization schema handed to the reader and the record emitter. */
    protected DebeziumDeserializationSchema<T> deserializationSchema;

    /** Creates an instance without initializing any field. */
    public IncrementalSource() {
    }

    /**
     * Initializes every field from the given options.
     *
     * <p>The overridable factory methods are invoked from this constructor, i.e. before any field
     * initializer of the subclass has run. Implementations should therefore rely only on the
     * {@link ReadonlyConfig} they receive, not on subclass state.
     *
     * @param options options of this source
     * @param catalogTables catalog tables returned by {@link #getProducedCatalogTables()} unless
     *     the compatible Debezium JSON format is configured
     */
    protected IncrementalSource(ReadonlyConfig options, List<CatalogTable> catalogTables) {
        this.catalogTables = catalogTables;
        this.readonlyConfig = options;
        // The order below is kept stable because each call may dispatch to subclass code.
        this.startupConfig = getStartupConfig(this.readonlyConfig);
        this.stopConfig = getStopConfig(this.readonlyConfig);
        this.stopMode = this.stopConfig.getStopMode();
        this.incrementalParallelism =
                (Integer) this.readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
        this.configFactory = createSourceConfigFactory(this.readonlyConfig);
        this.dataSourceDialect = createDataSourceDialect(this.readonlyConfig);
        this.deserializationSchema = createDebeziumDeserializationSchema(this.readonlyConfig);
        this.offsetFactory = createOffsetFactory(this.readonlyConfig);
    }

    /**
     * Builds the {@link StartupConfig} from the startup mode option of the subclass and the
     * startup offset file, offset position and timestamp options.
     *
     * @param config options to read from
     * @return the startup configuration
     */
    protected StartupConfig getStartupConfig(ReadonlyConfig config) {
        return new StartupConfig(
                (StartupMode) config.get(getStartupModeOption()),
                (String) config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
                (Long) config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
                (Long) config.get(SourceOptions.STARTUP_TIMESTAMP));
    }

    /**
     * Builds the {@link StopConfig} from the stop mode option of the subclass and the stop offset
     * file, offset position and timestamp options.
     *
     * @param config options to read from
     * @return the stop configuration
     */
    private StopConfig getStopConfig(ReadonlyConfig config) {
        return new StopConfig(
                (StopMode) config.get(getStopModeOption()),
                (String) config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
                (Long) config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
                (Long) config.get(SourceOptions.STOP_TIMESTAMP));
    }

    /**
     * Returns the catalog tables produced by this source.
     *
     * @return a single table named {@code default.default} built from the compatible Debezium
     *     row type when {@link JdbcSourceOptions#FORMAT} is {@code COMPATIBLE_DEBEZIUM_JSON};
     *     otherwise the catalog tables given at construction time
     */
    public List<CatalogTable> getProducedCatalogTables() {
        boolean compatibleDebeziumJson =
                DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
                        readonlyConfig.get(JdbcSourceOptions.FORMAT));
        if (!compatibleDebeziumJson) {
            return catalogTables;
        }
        return Collections.singletonList(
                CatalogTableUtil.getCatalogTable(
                        "default.default",
                        (SeaTunnelRowType)
                                CompatibleDebeziumJsonDeserializationSchema
                                        .DEBEZIUM_DATA_ROW_TYPE));
    }

    /** @return the option from which the startup mode is read */
    public abstract Option<StartupMode> getStartupModeOption();

    /** @return the option from which the stop mode is read */
    public abstract Option<StopMode> getStopModeOption();

    /**
     * @param var1 options of this source
     * @return the factory used to create a source config per subtask
     */
    public abstract SourceConfig.Factory<C> createSourceConfigFactory(ReadonlyConfig var1);

    /**
     * @param var1 options of this source
     * @return the deserialization schema handed to the reader and the record emitter
     */
    public abstract DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig var1);

    /**
     * @param var1 options of this source
     * @return the dialect used for table discovery, readers and split assigners
     */
    public abstract DataSourceDialect<C> createDataSourceDialect(ReadonlyConfig var1);

    /**
     * @param var1 options of this source
     * @return the offset factory handed to the split assigners and the record emitter
     */
    public abstract OffsetFactory createOffsetFactory(ReadonlyConfig var1);

    /**
     * @return {@link Boundedness#UNBOUNDED} when the stop mode is {@link StopMode#NEVER},
     *     otherwise {@link Boundedness#BOUNDED}
     */
    public Boundedness getBoundedness() {
        if (stopMode == StopMode.NEVER) {
            return Boundedness.UNBOUNDED;
        }
        return Boundedness.BOUNDED;
    }

    /**
     * Creates the reader for one subtask.
     *
     * @param readerContext context of the reader; its subtask index selects the source config
     * @return a new {@link IncrementalSourceReader}
     * @throws Exception declared by the interface contract
     */
    public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context readerContext) throws Exception {
        C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
        // Hand-off queue given to the reader; its capacity is bounded to two elements.
        LinkedBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
                new LinkedBlockingQueue<>(2);
        SchemaChangeResolver schemaChangeResolver = deserializationSchema.getSchemaChangeResolver();
        return new IncrementalSourceReader<T, C>(
                dataSourceDialect,
                elementsQueue,
                // Supplier of split readers, typed with C instead of a raw, unchecked supplier.
                () ->
                        new IncrementalSourceSplitReader<C>(
                                readerContext.getIndexOfSubtask(),
                                dataSourceDialect,
                                sourceConfig,
                                schemaChangeResolver),
                createRecordEmitter(sourceConfig, readerContext),
                new SourceReaderOptions(readonlyConfig),
                readerContext,
                sourceConfig,
                deserializationSchema);
    }

    /**
     * Creates the record emitter used by the reader. This default implementation does not use
     * {@code sourceConfig}.
     *
     * @param sourceConfig source config of the subtask
     * @param context context of the reader
     * @return a new {@link IncrementalSourceRecordEmitter}
     */
    protected RecordEmitter<SourceRecords, T, SourceSplitStateBase> createRecordEmitter(SourceConfig sourceConfig, SourceReader.Context context) {
        return new IncrementalSourceRecordEmitter<T>(deserializationSchema, offsetFactory, context);
    }

    /**
     * Creates a fresh split enumerator.
     *
     * <p>A {@link HybridSplitAssigner} is used when the startup mode is {@link
     * StartupMode#INITIAL}; every other startup mode uses an {@link IncrementalSplitAssigner}.
     *
     * @param enumeratorContext context of the enumerator
     * @return a new {@link IncrementalSourceEnumerator}
     * @throws Exception if table discovery or source config creation fails
     */
    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext) throws Exception {
        C sourceConfig = configFactory.create(0);
        List<TableId> remainingTables = dataSourceDialect.discoverDataCollections(sourceConfig);
        SplitAssigner.Context<C> assignerContext =
                new SplitAssigner.Context<C>(
                        sourceConfig,
                        new HashSet<TableId>(remainingTables),
                        new HashMap<String, SnapshotSplit>(),
                        new HashMap<String, SnapshotSplitWatermark>());

        final SplitAssigner splitAssigner;
        if (sourceConfig.getStartupConfig().getStartupMode() == StartupMode.INITIAL) {
            try {
                boolean isTableIdCaseSensitive =
                        dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
                splitAssigner =
                        new HybridSplitAssigner<C>(
                                assignerContext,
                                enumeratorContext.currentParallelism(),
                                incrementalParallelism,
                                remainingTables,
                                isTableIdCaseSensitive,
                                dataSourceDialect,
                                offsetFactory);
            } catch (Exception e) {
                throw new RuntimeException("Failed to discover captured tables for enumerator", e);
            }
        } else {
            splitAssigner =
                    new IncrementalSplitAssigner<C>(
                            assignerContext, incrementalParallelism, offsetFactory);
        }
        return new IncrementalSourceEnumerator(enumeratorContext, splitAssigner);
    }

    /**
     * Re-creates the split enumerator from a checkpoint.
     *
     * <p>A {@link HybridPendingSplitsState} is first reconciled with the currently discovered
     * tables (see {@link #restore}) and then resumed with a {@link HybridSplitAssigner}. An {@link
     * IncrementalPhaseState} gets a new {@link IncrementalSplitAssigner} with empty split maps.
     *
     * @param enumeratorContext context of the enumerator
     * @param checkpointState state to restore from
     * @return a new {@link IncrementalSourceEnumerator}
     * @throws UnsupportedOperationException if the state is of neither supported type
     * @throws Exception if table discovery or source config creation fails
     */
    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> enumeratorContext, PendingSplitsState checkpointState) throws Exception {
        C sourceConfig = configFactory.create(0);
        Set<TableId> capturedTables =
                new HashSet<>(dataSourceDialect.discoverDataCollections(sourceConfig));

        final SplitAssigner splitAssigner;
        if (checkpointState instanceof HybridPendingSplitsState) {
            HybridPendingSplitsState hybridState =
                    restore(capturedTables, (HybridPendingSplitsState) checkpointState);
            SnapshotPhaseState snapshotState = hybridState.getSnapshotPhaseState();
            SplitAssigner.Context<C> assignerContext =
                    new SplitAssigner.Context<C>(
                            sourceConfig,
                            capturedTables,
                            snapshotState.getAssignedSplits(),
                            snapshotState.getSplitCompletedOffsets());
            splitAssigner =
                    new HybridSplitAssigner<C>(
                            assignerContext,
                            enumeratorContext.currentParallelism(),
                            incrementalParallelism,
                            hybridState,
                            dataSourceDialect,
                            offsetFactory);
        } else if (checkpointState instanceof IncrementalPhaseState) {
            SplitAssigner.Context<C> assignerContext =
                    new SplitAssigner.Context<C>(
                            sourceConfig,
                            capturedTables,
                            new HashMap<String, SnapshotSplit>(),
                            new HashMap<String, SnapshotSplitWatermark>());
            splitAssigner =
                    new IncrementalSplitAssigner<C>(
                            assignerContext, incrementalParallelism, offsetFactory);
        } else {
            throw new UnsupportedOperationException("Unsupported restored PendingSplitsState: " + checkpointState);
        }
        return new IncrementalSourceEnumerator(enumeratorContext, splitAssigner);
    }

    /**
     * Reconciles a restored hybrid state with the tables that are captured now.
     *
     * <p>Working on the collections exposed by the snapshot phase state, this method:
     *
     * <ul>
     *   <li>adds tables that are captured now but unknown to the checkpoint to the remaining
     *       tables;
     *   <li>removes tables that are no longer captured from the remaining and the already
     *       processed tables;
     *   <li>drops every split of a removed table from the remaining splits, the assigned splits
     *       and the completed split offsets.
     * </ul>
     *
     * @param capturedTables tables discovered at restore time; not modified
     * @param checkpointState state read from the checkpoint
     * @return {@code checkpointState} itself, or a rebuilt state when snapshot work is pending
     *     although the checkpoint reports the assigner as completed
     */
    private HybridPendingSplitsState restore(Set<TableId> capturedTables, HybridPendingSplitsState checkpointState) {
        SnapshotPhaseState snapshotState = checkpointState.getSnapshotPhaseState();

        // Independent copy of all tables the checkpoint knows about, so that the two difference
        // views below are not affected by the in-place updates that follow.
        Set<TableId> checkpointCapturedTables =
                Stream.concat(
                                snapshotState.getAlreadyProcessedTables().stream(),
                                snapshotState.getRemainingTables().stream())
                        .collect(Collectors.toSet());
        Set<TableId> newTables = Sets.difference(capturedTables, checkpointCapturedTables);
        Set<TableId> deletedTables = Sets.difference(checkpointCapturedTables, capturedTables);

        snapshotState.getRemainingTables().addAll(newTables);
        snapshotState.getRemainingTables().removeAll(deletedTables);
        snapshotState.getAlreadyProcessedTables().removeAll(deletedTables);

        Set<String> deletedSplitIds = new HashSet<>();

        // Pending splits of removed tables are dropped right away through the iterator.
        Iterator<SnapshotSplit> pendingSplits = snapshotState.getRemainingSplits().iterator();
        while (pendingSplits.hasNext()) {
            SnapshotSplit split = pendingSplits.next();
            if (deletedTables.contains(split.getTableId())) {
                pendingSplits.remove();
                deletedSplitIds.add(split.splitId());
            }
        }

        // Assigned splits are only collected here; the map must not change while its entry set
        // is being iterated.
        for (Map.Entry<String, SnapshotSplit> assigned : snapshotState.getAssignedSplits().entrySet()) {
            if (deletedTables.contains(assigned.getValue().getTableId())) {
                deletedSplitIds.add(assigned.getKey());
            }
        }
        deletedSplitIds.forEach(
                splitId -> {
                    snapshotState.getAssignedSplits().remove(splitId);
                    snapshotState.getSplitCompletedOffsets().remove(splitId);
                });

        // Snapshot work is pending again although the checkpoint says the assigner had finished:
        // rebuild the snapshot state with `false` in place of that flag (presumably the
        // assigner-completed argument - confirm against the SnapshotPhaseState constructor).
        if ((!snapshotState.getRemainingTables().isEmpty()
                        || !snapshotState.getRemainingSplits().isEmpty())
                && snapshotState.isAssignerCompleted()) {
            SnapshotPhaseState reopenedSnapshotState =
                    new SnapshotPhaseState(
                            snapshotState.getAlreadyProcessedTables(),
                            snapshotState.getRemainingSplits(),
                            snapshotState.getAssignedSplits(),
                            snapshotState.getSplitCompletedOffsets(),
                            false,
                            snapshotState.getRemainingTables(),
                            snapshotState.isTableIdCaseSensitive(),
                            snapshotState.isRemainingTablesCheckpointed());
            return new HybridPendingSplitsState(
                    reopenedSnapshotState, checkpointState.getIncrementalPhaseState());
        }
        return checkpointState;
    }
}

