/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.confluent.parallelconsumer.state.PartitionMonitor;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkMailBoxManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

public class WorkManager<K, V>
implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);
    private final ParallelConsumerOptions options;
    final PartitionMonitor<K, V> pm;
    private final ShardManager<K, V> sm;
    private final DynamicLoadFactor dynamicLoadFactor;
    private final WorkMailBoxManager<K, V> wmbm;
    private Optional<Object> iterationResumePoint = Optional.empty();
    private int numberRecordsOutForProcessing = 0;
    private final List<java.util.function.Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<java.util.function.Consumer<WorkContainer<K, V>>>();
    private WallClock clock = new WallClock();
    Consumer<K, V> consumer;
    private final AtomicBoolean workStateIsDirtyNeedsCommitting = new AtomicBoolean(false);
    private final Duration thresholdForTimeSpentInQueueWarning = Duration.ofSeconds(10L);
    private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

    public WorkManager(ParallelConsumerOptions<K, V> options, Consumer<K, V> consumer) {
        this(options, consumer, new DynamicLoadFactor());
    }

    public WorkManager(ParallelConsumerOptions<K, V> newOptions, Consumer<K, V> consumer, DynamicLoadFactor dynamicExtraLoadFactor) {
        this.options = newOptions;
        this.consumer = consumer;
        this.dynamicLoadFactor = dynamicExtraLoadFactor;
        this.wmbm = new WorkMailBoxManager();
        this.sm = new ShardManager(this.options);
        this.pm = new PartitionMonitor<K, V>(consumer, this.sm);
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.pm.onPartitionsAssigned(partitions);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.pm.onPartitionsRevoked(partitions);
        this.onPartitionsRemoved(partitions);
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.pm.onPartitionsLost(partitions);
        this.onPartitionsRemoved(partitions);
    }

    void onPartitionsRemoved(Collection<TopicPartition> partitions) {
        this.wmbm.onPartitionsRemoved(partitions);
    }

    public void registerWork(ConsumerRecords<K, V> records) {
        this.wmbm.registerWork(records);
    }

    private void ingestPolledRecordsIntoQueues(int requestedMaxWorkToRetrieve) {
        ConsumerRecord<K, V> poll;
        boolean stillProcessing;
        log.debug("Will attempt to register the requested {} - {} available in internal mailbox", (Object)requestedMaxWorkToRetrieve, (Object)this.wmbm.internalFlattenedMailQueueSize());
        int taken = 0;
        do {
            boolean takenAsWork;
            if (!(takenAsWork = this.maybeRegisterNewRecordAsWork(poll = this.wmbm.internalFlattenedMailQueuePoll()))) continue;
            ++taken;
        } while (stillProcessing = taken < requestedMaxWorkToRetrieve && poll != null);
        log.debug("{} new records were registered.", (Object)taken);
    }

    private boolean maybeRegisterNewRecordAsWork(ConsumerRecord<K, V> rec) {
        if (rec == null) {
            return false;
        }
        if (!this.pm.isPartitionAssigned(rec)) {
            log.debug("Record in buffer for a partition no longer assigned. Dropping. TP: {} rec: {}", (Object)KafkaUtils.toTP(rec), rec);
            return false;
        }
        if (this.pm.isRecordPreviouslyProcessed(rec)) {
            log.trace("Record previously processed, skipping. offset: {}", (Object)rec.offset());
            return false;
        }
        TopicPartition tp = KafkaUtils.toTP(rec);
        int currentPartitionEpoch = this.pm.getEpoch(rec, tp);
        WorkContainer<K, V> wc = new WorkContainer<K, V>(currentPartitionEpoch, rec);
        this.sm.addWorkContainer(wc);
        this.pm.addWorkContainer(wc);
        return true;
    }

    public <R> List<WorkContainer<K, V>> maybeGetWork() {
        return this.maybeGetWork(Integer.MAX_VALUE);
    }

    public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
        int workToGetDelta = requestedMaxWorkToRetrieve;
        if (workToGetDelta < 1) {
            return UniLists.of();
        }
        this.tryToEnsureAvailableCapacity(requestedMaxWorkToRetrieve);
        ArrayList<WorkContainer<K, V>> work = new ArrayList<WorkContainer<K, V>>();
        LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>> it = this.sm.getIterator(this.iterationResumePoint);
        ArrayList<WorkContainer> staleWorkToRemove = new ArrayList<WorkContainer>();
        int slowWorkCount = 0;
        HashSet<String> slowWorkTopics = new HashSet<String>();
        for (Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>> entry : it) {
            log.trace("Looking for work on shard: {}", entry.getKey());
            if (work.size() >= workToGetDelta) {
                this.iterationResumePoint = Optional.of(entry.getKey());
                log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
                break;
            }
            ArrayList<WorkContainer> shardWork = new ArrayList<WorkContainer>();
            SortedMap shardQueue = entry.getValue();
            Set shardQueueEntries = shardQueue.entrySet();
            for (Map.Entry queueEntry : shardQueueEntries) {
                boolean representedInEncodedPayloadAlready;
                int taken = work.size() + shardWork.size();
                if (taken >= workToGetDelta) {
                    log.trace("Work taken ({}) exceeds max ({})", (Object)taken, (Object)workToGetDelta);
                    break;
                }
                WorkContainer workContainer = (WorkContainer)queueEntry.getValue();
                if (this.checkEpochIsStale(workContainer)) {
                    log.debug("Work is in queue with stale epoch. Will remove now. Was it not removed properly on revoke? Or are we in a race state? {}", (Object)workContainer);
                    staleWorkToRemove.add(workContainer);
                    continue;
                }
                TopicPartition topicPartition = workContainer.getTopicPartition();
                boolean notAllowedMoreRecords = this.pm.isBlocked(topicPartition);
                boolean bl = representedInEncodedPayloadAlready = workContainer.offset() < this.pm.getState(topicPartition).getOffsetHighestSucceeded();
                if (notAllowedMoreRecords && !representedInEncodedPayloadAlready && workContainer.isNotInFlight()) {
                    log.debug("Not allowed more records for the partition ({}) as set from previous encode run (blocked), that this record ({}) belongs to due to offset encoding back pressure, is within the encoded payload already (offset lower than highest succeeded, not in flight ({}), continuing on to next container in shard.", new Object[]{topicPartition, workContainer.offset(), workContainer.isNotInFlight()});
                    continue;
                }
                boolean hasNotSucceededAlready = !workContainer.isUserFunctionSucceeded();
                boolean delayHasPassed = workContainer.hasDelayPassed(this.clock);
                if (delayHasPassed && workContainer.isNotInFlight() && hasNotSucceededAlready) {
                    log.trace("Taking {} as work", (Object)workContainer);
                    workContainer.queueingForExecution();
                    shardWork.add(workContainer);
                } else {
                    Duration timeInFlight = workContainer.getTimeInFlight();
                    String msg = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
                    if (BackportUtils.toSeconds(timeInFlight) > BackportUtils.toSeconds(this.thresholdForTimeSpentInQueueWarning)) {
                        ++slowWorkCount;
                        slowWorkTopics.add(workContainer.getCr().topic());
                        log.trace("Work has spent over " + this.thresholdForTimeSpentInQueueWarning + " in queue! " + msg, new Object[]{workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight});
                    } else {
                        log.trace(msg, new Object[]{workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight});
                    }
                }
                ParallelConsumerOptions.ProcessingOrder ordering = this.options.getOrdering();
                if (ordering == ParallelConsumerOptions.ProcessingOrder.UNORDERED) continue;
                log.trace("Processing by {}, so have cannot get more messages on this ({}) shard.", (Object)this.options.getOrdering(), entry.getKey());
                break;
            }
            work.addAll(shardWork);
        }
        if (slowWorkCount > 0) {
            int finalSlowWorkCount = slowWorkCount;
            this.slowWarningRateLimit.performIfNotLimited(() -> log.warn("Warning: {} records in the queue have been waiting longer than {}s for following topics {}.", new Object[]{finalSlowWorkCount, BackportUtils.toSeconds(this.thresholdForTimeSpentInQueueWarning), slowWorkTopics}));
        }
        for (WorkContainer workContainer : staleWorkToRemove) {
            this.sm.removeWorkFromShard(workContainer);
        }
        log.debug("Got {} records of work. In-flight: {}, Awaiting in commit queues: {}", new Object[]{work.size(), this.getNumberRecordsOutForProcessing(), this.getNumberOfEntriesInPartitionQueues()});
        this.numberRecordsOutForProcessing += work.size();
        return work;
    }

    private void tryToEnsureAvailableCapacity(int requestedMaxWorkToRetrieve) {
        int available = this.sm.getWorkQueuedInShardsCount();
        int extraNeededFromInboxToSatisfy = requestedMaxWorkToRetrieve - available;
        log.debug("Requested: {}, available in shards: {}, will try to process from mailbox the delta of: {}", new Object[]{requestedMaxWorkToRetrieve, available, extraNeededFromInboxToSatisfy});
        this.ingestPolledRecordsIntoQueues(extraNeededFromInboxToSatisfy);
    }

    public void onSuccess(WorkContainer<K, V> wc) {
        log.trace("Processing success...");
        this.workStateIsDirtyNeedsCommitting.set(true);
        ConsumerRecord<K, V> cr = wc.getCr();
        log.trace("Work success ({}), removing from processing shard queue", wc);
        wc.succeed();
        this.pm.onSuccess(wc);
        this.sm.onSuccess(cr);
        this.successfulWorkListeners.forEach(c -> c.accept(wc));
        --this.numberRecordsOutForProcessing;
    }

    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        this.pm.onOffsetCommitSuccess(offsetsToSend);
    }

    public void onFailure(WorkContainer<K, V> wc) {
        wc.fail(this.clock);
        this.putBack(wc);
    }

    private void putBack(WorkContainer<K, V> wc) {
        log.debug("Work FAILED, returning to shard");
        ConsumerRecord<K, V> cr = wc.getCr();
        Object key = this.sm.computeShardKey(cr);
        NavigableMap<Long, WorkContainer<Long, WorkContainer<K, V>>> shard = this.sm.getShard(key);
        long offset = wc.getCr().offset();
        shard.put(offset, wc);
        --this.numberRecordsOutForProcessing;
    }

    public long getNumberOfEntriesInPartitionQueues() {
        return this.pm.getNumberOfEntriesInPartitionQueues();
    }

    public Integer getWorkQueuedInMailboxCount() {
        return this.wmbm.getWorkQueuedInMailboxCount();
    }

    public Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
        return this.findCompletedEligibleOffsetsAndRemove(true);
    }

    public boolean hasCommittableOffsets() {
        return this.isDirty();
    }

    <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean remove) {
        if (!this.isDirty()) {
            return UniMaps.of();
        }
        HashMap<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<TopicPartition, OffsetAndMetadata>();
        int count = 0;
        int removed = 0;
        log.trace("Scanning for in order in-flight work that has completed...");
        Set<Map.Entry<TopicPartition, PartitionState<K, V>>> set = this.pm.getPartitionStates().entrySet();
        for (Map.Entry<TopicPartition, PartitionState<K, V>> partitionStateEntry : set) {
            Map.Entry offsetAndItsWorkContainer;
            WorkContainer container;
            long offset;
            PartitionState<K, V> partitionState = partitionStateEntry.getValue();
            NavigableMap<Long, WorkContainer<K, V>> partitionQueue = partitionState.getCommitQueues();
            TopicPartition topicPartitionKey = partitionStateEntry.getKey();
            log.trace("Starting scan of partition: {}", (Object)topicPartitionKey);
            count += partitionQueue.size();
            LinkedList<WorkContainer> workToRemove = new LinkedList<WorkContainer>();
            LinkedHashSet<Long> incompleteOffsets = new LinkedHashSet<Long>();
            long lowWaterMark = -1L;
            long highestSucceeded = partitionState.getOffsetHighestSucceeded();
            boolean beyondSuccessiveSucceededOffsets = false;
            Iterator iterator = partitionQueue.entrySet().iterator();
            while (iterator.hasNext() && (offset = (container = (WorkContainer)(offsetAndItsWorkContainer = iterator.next()).getValue()).getCr().offset()) <= highestSucceeded) {
                boolean complete = container.isUserFunctionComplete();
                if (complete) {
                    if (container.getUserFunctionSucceeded().get().booleanValue() && !beyondSuccessiveSucceededOffsets) {
                        log.trace("Found offset candidate ({}) to add to offset commit map", (Object)container);
                        workToRemove.add(container);
                        long offsetOfNextExpectedMessageToBeCommitted = offset + 1L;
                        OffsetAndMetadata offsetData = new OffsetAndMetadata(offsetOfNextExpectedMessageToBeCommitted);
                        offsetsToSend.put(topicPartitionKey, offsetData);
                        continue;
                    }
                    if (container.getUserFunctionSucceeded().get().booleanValue() && beyondSuccessiveSucceededOffsets) {
                        log.trace("Offset {} is complete and succeeded, but we've iterated past the lowest committable offset ({}). Will mark as complete in the offset map.", (Object)container.getCr().offset(), (Object)lowWaterMark);
                        continue;
                    }
                    log.trace("Offset {} is complete, but failed processing. Will track in offset map as failed. Can't do normal offset commit past this point.", (Object)container.getCr().offset());
                    beyondSuccessiveSucceededOffsets = true;
                    incompleteOffsets.add(offset);
                    continue;
                }
                lowWaterMark = container.offset();
                beyondSuccessiveSucceededOffsets = true;
                log.trace("Offset ({}) is incomplete, holding up the queue ({}) of size {}.", new Object[]{container.getCr().offset(), topicPartitionKey, partitionQueue.size()});
                incompleteOffsets.add(offset);
            }
            this.pm.addEncodedOffsets(offsetsToSend, topicPartitionKey, incompleteOffsets);
            if (!remove) continue;
            removed += workToRemove.size();
            for (WorkContainer workContainer : workToRemove) {
                long offset2 = workContainer.getCr().offset();
                partitionQueue.remove(offset2);
            }
        }
        log.debug("Scan finished, {} were in flight, {} completed offsets removed, coalesced to {} offset(s) ({}) to be committed", new Object[]{count, removed, offsetsToSend.size(), offsetsToSend});
        return offsetsToSend;
    }

    public boolean checkEpochIsStale(WorkContainer<K, V> workContainer) {
        return this.pm.checkEpochIsStale(workContainer);
    }

    public boolean shouldThrottle() {
        return this.isSufficientlyLoaded();
    }

    public boolean isSufficientlyLoaded() {
        return this.getWorkQueuedInMailboxCount() > this.options.getMaxConcurrency() * this.getLoadingFactor();
    }

    private int getLoadingFactor() {
        return this.dynamicLoadFactor.getCurrentFactor();
    }

    public boolean workIsWaitingToBeProcessed() {
        return this.sm.workIsWaitingToBeProcessed();
    }

    public boolean hasWorkInFlight() {
        return this.getNumberRecordsOutForProcessing() != 0;
    }

    public boolean isClean() {
        return !this.isDirty();
    }

    private boolean isDirty() {
        return this.workStateIsDirtyNeedsCommitting.get();
    }

    public int getTotalWorkWaitingProcessing() {
        int workQueuedInShardsCount = this.sm.getWorkQueuedInShardsCount();
        Integer workQueuedInMailboxCount = this.getWorkQueuedInMailboxCount();
        return workQueuedInShardsCount + workQueuedInMailboxCount;
    }

    public boolean hasWorkInMailboxes() {
        return this.getWorkQueuedInMailboxCount() > 0;
    }

    public boolean hasWorkInCommitQueues() {
        return this.pm.hasWorkInCommitQueues();
    }

    public boolean isRecordsAwaitingProcessing() {
        int partitionWorkRemainingCount = this.sm.getWorkQueuedInShardsCount();
        boolean internalQueuesNotEmpty = this.hasWorkInMailboxes();
        return partitionWorkRemainingCount > 0 || internalQueuesNotEmpty;
    }

    public boolean isRecordsAwaitingToBeCommitted() {
        long partitionWorkRemainingCount = this.getNumberOfEntriesInPartitionQueues();
        return partitionWorkRemainingCount > 0L;
    }

    public void handleFutureResult(WorkContainer<K, V> wc) {
        if (this.checkEpochIsStale(wc)) {
            log.debug("Work result received, but from an old generation. Dropping work from revoked partition {}", wc);
            return;
        }
        if (wc.getUserFunctionSucceeded().get().booleanValue()) {
            this.onSuccess(wc);
        } else {
            this.onFailure(wc);
        }
    }

    public boolean isSystemIdle() {
        return this.getNumberRecordsOutForProcessing() == 0;
    }

    public ParallelConsumerOptions getOptions() {
        return this.options;
    }

    public PartitionMonitor<K, V> getPm() {
        return this.pm;
    }

    public ShardManager<K, V> getSm() {
        return this.sm;
    }

    public int getNumberRecordsOutForProcessing() {
        return this.numberRecordsOutForProcessing;
    }

    public List<java.util.function.Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return this.successfulWorkListeners;
    }

    void setClock(WallClock clock) {
        this.clock = clock;
    }
}

