/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.assigners;

import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.flink.cdc.connectors.mysql.schema.MySqlTypeUtils;
import org.apache.flink.cdc.connectors.mysql.source.assigners.ChunkRange;
import org.apache.flink.cdc.connectors.mysql.source.assigners.ChunkSplitter;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ChunkSplitter} that divides a MySQL table into {@link MySqlSnapshotSplit}s along a
 * single chunk key column.
 *
 * <p>For every new table the splitter first tries to compute all chunks arithmetically
 * ("evenly-sized" chunks), which is only attempted for BIGINT / INTEGER / DECIMAL chunk keys
 * whose distribution factor lies within the configured bounds. Otherwise it falls back to
 * "unevenly-sized" chunks, producing exactly one chunk per {@link #splitChunks} call by querying
 * the database for the next chunk boundary.
 *
 * <p>The progress of the unevenly-sized splitting ({@code currentSplittingTableId}, {@code
 * nextChunkStart}, {@code nextChunkId}) is updated and snapshotted under {@code lock} so that
 * {@link #snapshotState(long)} observes a consistent triple.
 */
public class MySqlChunkSplitter implements ChunkSplitter {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlChunkSplitter.class);

    /** Guards the splitting progress fields that {@link #snapshotState(long)} captures. */
    private final Object lock = new Object();
    private final MySqlSourceConfig sourceConfig;
    private final MySqlSchema mySqlSchema;

    // Splitting progress; non-null table id means an unevenly-sized split is in flight.
    @Nullable
    private TableId currentSplittingTableId;
    @Nullable
    private ChunkSplitterState.ChunkBound nextChunkStart;
    @Nullable
    private Integer nextChunkId;

    // Populated by open() / analyzeTable(); not part of the checkpointed state.
    private JdbcConnection jdbcConnection;
    private Table currentSplittingTable;
    private Column splitColumn;
    private RowType splitType;
    /** {@code [min, max]} of the split column as returned by {@code StatementUtils.queryMinMax}. */
    private Object[] minMaxOfSplitColumn;
    private long approximateRowCnt;

    /** Creates a splitter with no splitting in progress. */
    public MySqlChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig) {
        this(mySqlSchema, sourceConfig, null, null, null);
    }

    /** Creates a splitter that resumes from the progress recorded in {@code chunkSplitterState}. */
    public MySqlChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
        this(mySqlSchema, sourceConfig, chunkSplitterState.getCurrentSplittingTableId(), chunkSplitterState.getNextChunkStart(), chunkSplitterState.getNextChunkId());
    }

    private MySqlChunkSplitter(MySqlSchema mySqlSchema, MySqlSourceConfig sourceConfig, @Nullable TableId currentSplittingTableId, @Nullable ChunkSplitterState.ChunkBound nextChunkStart, @Nullable Integer nextChunkId) {
        this.mySqlSchema = mySqlSchema;
        this.sourceConfig = sourceConfig;
        this.currentSplittingTableId = currentSplittingTableId;
        this.nextChunkStart = nextChunkStart;
        this.nextChunkId = nextChunkId;
    }

    /** Opens the JDBC connection used for all subsequent table analysis and boundary queries. */
    @Override
    public void open() {
        this.jdbcConnection = DebeziumUtils.openJdbcConnection(this.sourceConfig);
    }

    /**
     * Produces the next batch of snapshot splits for {@code tableId}.
     *
     * <p>If no unevenly-sized splitting is in progress, the table is analyzed and, when possible,
     * all evenly-sized chunks are returned at once. Otherwise a single unevenly-sized chunk is
     * returned and the caller is expected to call again while {@link #hasNextChunk()} is true.
     *
     * @throws IllegalStateException if a different table is requested while a previous table is
     *     still being split
     */
    @Override
    public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId tableId) throws Exception {
        if (!hasNextChunk()) {
            // Fresh table: gather statistics and try the cheap arithmetic split first.
            analyzeTable(partition, tableId);
            Optional<List<MySqlSnapshotSplit>> evenlySplitChunks = trySplitAllEvenlySizedChunks(partition, tableId);
            if (evenlySplitChunks.isPresent()) {
                return evenlySplitChunks.get();
            }
            synchronized (lock) {
                this.currentSplittingTableId = tableId;
                this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
                this.nextChunkId = 0;
                return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
            }
        }

        Preconditions.checkState(currentSplittingTableId.equals(tableId), "Can not split a new table before the previous table splitting finish.");
        if (currentSplittingTable == null) {
            // Progress was supplied through the constructor, so table statistics are not loaded yet.
            analyzeTable(partition, currentSplittingTableId);
        }
        synchronized (lock) {
            return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
        }
    }

    /**
     * Loads the table schema, chunk key column and its type, the min/max of that column and the
     * approximate row count for {@code tableId}.
     *
     * @throws RuntimeException wrapping any failure raised while analyzing the table
     */
    private void analyzeTable(MySqlPartition partition, TableId tableId) {
        try {
            this.currentSplittingTable = this.mySqlSchema.getTableSchema(partition, this.jdbcConnection, tableId).getTable();
            this.splitColumn = ChunkUtils.getChunkKeyColumn(this.currentSplittingTable, this.sourceConfig.getChunkKeyColumns());
            this.splitType = ChunkUtils.getChunkKeyColumnType(this.splitColumn);
            this.minMaxOfSplitColumn = StatementUtils.queryMinMax(this.jdbcConnection, tableId, this.splitColumn.name());
            this.approximateRowCnt = StatementUtils.queryApproximateRowCnt(this.jdbcConnection, tableId);
        } catch (Exception e) {
            throw new RuntimeException("Fail to analyze table in chunk splitter.", e);
        }
    }

    /**
     * Generates one unevenly-sized chunk starting at {@code nextChunkStart} and advances the
     * splitting progress. When no further boundary exists the returned chunk has a {@code null}
     * end and {@code currentSplittingTableId} is cleared, which makes {@link #hasNextChunk()}
     * return false.
     */
    private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, TableId tableId) throws SQLException {
        final int chunkSize = sourceConfig.getSplitSize();
        final boolean isFirstChunk = nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND;
        final Object chunkStartVal = nextChunkStart.getValue();
        final Object max = minMaxOfSplitColumn[1];
        LOG.info("Use unevenly-sized chunks for table {}, the chunk size is {} from {}", tableId, chunkSize, isFirstChunk ? "null" : chunkStartVal.toString());

        // The first chunk's boundary search starts from the column minimum.
        final Object searchFrom = isFirstChunk ? minMaxOfSplitColumn[0] : chunkStartVal;
        final Object chunkEnd = nextChunkEnd(jdbcConnection, searchFrom, tableId, splitColumn.name(), max, chunkSize);
        maySleep(nextChunkId, tableId);

        if (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
            nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
            return createSnapshotSplit(jdbcConnection, partition, tableId, takeNextChunkId(), splitType, chunkStartVal, chunkEnd);
        }

        // No further boundary: emit the open-ended final chunk and mark the table as finished.
        currentSplittingTableId = null;
        nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
        return createSnapshotSplit(jdbcConnection, partition, tableId, takeNextChunkId(), splitType, chunkStartVal, null);
    }

    /** Returns the current chunk id and advances {@code nextChunkId} by one. */
    private int takeNextChunkId() {
        final int chunkId = nextChunkId;
        nextChunkId = chunkId + 1;
        return chunkId;
    }

    /**
     * Attempts to split the whole table into evenly-sized chunks.
     *
     * @return all splits of the table, or {@link Optional#empty()} when the table must be split
     *     into unevenly-sized chunks instead
     */
    private Optional<List<MySqlSnapshotSplit>> trySplitAllEvenlySizedChunks(MySqlPartition partition, TableId tableId) {
        LOG.debug("Try evenly splitting table {} into chunks", tableId);
        final Object min = minMaxOfSplitColumn[0];
        final Object max = minMaxOfSplitColumn[1];
        if (min == null || max == null || min.equals(max)) {
            // Missing or identical min/max: a single chunk covers the whole table.
            return Optional.of(generateSplits(partition, tableId, Collections.singletonList(ChunkRange.all())));
        }

        final int chunkSize = sourceConfig.getSplitSize();
        final int dynamicChunkSize = getDynamicChunkSize(tableId, splitColumn, min, max, chunkSize, approximateRowCnt);
        if (dynamicChunkSize == -1) {
            LOG.debug("beginning unevenly splitting table {} into chunks", tableId);
            return Optional.empty();
        }

        LOG.debug("finish evenly splitting table {} into chunks", tableId);
        List<ChunkRange> chunks = splitEvenlySizedChunks(tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
        return Optional.of(generateSplits(partition, tableId, chunks));
    }

    /** Converts chunk ranges into snapshot splits, using the list index as the chunk id. */
    private List<MySqlSnapshotSplit> generateSplits(MySqlPartition partition, TableId tableId, List<ChunkRange> chunks) {
        List<MySqlSnapshotSplit> splits = new ArrayList<>();
        for (int i = 0; i < chunks.size(); ++i) {
            ChunkRange chunk = chunks.get(i);
            splits.add(createSnapshotSplit(jdbcConnection, partition, tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd()));
        }
        return splits;
    }

    /** Returns true while an unevenly-sized table split is still in progress. */
    @Override
    public boolean hasNextChunk() {
        return this.currentSplittingTableId != null;
    }

    /** Captures the current splitting progress under {@code lock}. */
    @Override
    public ChunkSplitterState snapshotState(long checkpointId) {
        synchronized (lock) {
            return new ChunkSplitterState(currentSplittingTableId, nextChunkStart, nextChunkId);
        }
    }

    /** No-op: this splitter keeps no per-checkpoint bookkeeping. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Splits the range {@code [min, max]} into consecutive chunks of width {@code
     * dynamicChunkSize}. The first chunk has a {@code null} start and the last chunk has a
     * {@code null} end. If the approximate row count does not exceed {@code chunkSize}, a single
     * {@link ChunkRange#all()} chunk is returned.
     */
    @VisibleForTesting
    public List<ChunkRange> splitEvenlySizedChunks(TableId tableId, Object min, Object max, long approximateRowCnt, int chunkSize, int dynamicChunkSize) {
        LOG.info("Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", tableId, approximateRowCnt, chunkSize, dynamicChunkSize);
        if (approximateRowCnt <= (long) chunkSize) {
            return Collections.singletonList(ChunkRange.all());
        }

        List<ChunkRange> splits = new ArrayList<>();
        Object chunkStart = null;
        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
            splits.add(ChunkRange.of(chunkStart, chunkEnd));
            chunkStart = chunkEnd;
            try {
                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
            } catch (ArithmeticException ignored) {
                // The next boundary cannot be computed; stop here and let the
                // open-ended chunk below cover the remainder.
                break;
            }
        }
        splits.add(ChunkRange.of(chunkStart, null));
        return splits;
    }

    /**
     * Queries the end boundary of the chunk that follows {@code previousChunkEnd}.
     *
     * @return the next chunk end, or {@code null} when no boundary below {@code max} exists
     */
    private Object nextChunkEnd(JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, String splitColumnName, Object max, int chunkSize) throws SQLException {
        Object chunkEnd = StatementUtils.queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
        if (Objects.equals(previousChunkEnd, chunkEnd)) {
            // The boundary did not advance; ask for the minimum relative to it instead.
            chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd);
            if (chunkEnd == null) {
                return null;
            }
        }
        return ObjectUtils.compare(chunkEnd, max) >= 0 ? null : chunkEnd;
    }

    /**
     * Builds a snapshot split for the given chunk bounds; a {@code null} bound is passed through
     * as a {@code null} boundary array. The table schema is attached to the split.
     */
    private MySqlSnapshotSplit createSnapshotSplit(JdbcConnection jdbc, MySqlPartition partition, TableId tableId, int chunkId, RowType splitKeyType, Object chunkStart, Object chunkEnd) {
        final Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
        final Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
        HashMap<TableId, TableChanges.TableChange> schema = new HashMap<>();
        schema.put(tableId, this.mySqlSchema.getTableSchema(partition, jdbc, tableId));
        return new MySqlSnapshotSplit(tableId, splitId(tableId, chunkId), splitKeyType, splitStart, splitEnd, null, schema);
    }

    /**
     * Computes the chunk width to use for evenly-sized splitting.
     *
     * @return {@code max(distributionFactor * chunkSize, 1)} when the column type is supported
     *     and the distribution factor lies within the configured bounds; {@code -1} otherwise
     */
    private int getDynamicChunkSize(TableId tableId, Column splitColumn, Object min, Object max, int chunkSize, long approximateRowCnt) {
        if (!isEvenlySplitColumn(splitColumn)) {
            return -1;
        }
        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
        final double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
        final boolean dataIsEvenlyDistributed = ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
                && ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
        LOG.info("The actual distribution factor for table {} is {}, the lower bound of evenly distribution factor is {}, the upper bound of evenly distribution factor is {}", tableId, distributionFactor, distributionFactorLower, distributionFactorUpper);
        if (!dataIsEvenlyDistributed) {
            return -1;
        }
        return Math.max((int) (distributionFactor * (double) chunkSize), 1);
    }

    /** Returns true if the column maps to a BIGINT, INTEGER or DECIMAL Flink type. */
    private static boolean isEvenlySplitColumn(Column splitColumn) {
        DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
        LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
        return typeRoot == LogicalTypeRoot.BIGINT || typeRoot == LogicalTypeRoot.INTEGER || typeRoot == LogicalTypeRoot.DECIMAL;
    }

    /**
     * Computes {@code (max - min + 1) / approximateRowCnt}, rounded towards positive infinity at
     * 4 decimal places. Returns {@link Double#MAX_VALUE} for an empty table.
     *
     * @throws IllegalStateException if {@code min} and {@code max} are of different classes
     */
    private double calculateDistributionFactor(TableId tableId, Object min, Object max, long approximateRowCnt) {
        if (!min.getClass().equals(max.getClass())) {
            throw new IllegalStateException(String.format("Unsupported operation type, the MIN value type %s is different with MAX value type %s.", min.getClass().getSimpleName(), max.getClass().getSimpleName()));
        }
        if (approximateRowCnt == 0L) {
            return Double.MAX_VALUE;
        }
        final BigDecimal difference = ObjectUtils.minus(max, min);
        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1L));
        // RoundingMode.CEILING is the enum equivalent of the legacy int rounding constant 2.
        final double distributionFactor = subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING).doubleValue();
        LOG.info("The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", tableId, distributionFactor, min, max, approximateRowCnt);
        return distributionFactor;
    }

    /** Builds the split id as {@code "<tableId>:<chunkId>"}. */
    private static String splitId(TableId tableId, int chunkId) {
        return tableId.toString() + ":" + chunkId;
    }

    /**
     * Pauses for 100 ms and logs progress on every 10th chunk (including chunk 0). If the thread
     * is interrupted while sleeping, the interrupt flag is restored so callers can observe it.
     */
    private static void maySleep(int count, TableId tableId) {
        if (count % 10 != 0) {
            return;
        }
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            // Do not lose the interruption request; the throttle itself is best-effort.
            Thread.currentThread().interrupt();
        }
        LOG.info("ChunkSplitter has split {} chunks for table {}", count, tableId);
    }

    /** Returns the table currently being split unevenly, or {@code null} if none. */
    public TableId getCurrentSplittingTableId() {
        return this.currentSplittingTableId;
    }

    /** Returns the id that will be assigned to the next unevenly-sized chunk; may be {@code null}. */
    public Integer getNextChunkId() {
        return this.nextChunkId;
    }

    /**
     * Closes the JDBC connection (if one was opened) and the schema. The schema is closed even
     * when closing the connection fails.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.jdbcConnection != null) {
                this.jdbcConnection.close();
            }
        } finally {
            this.mySqlSchema.close();
        }
    }
}

