/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator;

import com.mongodb.MongoNamespace;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongodbSplitEnumerator
implements SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> {
    private static final Logger log = LoggerFactory.getLogger(MongodbSplitEnumerator.class);
    private final ArrayList<MongoSplit> pendingSplits = Lists.newArrayList();
    private final SourceSplitEnumerator.Context<MongoSplit> context;
    private final MongodbClientProvider clientProvider;
    private final MongoSplitStrategy strategy;

    public MongodbSplitEnumerator(SourceSplitEnumerator.Context<MongoSplit> context, MongodbClientProvider clientProvider, MongoSplitStrategy strategy) {
        this(context, clientProvider, strategy, Collections.emptyList());
    }

    public MongodbSplitEnumerator(SourceSplitEnumerator.Context<MongoSplit> context, MongodbClientProvider clientProvider, MongoSplitStrategy strategy, List<MongoSplit> splits) {
        this.context = context;
        this.clientProvider = clientProvider;
        this.strategy = strategy;
        this.pendingSplits.addAll(splits);
    }

    public void open() {
    }

    public synchronized void run() {
        log.info("Starting MongoSplitEnumerator.");
        Set readers = this.context.registeredReaders();
        this.pendingSplits.addAll(this.strategy.split());
        MongoNamespace namespace = this.clientProvider.getDefaultCollection().getNamespace();
        log.info("Added {} pending splits for namespace {}.", (Object)this.pendingSplits.size(), (Object)namespace.getFullName());
        this.assignSplits(readers);
    }

    public void close() {
        if (this.clientProvider != null) {
            this.clientProvider.close();
        }
    }

    public void addSplitsBack(List<MongoSplit> splits, int subtaskId) {
        if (splits != null) {
            log.info("Received {} split(s) back from subtask {}.", (Object)splits.size(), (Object)subtaskId);
            this.pendingSplits.addAll(splits);
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplits.size();
    }

    public void handleSplitRequest(int subtaskId) {
        throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", subtaskId));
    }

    public void registerReader(int subtaskId) {
        log.debug("Register reader {} to MongodbSplitEnumerator.", (Object)subtaskId);
        if (!this.pendingSplits.isEmpty()) {
            this.assignSplits(Collections.singletonList(subtaskId));
        }
    }

    public ArrayList<MongoSplit> snapshotState(long checkpointId) {
        return this.pendingSplits;
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    private synchronized void assignSplits(Collection<Integer> readers) {
        log.debug("Assign pendingSplits to readers {}", readers);
        int numReaders = readers.size();
        Map<Integer, List<MongoSplit>> splitsBySubtaskId = this.pendingSplits.stream().collect(Collectors.groupingBy(split -> MongodbSplitEnumerator.getSplitOwner(split.splitId(), numReaders)));
        readers.forEach(subtaskId -> this.assignSplitsToSubtask((Integer)subtaskId, splitsBySubtaskId));
        this.pendingSplits.clear();
        readers.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
    }

    private void assignSplitsToSubtask(Integer subtaskId, Map<Integer, List<MongoSplit>> splitsBySubtaskId) {
        log.info("Received split request from taskId {}.", (Object)subtaskId);
        List assignedSplits = splitsBySubtaskId.getOrDefault(subtaskId, Collections.emptyList());
        this.context.assignSplit(subtaskId.intValue(), assignedSplits);
        log.info("Assigned {} splits to subtask {}, remaining splits: {}.", new Object[]{assignedSplits.size(), subtaskId, this.pendingSplits.size() - assignedSplits.size()});
    }

    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }
}

