/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.assigner;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.assigner.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotSplitAssigner<C extends SourceConfig>
implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);
    private final List<TableId> alreadyProcessedTables;
    private final List<SchemalessSnapshotSplit> remainingSplits;
    private final Map<String, SchemalessSnapshotSplit> assignedSplits;
    private final Map<TableId, TableChanges.TableChange> tableSchemas;
    private final Map<String, Offset> splitFinishedOffsets;
    private AssignerStatus assignerStatus;
    private final C sourceConfig;
    private final int currentParallelism;
    private final List<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;
    @Nullable
    private Long checkpointIdToFinish;
    private final DataSourceDialect<C> dialect;
    private final OffsetFactory offsetFactory;
    private SourceEnumeratorMetrics enumeratorMetrics;
    private final Map<String, Long> splitFinishedCheckpointIds;
    private static final long UNDEFINED_CHECKPOINT_ID = -1L;
    private final Object lock = new Object();
    private ExecutorService splittingExecutorService;
    private volatile Throwable uncaughtSplitterException;

    public SnapshotSplitAssigner(C sourceConfig, int currentParallelism, List<TableId> remainingTables, boolean isTableIdCaseSensitive, DataSourceDialect<C> dialect, OffsetFactory offsetFactory) {
        this(sourceConfig, currentParallelism, new ArrayList<TableId>(), new ArrayList<SchemalessSnapshotSplit>(), new LinkedHashMap<String, SchemalessSnapshotSplit>(), new HashMap<TableId, TableChanges.TableChange>(), new HashMap<String, Offset>(), AssignerStatus.INITIAL_ASSIGNING, remainingTables, isTableIdCaseSensitive, true, dialect, offsetFactory, new ConcurrentHashMap<String, Long>(), ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
    }

    public SnapshotSplitAssigner(C sourceConfig, int currentParallelism, SnapshotPendingSplitsState checkpoint, DataSourceDialect<C> dialect, OffsetFactory offsetFactory) {
        this(sourceConfig, currentParallelism, checkpoint.getAlreadyProcessedTables(), checkpoint.getRemainingSplits(), checkpoint.getAssignedSplits(), checkpoint.getTableSchemas(), checkpoint.getSplitFinishedOffsets(), checkpoint.getSnapshotAssignerStatus(), checkpoint.getRemainingTables(), checkpoint.isTableIdCaseSensitive(), checkpoint.isRemainingTablesCheckpointed(), dialect, offsetFactory, new ConcurrentHashMap<String, Long>(), checkpoint.getChunkSplitterState());
    }

    private SnapshotSplitAssigner(C sourceConfig, int currentParallelism, List<TableId> alreadyProcessedTables, List<SchemalessSnapshotSplit> remainingSplits, Map<String, SchemalessSnapshotSplit> assignedSplits, Map<TableId, TableChanges.TableChange> tableSchemas, Map<String, Offset> splitFinishedOffsets, AssignerStatus assignerStatus, List<TableId> remainingTables, boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed, DataSourceDialect<C> dialect, OffsetFactory offsetFactory, Map<String, Long> splitFinishedCheckpointIds, ChunkSplitterState chunkSplitterState) {
        this.sourceConfig = sourceConfig;
        this.currentParallelism = currentParallelism;
        this.alreadyProcessedTables = alreadyProcessedTables;
        this.remainingSplits = remainingSplits;
        this.assignedSplits = assignedSplits.entrySet().stream().sorted(Map.Entry.comparingByKey()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (o, o2) -> o, LinkedHashMap::new));
        this.tableSchemas = tableSchemas;
        this.splitFinishedOffsets = splitFinishedOffsets;
        this.assignerStatus = assignerStatus;
        this.remainingTables = new CopyOnWriteArrayList<TableId>(remainingTables);
        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
        this.dialect = dialect;
        this.offsetFactory = offsetFactory;
        this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
        this.chunkSplitter = SnapshotSplitAssigner.createChunkSplitter(sourceConfig, dialect, chunkSplitterState);
    }

    @Override
    public void open() {
        this.chunkSplitter.open();
        this.discoveryCaptureTables();
        this.captureNewlyAddedTables();
        this.startAsynchronouslySplit();
    }

    private void discoveryCaptureTables() {
        if (this.needToDiscoveryTables()) {
            long start = System.currentTimeMillis();
            LOG.debug("The remainingTables is empty, start to discovery tables");
            try {
                List<TableId> discoverTables = this.dialect.discoverDataCollections(this.sourceConfig);
                this.remainingTables.addAll(discoverTables);
                LOG.debug("Discovery tables success, time cost: {} ms.", (Object)(System.currentTimeMillis() - start));
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discovery tables to capture", (Throwable)e);
            }
        }
        if (!this.isRemainingTablesCheckpointed && !AssignerStatus.isSnapshotAssigningFinished(this.assignerStatus)) {
            try {
                List<TableId> discoverTables = this.dialect.discoverDataCollections(this.sourceConfig);
                discoverTables.removeAll(this.alreadyProcessedTables);
                this.remainingTables.addAll(discoverTables);
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", (Throwable)e);
            }
        }
    }

    private void captureNewlyAddedTables() {
        if (this.sourceConfig.isScanNewlyAddedTableEnabled() && !this.sourceConfig.getStartupOptions().isSnapshotOnly() && AssignerStatus.isAssigningFinished(this.assignerStatus)) {
            try {
                List<TableId> currentCapturedTables = this.dialect.discoverDataCollections(this.sourceConfig);
                HashSet<Object> previousCapturedTables = new HashSet<Object>();
                List tablesInRemainingSplits = this.remainingSplits.stream().map(SnapshotSplit::getTableId).collect(Collectors.toList());
                previousCapturedTables.addAll(tablesInRemainingSplits);
                previousCapturedTables.addAll(this.alreadyProcessedTables);
                previousCapturedTables.addAll(this.remainingTables);
                HashSet tablesToRemove = new HashSet(previousCapturedTables);
                tablesToRemove.removeAll(currentCapturedTables);
                currentCapturedTables.removeAll(previousCapturedTables);
                List<TableId> newlyAddedTables = currentCapturedTables;
                if (!tablesToRemove.isEmpty()) {
                    LinkedList<String> splitsToRemove = new LinkedList<String>();
                    for (Map.Entry<String, SchemalessSnapshotSplit> splitEntry : this.assignedSplits.entrySet()) {
                        if (!tablesToRemove.contains(splitEntry.getValue().getTableId())) continue;
                        splitsToRemove.add(splitEntry.getKey());
                    }
                    splitsToRemove.forEach(this.assignedSplits.keySet()::remove);
                    splitsToRemove.forEach(this.splitFinishedOffsets.keySet()::remove);
                    this.tableSchemas.entrySet().removeIf(schema -> tablesToRemove.contains(schema.getKey()));
                    LOG.info("Enumerator remove tables after restart: {}", tablesToRemove);
                    this.remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
                    this.remainingTables.removeAll(tablesToRemove);
                    this.alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
                }
                if (!newlyAddedTables.isEmpty()) {
                    LOG.info("Found newly added tables, start capture newly added tables process");
                    this.remainingTables.addAll(newlyAddedTables);
                    if (AssignerStatus.isAssigningFinished(this.assignerStatus)) {
                        LOG.info("Found newly added tables, start capture newly added tables process under stream reading phase");
                        this.startAssignNewlyAddedTables();
                    }
                }
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", (Throwable)e);
            }
        }
    }

    public void initEnumeratorMetrics(SourceEnumeratorMetrics enumeratorMetrics) {
        this.enumeratorMetrics = enumeratorMetrics;
        this.enumeratorMetrics.enterSnapshotPhase();
        this.enumeratorMetrics.registerMetrics((Gauge<Integer>)((Gauge)this.alreadyProcessedTables::size), (Gauge<Integer>)((Gauge)this.assignedSplits::size), (Gauge<Integer>)((Gauge)this.remainingSplits::size));
        this.enumeratorMetrics.addNewTables(this.computeTablesPendingSnapshot());
        for (SchemalessSnapshotSplit snapshotSplit : this.remainingSplits) {
            this.enumeratorMetrics.getTableMetrics(snapshotSplit.getTableId()).addNewSplit(snapshotSplit.splitId());
        }
        for (SchemalessSnapshotSplit snapshotSplit : this.assignedSplits.values()) {
            this.enumeratorMetrics.getTableMetrics(snapshotSplit.getTableId()).addProcessedSplit(snapshotSplit.splitId());
        }
        for (String splitId : this.splitFinishedOffsets.keySet()) {
            TableId tableId = SnapshotSplit.extractTableId(splitId);
            this.enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
        }
    }

    private int computeTablesPendingSnapshot() {
        int numTablesPendingSnapshot = this.remainingTables.size();
        HashSet<TableId> computedTables = new HashSet<TableId>();
        for (SchemalessSnapshotSplit split : this.remainingSplits) {
            TableId tableId = split.getTableId();
            if (computedTables.contains(tableId) || this.alreadyProcessedTables.contains(tableId) || this.remainingTables.contains(tableId)) continue;
            computedTables.add(tableId);
            ++numTablesPendingSnapshot;
        }
        return numTablesPendingSnapshot;
    }

    private void startAsynchronouslySplit() {
        if (this.chunkSplitter.hasNextChunk() || !this.remainingTables.isEmpty()) {
            if (this.splittingExecutorService == null) {
                ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
                this.splittingExecutorService = Executors.newSingleThreadExecutor(threadFactory);
            }
            this.splittingExecutorService.submit(this::splitChunksForRemainingTables);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void splitTable(TableId nextTable) {
        LOG.info("Start splitting table {} into chunks...", (Object)nextTable);
        long start = System.currentTimeMillis();
        int chunkNum = 0;
        boolean hasRecordSchema = false;
        do {
            Object object = this.lock;
            synchronized (object) {
                Collection<SnapshotSplit> splits;
                try {
                    splits = this.chunkSplitter.generateSplits(nextTable);
                }
                catch (Exception e) {
                    throw new IllegalStateException("Error when splitting chunks for " + nextTable, e);
                }
                if (!hasRecordSchema && !splits.isEmpty()) {
                    hasRecordSchema = true;
                    this.tableSchemas.putAll(splits.iterator().next().getTableSchemas());
                }
                ArrayList<String> splitIds = new ArrayList<String>();
                for (SnapshotSplit split : splits) {
                    SchemalessSnapshotSplit schemalessSnapshotSplit = split.toSchemalessSnapshotSplit();
                    splitIds.add(schemalessSnapshotSplit.splitId());
                    if (this.sourceConfig.isAssignUnboundedChunkFirst() && split.getSplitEnd() == null) {
                        this.remainingSplits.add(0, schemalessSnapshotSplit);
                        continue;
                    }
                    this.remainingSplits.add(schemalessSnapshotSplit);
                }
                chunkNum += splits.size();
                this.enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
                if (!this.chunkSplitter.hasNextChunk()) {
                    this.remainingTables.remove(nextTable);
                }
                this.lock.notify();
            }
        } while (this.chunkSplitter.hasNextChunk());
        long end = System.currentTimeMillis();
        LOG.info("Split table {} into {} chunks, time cost: {}ms.", new Object[]{nextTable, chunkNum, end - start});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Optional<SourceSplitBase> getNext() {
        Object object = this.lock;
        synchronized (object) {
            this.checkSplitterErrors();
            if (!this.remainingSplits.isEmpty()) {
                Iterator<SchemalessSnapshotSplit> iterator = this.remainingSplits.iterator();
                SchemalessSnapshotSplit split = iterator.next();
                iterator.remove();
                this.assignedSplits.put(split.splitId(), split);
                this.addAlreadyProcessedTablesIfNotExists(split.getTableId());
                this.enumeratorMetrics.getTableMetrics(split.getTableId()).finishProcessSplit(split.splitId());
                return Optional.of(split.toSnapshotSplit(this.tableSchemas.get(split.getTableId())));
            }
            if (!this.remainingTables.isEmpty()) {
                try {
                    this.lock.wait();
                }
                catch (InterruptedException e) {
                    throw new FlinkRuntimeException("InterruptedException while waiting for asynchronously snapshot split");
                }
                return this.getNext();
            }
            this.closeExecutorService();
            return Optional.empty();
        }
    }

    @Override
    public boolean waitingForFinishedSplits() {
        return !this.allSnapshotSplitsFinished();
    }

    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        if (this.waitingForFinishedSplits()) {
            LOG.error("The assigner is not ready to offer finished split information, this should not be called");
            throw new FlinkRuntimeException("The assigner is not ready to offer finished split information, this should not be called");
        }
        ArrayList<SchemalessSnapshotSplit> assignedSnapshotSplit = new ArrayList<SchemalessSnapshotSplit>(this.assignedSplits.values());
        ArrayList<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<FinishedSnapshotSplitInfo>();
        for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
            Offset finishedOffset = this.splitFinishedOffsets.get(split.splitId());
            finishedSnapshotSplitInfos.add(new FinishedSnapshotSplitInfo(split.getTableId(), split.splitId(), split.getSplitStart(), split.getSplitEnd(), finishedOffset, this.offsetFactory));
        }
        return finishedSnapshotSplitInfos;
    }

    @Override
    public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
        this.splitFinishedOffsets.putAll(splitFinishedOffsets);
        for (String splitId : splitFinishedOffsets.keySet()) {
            this.splitFinishedCheckpointIds.put(splitId, -1L);
        }
        LOG.info("splitFinishedCheckpointIds size in onFinishedSplits: {}", (Object)(this.splitFinishedCheckpointIds == null ? 0 : this.splitFinishedCheckpointIds.size()));
        if (this.allSnapshotSplitsFinished() && AssignerStatus.isAssigningSnapshotSplits(this.assignerStatus)) {
            if (this.currentParallelism == 1) {
                this.assignerStatus = this.assignerStatus.onFinish();
                LOG.info("Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
            } else {
                LOG.info("Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
            }
        }
    }

    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        for (SourceSplitBase split : splits) {
            this.tableSchemas.putAll(split.asSnapshotSplit().getTableSchemas());
            this.remainingSplits.add(split.asSnapshotSplit().toSchemalessSnapshotSplit());
            this.assignedSplits.remove(split.splitId());
            this.splitFinishedOffsets.remove(split.splitId());
            this.enumeratorMetrics.getTableMetrics(split.asSnapshotSplit().getTableId()).reprocessSplit(split.splitId());
            TableId tableId = split.asSnapshotSplit().getTableId();
            this.enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId());
        }
    }

    @Override
    public SnapshotPendingSplitsState snapshotState(long checkpointId) {
        if (this.splitFinishedCheckpointIds != null && !this.splitFinishedCheckpointIds.isEmpty()) {
            for (Map.Entry<String, Long> splitFinishedCheckpointId : this.splitFinishedCheckpointIds.entrySet()) {
                if (splitFinishedCheckpointId.getValue() != -1L) continue;
                splitFinishedCheckpointId.setValue(checkpointId);
            }
            LOG.info("SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.", (Object)checkpointId, (Object)this.splitFinishedCheckpointIds.size());
        }
        SnapshotPendingSplitsState state = new SnapshotPendingSplitsState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.tableSchemas, this.splitFinishedOffsets, this.assignerStatus, this.remainingTables, this.isTableIdCaseSensitive, true, this.splitFinishedCheckpointIds, this.chunkSplitter.snapshotState(checkpointId));
        if (this.checkpointIdToFinish == null && AssignerStatus.isAssigningSnapshotSplits(this.assignerStatus) && this.allSnapshotSplitsFinished()) {
            this.checkpointIdToFinish = checkpointId;
        }
        return state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (this.checkpointIdToFinish != null && AssignerStatus.isAssigningSnapshotSplits(this.assignerStatus) && this.allSnapshotSplitsFinished()) {
            if (checkpointId >= this.checkpointIdToFinish) {
                this.assignerStatus = this.assignerStatus.onFinish();
            }
            LOG.info("Snapshot split assigner is turn into finished status.");
        }
        if (this.splitFinishedCheckpointIds != null && !this.splitFinishedCheckpointIds.isEmpty()) {
            Iterator<Map.Entry<String, Long>> iterator = this.splitFinishedCheckpointIds.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> splitFinishedCheckpointId = iterator.next();
                String splitId = splitFinishedCheckpointId.getKey();
                Long splitCheckpointId = splitFinishedCheckpointId.getValue();
                if (splitCheckpointId == -1L || checkpointId < splitCheckpointId) continue;
                TableId tableId = SnapshotSplit.extractTableId(splitId);
                this.enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
                iterator.remove();
            }
            LOG.info("Checkpoint completed on checkpoint {} with splitFinishedCheckpointIds size {}.", (Object)checkpointId, (Object)this.splitFinishedCheckpointIds.size());
        }
    }

    @Override
    public void close() throws IOException {
        this.closeExecutorService();
        if (this.chunkSplitter != null) {
            try {
                this.chunkSplitter.close();
            }
            catch (Exception e) {
                LOG.warn("Fail to close the chunk splitter.");
            }
        }
        this.dialect.close();
    }

    private void closeExecutorService() {
        if (this.splittingExecutorService != null) {
            this.splittingExecutorService.shutdown();
        }
    }

    private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
        if (!this.alreadyProcessedTables.contains(tableId)) {
            this.alreadyProcessedTables.add(tableId);
            this.enumeratorMetrics.startSnapshotTables(1);
        }
    }

    @Override
    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    public boolean needToDiscoveryTables() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty() && this.alreadyProcessedTables.isEmpty();
    }

    public Map<String, SchemalessSnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    @Override
    public AssignerStatus getAssignerStatus() {
        return this.assignerStatus;
    }

    @Override
    public void startAssignNewlyAddedTables() {
        Preconditions.checkState((boolean)AssignerStatus.isAssigningFinished(this.assignerStatus), (String)"Invalid assigner status %s", (Object[])new Object[]{this.assignerStatus});
        this.assignerStatus = this.assignerStatus.startAssignNewlyTables();
    }

    @Override
    public void onStreamSplitUpdated() {
        Preconditions.checkState((boolean)AssignerStatus.isNewlyAddedAssigningSnapshotFinished(this.assignerStatus), (String)"Invalid assigner status %s", (Object[])new Object[]{this.assignerStatus});
        this.assignerStatus = this.assignerStatus.onStreamSplitUpdated();
    }

    public Map<TableId, TableChanges.TableChange> getTableSchemas() {
        return this.tableSchemas;
    }

    public Map<String, Offset> getSplitFinishedOffsets() {
        return this.splitFinishedOffsets;
    }

    private boolean allSnapshotSplitsFinished() {
        return this.noMoreSplits() && this.assignedSplits.size() == this.splitFinishedOffsets.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void splitChunksForRemainingTables() {
        try {
            if (this.chunkSplitter.hasNextChunk()) {
                LOG.info("Start splitting remaining chunks for table {}", (Object)this.chunkSplitter.getCurrentSplittingTableId());
                this.splitTable(this.chunkSplitter.getCurrentSplittingTableId());
            }
            for (TableId nextTable : this.remainingTables) {
                this.splitTable(nextTable);
            }
        }
        catch (Throwable e) {
            Object object = this.lock;
            synchronized (object) {
                if (this.uncaughtSplitterException == null) {
                    this.uncaughtSplitterException = e;
                } else {
                    this.uncaughtSplitterException.addSuppressed(e);
                }
                this.lock.notify();
            }
        }
    }

    private void checkSplitterErrors() {
        if (this.uncaughtSplitterException != null) {
            throw new FlinkRuntimeException("Chunk splitting has encountered exception", this.uncaughtSplitterException);
        }
    }

    private static ChunkSplitter createChunkSplitter(SourceConfig sourceConfig, DataSourceDialect dataSourceDialect, ChunkSplitterState chunkSplitterState) {
        TableId tableId = chunkSplitterState.getCurrentSplittingTableId();
        return dataSourceDialect.createChunkSplitter(sourceConfig, !ChunkSplitterState.NO_SPLITTING_TABLE_STATE.equals(chunkSplitterState) && tableId != null && dataSourceDialect.isIncludeDataCollection(sourceConfig, tableId) ? chunkSplitterState : ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
    }
}

