/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.ChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSourceSplitEnumerator
implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
    private final Map<TablePath, JdbcSourceTable> tables;
    private final ConcurrentLinkedQueue<TablePath> pendingTables;
    private final Map<Integer, List<JdbcSourceSplit>> pendingSplits;
    private final ChunkSplitter splitter;
    private final SourceSplitEnumerator.Context<JdbcSourceSplit> context;
    private final Object stateLock = new Object();

    public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> context, JdbcSourceConfig jdbcSourceConfig, Map<TablePath, JdbcSourceTable> tables, JdbcSourceState sourceState) {
        this.context = context;
        this.tables = tables;
        this.splitter = ChunkSplitter.create(jdbcSourceConfig);
        if (sourceState == null) {
            this.pendingTables = new ConcurrentLinkedQueue<TablePath>(tables.keySet());
            this.pendingSplits = new HashMap<Integer, List<JdbcSourceSplit>>();
        } else {
            this.pendingTables = new ConcurrentLinkedQueue<TablePath>(sourceState.getPendingTables());
            this.pendingSplits = new HashMap<Integer, List<JdbcSourceSplit>>(sourceState.getPendingSplits());
        }
    }

    public void open() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() throws Exception {
        LOG.info("Starting split enumerator.");
        Set readers = this.context.registeredReaders();
        while (!this.pendingTables.isEmpty()) {
            Object object = this.stateLock;
            synchronized (object) {
                TablePath tablePath = this.pendingTables.poll();
                LOG.info("Splitting table {}.", (Object)tablePath);
                Collection<JdbcSourceSplit> splits = this.splitter.generateSplits(this.tables.get(tablePath));
                LOG.info("Split table {} into {} splits.", (Object)tablePath, (Object)splits.size());
                this.addPendingSplit(splits);
            }
            object = this.stateLock;
            synchronized (object) {
                this.assignSplit(readers);
            }
        }
        this.splitter.close();
        LOG.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", (Object)readers);
        readers.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
    }

    public void close() throws IOException {
        this.splitter.close();
    }

    public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            this.addPendingSplit(splits, subtaskId);
            if (this.context.registeredReaders().contains(subtaskId)) {
                this.assignSplit(Collections.singletonList(subtaskId));
            } else {
                LOG.warn("Reader {} is not registered. Pending splits {} are not assigned.", (Object)subtaskId, splits);
            }
        }
        LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", (Object)splits.size());
    }

    public int currentUnassignedSplitSize() {
        return this.pendingTables.isEmpty() && this.pendingSplits.isEmpty() ? 0 : 1;
    }

    public void handleSplitRequest(int subtaskId) {
        throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", subtaskId));
    }

    public void registerReader(int subtaskId) {
        LOG.info("Register reader {} to JdbcSourceSplitEnumerator.", (Object)subtaskId);
        if (!this.pendingSplits.isEmpty()) {
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JdbcSourceState snapshotState(long checkpointId) throws Exception {
        Object object = this.stateLock;
        synchronized (object) {
            return new JdbcSourceState(new ArrayList<TablePath>(this.pendingTables), new HashMap<Integer, List<JdbcSourceSplit>>(this.pendingSplits));
        }
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    private void assignSplit(Collection<Integer> readers) {
        LOG.debug("Assign pendingSplits to readers {}", readers);
        for (int reader : readers) {
            List<JdbcSourceSplit> assignmentForReader = this.pendingSplits.remove(reader);
            if (assignmentForReader == null || assignmentForReader.isEmpty()) continue;
            LOG.debug("Assign splits {} to reader {}", assignmentForReader, (Object)reader);
            this.context.assignSplit(reader, assignmentForReader);
        }
    }

    private void addPendingSplit(Collection<JdbcSourceSplit> splits) {
        int readerCount = this.context.currentParallelism();
        for (JdbcSourceSplit split : splits) {
            int ownerReader = JdbcSourceSplitEnumerator.getSplitOwner(split.splitId(), readerCount);
            LOG.debug("Assigning {} to {} reader.", (Object)split, (Object)ownerReader);
            this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList()).add(split);
        }
    }

    private void addPendingSplit(Collection<JdbcSourceSplit> splits, int ownerReader) {
        this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList()).addAll(splits);
    }

    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }
}

