/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

public class WorkManager<K, V>
implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);
    private final ParallelConsumerOptions<K, V> options;
    final PartitionStateManager<K, V> pm;
    private final ShardManager<K, V> sm;
    private final DynamicLoadFactor dynamicLoadFactor;
    private int numberRecordsOutForProcessing = 0;
    private final List<Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<Consumer<WorkContainer<K, V>>>();

    public WorkManager(PCModule<K, V> module, DynamicLoadFactor dynamicExtraLoadFactor) {
        this.options = module.options();
        this.dynamicLoadFactor = dynamicExtraLoadFactor;
        this.sm = new ShardManager<K, V>(module, this);
        this.pm = new PartitionStateManager<K, V>(module, this.sm);
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.pm.onPartitionsAssigned(partitions);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.pm.onPartitionsRevoked(partitions);
        this.onPartitionsRemoved(partitions);
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.pm.onPartitionsLost(partitions);
        this.onPartitionsRemoved(partitions);
    }

    void onPartitionsRemoved(Collection<TopicPartition> partitions) {
    }

    public void registerWork(EpochAndRecordsMap<K, V> records) {
        this.pm.maybeRegisterNewRecordAsWork(records);
    }

    public List<WorkContainer<K, V>> getWorkIfAvailable() {
        return this.getWorkIfAvailable(Integer.MAX_VALUE);
    }

    public List<WorkContainer<K, V>> getWorkIfAvailable(int requestedMaxWorkToRetrieve) {
        if (requestedMaxWorkToRetrieve < 1) {
            return UniLists.of();
        }
        List<WorkContainer<K, V>> work = this.sm.getWorkIfAvailable(requestedMaxWorkToRetrieve);
        log.debug("Got {} of {} requested records of work. In-flight: {}, Awaiting in commit (partition) queues: {}", new Object[]{work.size(), requestedMaxWorkToRetrieve, this.getNumberRecordsOutForProcessing(), this.getNumberOfIncompleteOffsets()});
        this.numberRecordsOutForProcessing += work.size();
        return work;
    }

    public void onSuccessResult(WorkContainer<K, V> wc) {
        log.trace("Work success ({}), removing from processing shard queue", wc);
        wc.endFlight();
        this.pm.onSuccess(wc);
        this.sm.onSuccess(wc);
        this.successfulWorkListeners.forEach(c -> c.accept(wc));
        --this.numberRecordsOutForProcessing;
    }

    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> committed) {
        this.pm.onOffsetCommitSuccess(committed);
    }

    public void onFailureResult(WorkContainer<K, V> wc) {
        wc.endFlight();
        this.pm.onFailure(wc);
        this.sm.onFailure(wc);
        --this.numberRecordsOutForProcessing;
    }

    public long getNumberOfIncompleteOffsets() {
        return this.pm.getNumberOfIncompleteOffsets();
    }

    public Map<TopicPartition, OffsetAndMetadata> collectCommitDataForDirtyPartitions() {
        return this.pm.collectDirtyCommitData();
    }

    public boolean checkIfWorkIsStale(List<WorkContainer<K, V>> workContainers) {
        for (WorkContainer<K, V> workContainer : workContainers) {
            if (!this.checkIfWorkIsStale(workContainer)) continue;
            return true;
        }
        return false;
    }

    public boolean checkIfWorkIsStale(WorkContainer<K, V> workContainer) {
        return this.pm.getPartitionState(workContainer).checkIfWorkIsStale(workContainer);
    }

    public boolean shouldThrottle() {
        return this.isSufficientlyLoaded();
    }

    public boolean isSufficientlyLoaded() {
        return this.getNumberOfWorkQueuedInShardsAwaitingSelection() > (long)this.options.getTargetAmountOfRecordsInFlight() * (long)this.getLoadingFactor();
    }

    private int getLoadingFactor() {
        return this.dynamicLoadFactor.getCurrentFactor();
    }

    public boolean workIsWaitingToBeProcessed() {
        return this.sm.workIsWaitingToBeProcessed();
    }

    public boolean hasWorkInFlight() {
        return this.getNumberRecordsOutForProcessing() != 0;
    }

    public boolean isWorkInFlightMeetingTarget() {
        return this.getNumberRecordsOutForProcessing() >= this.options.getTargetAmountOfRecordsInFlight();
    }

    public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
        return this.sm.getNumberOfWorkQueuedInShardsAwaitingSelection();
    }

    public boolean hasIncompleteOffsets() {
        return this.pm.hasIncompleteOffsets();
    }

    public boolean isRecordsAwaitingProcessing() {
        return this.sm.getNumberOfWorkQueuedInShardsAwaitingSelection() > 0L;
    }

    public void handleFutureResult(WorkContainer<K, V> wc) {
        if (this.checkIfWorkIsStale(wc)) {
            log.debug("Work result received, but from an old generation. Dropping work from revoked partition {}", wc);
            wc.endFlight();
            --this.numberRecordsOutForProcessing;
        } else {
            Optional<Boolean> userFunctionSucceeded = wc.getMaybeUserFunctionSucceeded();
            if (userFunctionSucceeded.isPresent()) {
                if (Boolean.TRUE.equals(userFunctionSucceeded.get())) {
                    this.onSuccessResult(wc);
                } else {
                    this.onFailureResult(wc);
                }
            } else {
                throw new IllegalStateException("Work returned, but without a success flag - report a bug");
            }
        }
    }

    public boolean isNoRecordsOutForProcessing() {
        return this.getNumberRecordsOutForProcessing() == 0;
    }

    public Optional<Duration> getLowestRetryTime() {
        return this.sm.getLowestRetryTime();
    }

    public boolean isDirty() {
        return this.pm.isDirty();
    }

    public ParallelConsumerOptions<K, V> getOptions() {
        return this.options;
    }

    public PartitionStateManager<K, V> getPm() {
        return this.pm;
    }

    public ShardManager<K, V> getSm() {
        return this.sm;
    }

    public int getNumberRecordsOutForProcessing() {
        return this.numberRecordsOutForProcessing;
    }

    public List<Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return this.successfulWorkListeners;
    }
}

