/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkRange;
import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public abstract class JdbcSourceChunkSplitter
implements ChunkSplitter {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceChunkSplitter.class);
    // Source configuration; this class reads the split size, the chunk key column and the
    // lower/upper distribution-factor bounds from it.
    protected final JdbcSourceConfig sourceConfig;
    // Dialect used to open the JDBC connection and to query a table's schema.
    protected final JdbcDataSourceDialect dialect;
    // Monitor held while generateSplits advances the uneven-splitting progress and while
    // snapshotState copies that progress into a ChunkSplitterState.
    private final Object lock = new Object();
    // Table that is currently being split chunk by chunk; null means no table is in progress
    // (this is exactly what hasNextChunk tests).
    @Nullable
    private TableId currentSplittingTableId;
    // Bound from which the next unevenly-sized chunk starts (START_BOUND, a middle bound, or
    // END_BOUND once the last chunk has been produced).
    @Nullable
    private ChunkSplitterState.ChunkBound nextChunkStart;
    // Id that will be assigned to the next unevenly-sized chunk.
    @Nullable
    private Integer nextChunkId;
    // Connection opened in open() and closed in close(); used for every query in this class.
    private JdbcConnection jdbcConnection;
    // The fields below are all (re)populated together by analyzeTable.
    private Table currentSplittingTable;
    private TableChanges.TableChange currentSchema;
    private Column splitColumn;
    private RowType splitType;
    // Two-element array: index 0 is read as the minimum, index 1 as the maximum split-key value.
    private Object[] minMaxOfSplitColumn;
    private long approximateRowCnt;
    /**
     * Creates a splitter whose splitting progress (current table, next chunk start and next
     * chunk id) is copied from the given state object.
     *
     * @param sourceConfig source configuration
     * @param dialect dialect used for connections and schema queries
     * @param chunkSplitterState state supplying the initial splitting progress
     */
    public JdbcSourceChunkSplitter(
            JdbcSourceConfig sourceConfig,
            JdbcDataSourceDialect dialect,
            ChunkSplitterState chunkSplitterState) {
        this(
                sourceConfig,
                dialect,
                chunkSplitterState.getCurrentSplittingTableId(),
                chunkSplitterState.getNextChunkStart(),
                chunkSplitterState.getNextChunkId());
    }

    /**
     * Creates a splitter with explicitly supplied splitting progress.
     *
     * @param sourceConfig source configuration
     * @param dialect dialect used for connections and schema queries
     * @param currentSplittingTableId table whose splitting is in progress, or {@code null} if none
     * @param nextChunkStart bound from which the next chunk starts, may be {@code null}
     * @param nextChunkId id for the next chunk, may be {@code null}
     */
    public JdbcSourceChunkSplitter(
            JdbcSourceConfig sourceConfig,
            JdbcDataSourceDialect dialect,
            @Nullable TableId currentSplittingTableId,
            @Nullable ChunkSplitterState.ChunkBound nextChunkStart,
            @Nullable Integer nextChunkId) {
        // Splitting progress first, collaborators second; the assignments are independent.
        this.currentSplittingTableId = currentSplittingTableId;
        this.nextChunkStart = nextChunkStart;
        this.nextChunkId = nextChunkId;
        this.dialect = dialect;
        this.sourceConfig = sourceConfig;
    }

    /** Opens the JDBC connection through the dialect, using the source configuration. */
    @Override
    public void open() {
        jdbcConnection = dialect.openJdbcConnection(sourceConfig);
    }

    /**
     * Produces the next snapshot splits for {@code tableId}.
     *
     * <p>When no table is in progress, the table is analyzed and evenly-sized chunking is
     * attempted; if that succeeds all splits are returned at once. Otherwise uneven splitting
     * starts and only the first chunk is returned. When a table is already in progress, it must
     * be the same table, and the next unevenly-sized chunk is returned.
     *
     * @param tableId table to split
     * @return the splits generated by this call
     * @throws Exception if a query issued while splitting fails
     */
    @Override
    public Collection<SnapshotSplit> generateSplits(TableId tableId) throws Exception {
        if (hasNextChunk()) {
            // Continue the table whose uneven splitting is already under way.
            Preconditions.checkState(
                    currentSplittingTableId.equals(tableId),
                    "Can not split a new table before the previous table splitting finish.");
            // Progress is known but the analysis results are missing (presumably after the
            // progress was handed in through a constructor) - rebuild them first.
            if (currentSplittingTable == null) {
                analyzeTable(currentSplittingTableId);
            }
            synchronized (lock) {
                return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
            }
        }

        // A new table: analyze it and prefer evenly-sized chunks when possible.
        analyzeTable(tableId);
        Optional<List<SnapshotSplit>> evenSplits = trySplitAllEvenlySizedChunks(tableId);
        if (evenSplits.isPresent()) {
            return evenSplits.get();
        }

        // Fall back to uneven splitting, producing one chunk per call.
        synchronized (lock) {
            currentSplittingTableId = tableId;
            nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
            nextChunkId = 0;
            return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
        }
    }

    /**
     * Tells whether a table is currently being split chunk by chunk.
     *
     * @return {@code true} if a current splitting table id is set
     */
    @Override
    public boolean hasNextChunk() {
        return currentSplittingTableId != null;
    }

    /**
     * Captures the current splitting progress (table id, next chunk start, next chunk id) while
     * holding the lock.
     *
     * @param checkpointId checkpoint identifier; not used by this implementation
     * @return a new state object holding the captured progress
     */
    @Override
    public ChunkSplitterState snapshotState(long checkpointId) {
        synchronized (lock) {
            return new ChunkSplitterState(currentSplittingTableId, nextChunkStart, nextChunkId);
        }
    }

    /**
     * Returns the id of the table currently being split.
     *
     * @return the current splitting table id, or {@code null} when no table is in progress
     */
    @Override
    public TableId getCurrentSplittingTableId() {
        return currentSplittingTableId;
    }

    /**
     * Closes the JDBC connection if one has been opened.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (jdbcConnection == null) {
            // Nothing was opened, so there is nothing to release.
            return;
        }
        jdbcConnection.close();
    }

    /**
     * Dialect-specific query that yields the candidate end value of the next chunk.
     *
     * <p>Within this class it is invoked as
     * {@code queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd)}, so the
     * decompiled parameter names map as follows.
     *
     * @param var1 JDBC connection to run the query on
     * @param var2 table being split
     * @param var3 split column
     * @param var4 chunk size
     * @param var5 end value of the previous chunk
     * @return the candidate chunk end; NOTE(review): presumably the largest split-key value
     *     within the next {@code var4} rows at or after {@code var5} - confirm against the
     *     dialect implementations
     * @throws SQLException if the query fails
     */
    protected abstract Object queryNextChunkMax(JdbcConnection var1, TableId var2, Column var3, int var4, Object var5) throws SQLException;

    /**
     * Dialect-specific query for the approximate number of rows in a table.
     *
     * <p>The result is unboxed into a {@code long} by {@code analyzeTable}, so a {@code null}
     * return makes the table analysis fail.
     *
     * @param var1 JDBC connection to run the query on
     * @param var2 table whose row count is requested
     * @return the approximate row count
     * @throws SQLException if the query fails
     */
    protected abstract Long queryApproximateRowCnt(JdbcConnection var1, TableId var2) throws SQLException;

    /**
     * Decides whether the split column qualifies for evenly-sized chunking, which is the case
     * when its Flink logical type root is {@code BIGINT}, {@code INTEGER} or {@code DECIMAL}.
     *
     * @param splitColumn column used as the split key
     * @return {@code true} if the column type is one of the supported numeric type roots
     */
    protected boolean isEvenlySplitColumn(Column splitColumn) {
        final LogicalTypeRoot root = fromDbzColumn(splitColumn).getLogicalType().getTypeRoot();
        if (root == LogicalTypeRoot.BIGINT || root == LogicalTypeRoot.INTEGER) {
            return true;
        }
        return root == LogicalTypeRoot.DECIMAL;
    }

    /**
     * Maps a Debezium column to a Flink {@link DataType}. This class uses the result to decide
     * whether the split column supports evenly-sized chunking and to build the split key row type.
     *
     * @param var1 the column to convert
     * @return the Flink data type corresponding to the column
     */
    protected abstract DataType fromDbzColumn(Column var1);

    /**
     * Computes the distribution factor of the split key, i.e.
     * {@code (max - min + 1) / approximateRowCnt}, rounded towards positive infinity to four
     * decimal places.
     *
     * @param tableId table the values belong to (used for logging only)
     * @param min minimum split-key value; must be non-null
     * @param max maximum split-key value; must be non-null and of the same class as {@code min}
     * @param approximateRowCnt approximate number of rows in the table
     * @return the distribution factor, or {@link Double#MAX_VALUE} when the row count is zero
     * @throws IllegalStateException if {@code min} and {@code max} are of different classes
     */
    protected double calculateDistributionFactor(
            TableId tableId, Object min, Object max, long approximateRowCnt) {
        if (!min.getClass().equals(max.getClass())) {
            throw new IllegalStateException(
                    String.format(
                            "Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
                            min.getClass().getSimpleName(),
                            max.getClass().getSimpleName()));
        }
        if (approximateRowCnt == 0L) {
            // Avoid dividing by zero: an empty table is treated as maximally sparse.
            return Double.MAX_VALUE;
        }
        BigDecimal difference = ObjectUtils.minus(max, min);
        // The key range is inclusive on both ends, hence the +1.
        BigDecimal subRowCnt = difference.add(BigDecimal.ONE);
        // RoundingMode.CEILING is the enum equivalent of the legacy int constant 2
        // (BigDecimal.ROUND_CEILING) accepted by the deprecated divide overload.
        double distributionFactor =
                subRowCnt
                        .divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                        .doubleValue();
        LOG.info(
                "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
                tableId,
                distributionFactor,
                min,
                max,
                approximateRowCnt);
        return distributionFactor;
    }

    /**
     * Resolves the split column of a table by delegating to {@link JdbcChunkUtils}.
     *
     * @param table table to pick the split column from
     * @param chunkKeyColumn configured chunk key column name, may be {@code null}
     * @return the column to split on
     */
    protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
        final Column resolved = JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
        return resolved;
    }

    /**
     * Checks whether a chunk end is less than or equal to the maximum split-key value.
     *
     * @param chunkEnd chunk end to test
     * @param max maximum split-key value
     * @param splitColumn split column; not used by this default implementation
     * @return {@code true} if {@code chunkEnd <= max}
     */
    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
        // "not greater than" is the same as "less than or equal".
        return !(ObjectUtils.compare(chunkEnd, max) > 0);
    }

    /**
     * Checks whether a chunk end is greater than or equal to the maximum split-key value.
     *
     * @param chunkEnd chunk end to test
     * @param max maximum split-key value
     * @param splitColumn split column; not used by this default implementation
     * @return {@code true} if {@code chunkEnd >= max}
     */
    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
        // "not less than" is the same as "greater than or equal".
        return !(ObjectUtils.compare(chunkEnd, max) < 0);
    }

    /**
     * Queries the minimum and maximum value of the split column, using quoted table and column
     * identifiers.
     *
     * @param jdbc connection to run the query on
     * @param tableId table to query
     * @param splitColumn split column
     * @return the array returned by {@link JdbcChunkUtils}; callers in this class read index 0 as
     *     the minimum and index 1 as the maximum
     * @throws SQLException if the query fails
     */
    protected Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn)
            throws SQLException {
        final String quotedTable = jdbc.quotedTableIdString(tableId);
        final String quotedColumn = jdbc.quotedColumnIdString(splitColumn.name());
        return JdbcChunkUtils.queryMinMax(jdbc, quotedTable, quotedColumn);
    }

    /**
     * Queries the minimum split-column value relative to an excluded lower bound, using quoted
     * table and column identifiers.
     *
     * @param jdbc connection to run the query on
     * @param tableId table to query
     * @param splitColumn split column
     * @param excludedLowerBound lower bound handed to {@link JdbcChunkUtils} as the excluded bound
     * @return the value returned by {@link JdbcChunkUtils}
     * @throws SQLException if the query fails
     */
    protected Object queryMin(
            JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound)
            throws SQLException {
        final String quotedTable = jdbc.quotedTableIdString(tableId);
        final String quotedColumn = jdbc.quotedColumnIdString(splitColumn.name());
        return JdbcChunkUtils.queryMin(jdbc, quotedTable, quotedColumn, excludedLowerBound);
    }

    /**
     * Builds the row type of the split key: a single-field row named after the split column and
     * typed with the column's Flink data type.
     *
     * @param splitColumn split column
     * @return the single-field row type of the split key
     */
    private RowType getSplitType(Column splitColumn) {
        final DataTypes.Field keyField =
                DataTypes.FIELD(splitColumn.name(), fromDbzColumn(splitColumn));
        final DataType rowDataType = DataTypes.ROW(new DataTypes.Field[] {keyField});
        return (RowType) rowDataType.getLogicalType();
    }

    /**
     * Attempts to split the already analyzed table into evenly-sized chunks in one go.
     *
     * @param tableId table to split
     * @return all splits when the table can be covered by a single chunk or evenly-sized chunks;
     *     an empty optional when uneven splitting has to be used instead
     */
    private Optional<List<SnapshotSplit>> trySplitAllEvenlySizedChunks(TableId tableId) {
        LOG.debug("Try evenly splitting table {} into chunks", tableId);
        final Object min = minMaxOfSplitColumn[0];
        final Object max = minMaxOfSplitColumn[1];

        // No key range to divide (missing bound or a single distinct value): one chunk covers all.
        final boolean singleChunkSuffices = min == null || max == null || min.equals(max);
        if (singleChunkSuffices) {
            List<ChunkRange> wholeTable = Collections.singletonList(ChunkRange.all());
            return Optional.of(createSnapshotSplit(tableId, wholeTable));
        }

        final int chunkSize = sourceConfig.getSplitSize();
        final int dynamicChunkSize =
                getDynamicChunkSize(tableId, splitColumn, min, max, chunkSize, approximateRowCnt);
        if (dynamicChunkSize == -1) {
            // -1 signals that evenly-sized chunking is not applicable.
            LOG.debug("beginning unevenly splitting table {} into chunks", tableId);
            return Optional.empty();
        }

        LOG.debug("finish evenly splitting table {} into chunks", tableId);
        List<ChunkRange> ranges =
                splitEvenlySizedChunks(
                        tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
        return Optional.of(createSnapshotSplit(tableId, ranges));
    }

    /**
     * Loads everything needed to split a table: its schema, the split column and its row type,
     * the min/max of the split column, and the approximate row count.
     *
     * @param tableId table to analyze
     * @throws RuntimeException wrapping any failure that occurs during the analysis
     */
    private void analyzeTable(TableId tableId) {
        try {
            currentSchema = dialect.queryTableSchema(jdbcConnection, tableId);
            currentSplittingTable = Objects.requireNonNull(currentSchema).getTable();

            final String chunkKeyColumn = sourceConfig.getChunkKeyColumn();
            splitColumn = getSplitColumn(currentSplittingTable, chunkKeyColumn);
            splitType = getSplitType(splitColumn);

            minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn);
            approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);
        } catch (Exception e) {
            throw new RuntimeException("Fail to analyze table in chunk splitter.", e);
        }
    }

    /**
     * Produces the next unevenly-sized chunk of the table and advances the splitting progress.
     *
     * <p>If a further chunk end at or below the maximum is found, the chunk ends there and the
     * next chunk will start from it. Otherwise the open-ended last chunk is produced and the
     * current splitting table is cleared.
     *
     * @param tableId table being split
     * @return the split for the chunk produced by this call
     * @throws SQLException if a boundary query fails
     */
    private SnapshotSplit splitOneUnevenlySizedChunk(TableId tableId) throws SQLException {
        final int chunkSize = sourceConfig.getSplitSize();
        final Object chunkStartVal = nextChunkStart.getValue();
        final boolean atTableStart = nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND;
        LOG.info(
                "Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
                tableId,
                chunkSize,
                atTableStart ? "null" : chunkStartVal.toString());

        // The first chunk's end is searched from the column minimum, although the split itself
        // keeps the start value of the START_BOUND.
        final Object searchFrom = atTableStart ? minMaxOfSplitColumn[0] : chunkStartVal;
        final Object chunkEnd =
                nextChunkEnd(
                        jdbcConnection,
                        searchFrom,
                        tableId,
                        splitColumn,
                        minMaxOfSplitColumn[1],
                        chunkSize);
        maySleep(nextChunkId, tableId);

        final boolean moreChunksFollow =
                chunkEnd != null
                        && isChunkEndLeMax(chunkEnd, minMaxOfSplitColumn[1], splitColumn);
        final Object splitEnd;
        if (moreChunksFollow) {
            nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
            splitEnd = chunkEnd;
        } else {
            // Last chunk: open-ended, and the table is no longer in progress.
            currentSplittingTableId = null;
            nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
            splitEnd = null;
        }

        // Hand out the current id and advance the counter for the following chunk.
        final int chunkId = nextChunkId;
        nextChunkId = chunkId + 1;
        return createSnapshotSplit(tableId, chunkId, splitType, chunkStartVal, splitEnd);
    }

    /**
     * Determines the key step to use for evenly-sized chunking.
     *
     * @param tableId table being split
     * @param splitColumn split column
     * @param min minimum split-key value
     * @param max maximum split-key value
     * @param chunkSize configured chunk size
     * @param approximateRowCnt approximate number of rows in the table
     * @return {@code max(1, (int) (distributionFactor * chunkSize))} when the column type is
     *     supported and the distribution factor lies within the configured bounds; {@code -1}
     *     otherwise
     */
    private int getDynamicChunkSize(
            TableId tableId,
            Column splitColumn,
            Object min,
            Object max,
            int chunkSize,
            long approximateRowCnt) {
        if (!isEvenlySplitColumn(splitColumn)) {
            return -1;
        }

        final double upperBound = sourceConfig.getDistributionFactorUpper();
        final double lowerBound = sourceConfig.getDistributionFactorLower();
        final double factor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
        final boolean withinBounds =
                ObjectUtils.doubleCompare(factor, lowerBound) >= 0
                        && ObjectUtils.doubleCompare(factor, upperBound) <= 0;
        LOG.info(
                "The actual distribution factor for table {} is {}, the lower bound of evenly distribution factor is {}, the upper bound of evenly distribution factor is {}",
                tableId,
                factor,
                lowerBound,
                upperBound);

        if (!withinBounds) {
            return -1;
        }
        // Never step by less than one key value.
        return Math.max((int) (factor * chunkSize), 1);
    }

    /**
     * Divides the key range into chunks by stepping {@code dynamicChunkSize} from {@code min}
     * until the next boundary would exceed {@code max}; the first chunk has no lower bound and
     * the last chunk has no upper bound.
     *
     * @param tableId table being split (used for logging only)
     * @param min minimum split-key value
     * @param max maximum split-key value
     * @param approximateRowCnt approximate number of rows in the table
     * @param chunkSize configured chunk size
     * @param dynamicChunkSize key step between consecutive chunk boundaries
     * @return the chunk ranges; a single all-covering range when the approximate row count does
     *     not exceed the chunk size
     */
    private List<ChunkRange> splitEvenlySizedChunks(
            TableId tableId,
            Object min,
            Object max,
            long approximateRowCnt,
            int chunkSize,
            int dynamicChunkSize) {
        LOG.info(
                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
                tableId,
                approximateRowCnt,
                chunkSize,
                dynamicChunkSize);
        if (approximateRowCnt <= chunkSize) {
            return Collections.singletonList(ChunkRange.all());
        }

        final List<ChunkRange> ranges = new ArrayList<>();
        Object lowerBound = null;
        Object upperBound = ObjectUtils.plus(min, dynamicChunkSize);
        while (!(ObjectUtils.compare(upperBound, max) > 0)) {
            ranges.add(ChunkRange.of(lowerBound, upperBound));
            lowerBound = upperBound;
            try {
                upperBound = ObjectUtils.plus(upperBound, dynamicChunkSize);
            } catch (ArithmeticException overflow) {
                // The next boundary cannot be computed (presumably numeric overflow), so stop
                // here and let the open-ended last chunk cover the remainder.
                break;
            }
        }
        ranges.add(ChunkRange.of(lowerBound, null));
        return ranges;
    }

    /**
     * Splits the whole key range into unevenly-sized chunks by repeatedly querying the next chunk
     * end; the first chunk has no lower bound and the last chunk has no upper bound.
     *
     * @param jdbc connection to run the boundary queries on
     * @param tableId table being split
     * @param splitColumn split column
     * @param min minimum split-key value, used as the first search position
     * @param max maximum split-key value
     * @param chunkSize configured chunk size
     * @return the chunk ranges in ascending order
     * @throws SQLException if a boundary query fails
     */
    private List<ChunkRange> splitUnevenlySizedChunks(
            JdbcConnection jdbc,
            TableId tableId,
            Column splitColumn,
            Object min,
            Object max,
            int chunkSize)
            throws SQLException {
        LOG.info(
                "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
        final List<ChunkRange> ranges = new ArrayList<>();
        Object lowerBound = null;
        int emitted = 0;
        for (Object upperBound = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
                upperBound != null && isChunkEndLeMax(upperBound, max, splitColumn);
                upperBound = nextChunkEnd(jdbc, upperBound, tableId, splitColumn, max, chunkSize)) {
            ranges.add(ChunkRange.of(lowerBound, upperBound));
            // Pause periodically between boundary queries.
            maySleep(emitted++, tableId);
            lowerBound = upperBound;
        }
        ranges.add(ChunkRange.of(lowerBound, null));
        return ranges;
    }

    /**
     * Finds the end value of the next chunk that follows {@code previousChunkEnd}.
     *
     * <p>If the queried end equals the previous end, the chunk would be empty, so the next value
     * above it is queried instead. A result that reaches or exceeds {@code max} is reported as
     * {@code null}, which callers treat as "no further boundary; the remaining rows form the
     * last chunk".
     *
     * @param jdbc connection to run the boundary queries on
     * @param previousChunkEnd end value of the previous chunk (or the column minimum for the
     *     first chunk)
     * @param tableId table being split
     * @param splitColumn split column
     * @param max maximum split-key value
     * @param chunkSize configured chunk size
     * @return the next chunk end strictly below {@code max}, or {@code null} if there is none
     * @throws SQLException if a boundary query fails
     */
    private Object nextChunkEnd(
            JdbcConnection jdbc,
            Object previousChunkEnd,
            TableId tableId,
            Column splitColumn,
            Object max,
            int chunkSize)
            throws SQLException {
        Object chunkEnd =
                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
        if (Objects.equals(previousChunkEnd, chunkEnd)) {
            // Equal start and end would give an empty chunk; move on to the next larger value.
            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
        }
        // NOTE(review): the boundary queries may presumably yield null when no qualifying row
        // exists (e.g. rows removed while splitting). Report that as "no further boundary"
        // instead of handing null to the comparison, which is not written to accept it.
        if (chunkEnd == null || isChunkEndGeMax(chunkEnd, max, splitColumn)) {
            return null;
        }
        return chunkEnd;
    }

    /**
     * Converts chunk ranges into snapshot splits, numbering them by their position in the list
     * starting at zero.
     *
     * @param tableId table the chunks belong to
     * @param chunks chunk ranges in split order
     * @return one snapshot split per chunk range, in the same order
     */
    private List<SnapshotSplit> createSnapshotSplit(TableId tableId, List<ChunkRange> chunks) {
        final List<SnapshotSplit> splits = new ArrayList<>();
        int chunkId = 0;
        for (ChunkRange range : chunks) {
            splits.add(
                    createSnapshotSplit(
                            tableId,
                            chunkId++,
                            splitType,
                            range.getChunkStart(),
                            range.getChunkEnd()));
        }
        return splits;
    }

    /**
     * Creates a single snapshot split for one chunk of a table.
     *
     * @param tableId table the chunk belongs to
     * @param chunkId id of the chunk
     * @param splitKeyType row type of the split key
     * @param chunkStart start value of the chunk, or {@code null} for an unbounded start
     * @param chunkEnd end value of the chunk, or {@code null} for an unbounded end
     * @return the snapshot split, carrying the currently loaded schema of the table
     */
    private SnapshotSplit createSnapshotSplit(
            TableId tableId,
            int chunkId,
            RowType splitKeyType,
            Object chunkStart,
            Object chunkEnd) {
        // Bounds are passed as single-element arrays; an unbounded side stays null.
        final Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
        final Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};

        final HashMap<TableId, TableChanges.TableChange> schema = new HashMap<>();
        schema.put(tableId, currentSchema);
        return new SnapshotSplit(
                tableId, chunkId, splitKeyType, splitStart, splitEnd, null, schema);
    }

    /**
     * Pauses for 100 ms on every tenth chunk (whenever {@code count} is a multiple of 10) and
     * logs the progress, which throttles the boundary queries sent to the database.
     *
     * <p>If the thread is interrupted while sleeping, the pause ends early and the interrupt
     * status is restored so that callers can still observe the interruption.
     *
     * @param count number of chunks split so far
     * @param tableId table being split (used for logging only)
     */
    private void maySleep(int count, TableId tableId) {
        if (count % 10 == 0) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; set it again rather than
                // silently losing the interruption request.
                Thread.currentThread().interrupt();
            }
            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
        }
    }
}

