/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.ShardKey;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the {@link WorkContainer}s that belong to a single {@link ShardKey}, indexed by record offset,
 * and hands them out as work when they are ready to be processed.
 *
 * <p>Entries are kept in a {@link ConcurrentSkipListMap}, so they are iterated in ascending offset order.
 *
 * @param <K> record key type of the tracked work
 * @param <V> record value type of the tracked work
 */
public class ProcessingShard<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ProcessingShard.class);

    /** Work tracked by this shard, keyed by record offset. */
    private final NavigableMap<Long, WorkContainer<K, V>> entries = new ConcurrentSkipListMap<>();
    private final ShardKey key;
    private final ParallelConsumerOptions<?, ?> options;
    private final PartitionStateManager<K, V> pm;

    // NOTE(review): the unit of the constructor argument is defined by RateLimiter - presumably seconds; confirm.
    private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

    /**
     * Creates an empty shard.
     *
     * @param key     identifier of this shard, used in log output
     * @param options consumer options; the processing order and the slow-work warning threshold are read from it
     * @param pm      partition state manager consulted before any entry is handed out as work
     */
    public ProcessingShard(ShardKey key, ParallelConsumerOptions<?, ?> options, PartitionStateManager<K, V> pm) {
        this.key = key;
        this.options = options;
        this.pm = pm;
    }

    /**
     * @return {@code true} if at least one tracked entry reports {@link WorkContainer#isAvailableToTakeAsWork()}
     */
    public boolean workIsWaitingToBeProcessed() {
        return entries.values().parallelStream().anyMatch(WorkContainer::isAvailableToTakeAsWork);
    }

    /**
     * Starts tracking the given work under its offset. If an entry with the same offset is already tracked,
     * the existing entry is kept and the new one is dropped.
     *
     * @param wc the work to track
     */
    public void addWorkContainer(WorkContainer<K, V> wc) {
        long offset = wc.offset();
        // Single atomic insert-if-missing, instead of a separate containsKey + put pair.
        WorkContainer<K, V> alreadyTracked = entries.putIfAbsent(offset, wc);
        if (alreadyTracked != null) {
            log.debug("Entry for {} already exists in shard queue, dropping record", wc);
        }
    }

    /**
     * Stops tracking the entry stored under the offset of the given work.
     *
     * @param wc the work that completed
     */
    public void onSuccess(WorkContainer<?, ?> wc) {
        entries.remove(wc.offset());
    }

    /**
     * @return {@code true} if no entries are tracked
     */
    public boolean isEmpty() {
        return entries.isEmpty();
    }

    /**
     * @return number of tracked entries that report {@link WorkContainer#isAvailableToTakeAsWork()}
     */
    public long getCountOfWorkAwaitingSelection() {
        return entries.values().stream().filter(WorkContainer::isAvailableToTakeAsWork).count();
    }

    /**
     * @return total number of tracked entries
     */
    public long getCountOfWorkTracked() {
        return entries.size();
    }

    /**
     * @return number of tracked entries that report {@link WorkContainer#isInFlight()}
     */
    public long getCountWorkInFlight() {
        return entries.values().stream().filter(WorkContainer::isInFlight).count();
    }

    /**
     * Stops tracking the entry at the given offset.
     *
     * @param offset offset of the entry to remove
     * @return the removed entry, or {@code null} if none was tracked under that offset
     */
    public WorkContainer<K, V> remove(long offset) {
        return entries.remove(offset);
    }

    /**
     * Collects up to {@code workToGetDelta} entries, in ascending offset order, that can be processed now.
     *
     * <p>Entries rejected by {@link PartitionStateManager#couldBeTakenAsWork} are skipped. Each remaining
     * entry is either taken (and marked as queued for execution) or, if it is not currently available,
     * considered for the slow-work warning. When the configured ordering is anything other than
     * {@code UNORDERED}, the scan stops after the first entry that passed the partition check, whether or
     * not that entry was actually taken.
     *
     * @param workToGetDelta maximum number of entries to return
     * @return the entries taken as work; possibly empty, never {@code null}
     */
    ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
        log.trace("Looking for work on shardQueueEntry: {}", getKey());
        Set<WorkContainer<?, ?>> slowWork = new HashSet<>();
        ArrayList<WorkContainer<K, V>> workTaken = new ArrayList<>();

        Iterator<WorkContainer<K, V>> iterator = entries.values().iterator();
        while (workTaken.size() < workToGetDelta && iterator.hasNext()) {
            WorkContainer<K, V> workContainer = iterator.next();
            if (!pm.couldBeTakenAsWork(workContainer)) {
                continue;
            }

            if (workContainer.isAvailableToTakeAsWork()) {
                log.trace("Taking {} as work", workContainer);
                workContainer.onQueueingForExecution();
                workTaken.add(workContainer);
            } else {
                addToSlowWorkMaybe(slowWork, workContainer);
            }

            if (isOrderRestricted()) {
                // Ordered processing: nothing behind this entry may be handed out yet.
                log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", options.getOrdering(), getKey());
                break;
            }
        }

        // The loop never takes more than workToGetDelta, so this fires when the limit has been reached.
        if (workTaken.size() == workToGetDelta) {
            log.trace("Work taken ({}) exceeds max ({})", workTaken.size(), workToGetDelta);
        }

        logSlowWork(slowWork);
        return workTaken;
    }

    /**
     * Emits a rate-limited warning that lists the topic partitions of the given slow entries.
     * Does nothing when the set is empty.
     *
     * @param slowWork entries found to have exceeded the slow-work threshold
     */
    private void logSlowWork(Set<WorkContainer<?, ?>> slowWork) {
        if (slowWork.isEmpty()) {
            return;
        }
        slowWarningRateLimit.performIfNotLimited(() -> {
            // Built inside the rate-limited action so the list is only computed when the warning is logged.
            List<String> slowTopics = slowWork.stream()
                    .map(slow -> slow.getTopicPartition().toString())
                    .distinct()
                    .collect(Collectors.toList());
            log.warn("Warning: {} records in the queue have been waiting longer than {}s for following topics {}.",
                    slowWork.size(),
                    BackportUtils.toSeconds(options.getThresholdForTimeSpendInQueueWarning()),
                    slowTopics);
        });
    }

    /**
     * Records {@code workContainer} in {@code slowWork} if its time in flight is greater than the configured
     * warning threshold, and traces why the entry could not be taken as work.
     *
     * @param slowWork      accumulator for entries that exceeded the threshold
     * @param workContainer the entry that was not available to take as work
     */
    private void addToSlowWorkMaybe(Set<WorkContainer<?, ?>> slowWork, WorkContainer<?, ?> workContainer) {
        Duration timeInFlight = workContainer.getTimeInFlight();
        Duration slowThreshold = options.getThresholdForTimeSpendInQueueWarning();
        boolean isSlow = JavaUtils.isGreaterThan(timeInFlight, slowThreshold);
        if (isSlow) {
            slowWork.add(workContainer);
        }

        // The diagnostic message is only formatted when TRACE is enabled; this method runs for every
        // unavailable entry visited by getWorkIfAvailable.
        if (log.isTraceEnabled()) {
            String msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
            String msg = StringUtils.msg(msgTemplate, workContainer, workContainer.hasDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight);
            if (isSlow) {
                log.trace("Work has spent over {} in queue! {}", slowThreshold, msg);
            } else {
                log.trace(msg);
            }
        }
    }

    /**
     * @return {@code true} unless the configured ordering is {@code UNORDERED}
     */
    private boolean isOrderRestricted() {
        return options.getOrdering() != ParallelConsumerOptions.ProcessingOrder.UNORDERED;
    }

    /**
     * @return the live, mutable map of tracked entries keyed by offset (not a copy)
     */
    public NavigableMap<Long, WorkContainer<K, V>> getEntries() {
        return entries;
    }

    private ShardKey getKey() {
        return key;
    }
}

