/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps queued {@link WorkContainer}s grouped into "shards".
 *
 * <p>Each shard is a {@link NavigableMap} from record offset to work container. The key that
 * selects a shard is derived from the record by {@link #computeShardKey(ConsumerRecord)} and
 * depends on the ordering mode in the supplied {@link ParallelConsumerOptions}.
 *
 * <p>The shard registry is a plain {@link HashMap} and this class performs no synchronization of
 * its own.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ShardManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ShardManager.class);

    private final ParallelConsumerOptions options;

    /** Shard key -> (offset -> work container). */
    private final Map<Object, NavigableMap<Long, WorkContainer<K, V>>> processingShards = new HashMap<>();

    /**
     * @param options configuration whose ordering mode decides how records map to shards
     */
    public ShardManager(ParallelConsumerOptions options) {
        this.options = options;
    }

    /**
     * @return the options this manager was constructed with
     */
    public ParallelConsumerOptions getOptions() {
        return this.options;
    }

    private Map<Object, NavigableMap<Long, WorkContainer<K, V>>> getShards() {
        return this.processingShards;
    }

    /**
     * Looks up a shard by its key.
     *
     * @param key shard key, as produced by {@link #computeShardKey(ConsumerRecord)}
     * @return the live shard map, or {@code null} when no shard is registered under {@code key}
     */
    NavigableMap<Long, WorkContainer<K, V>> getShard(Object key) {
        return this.processingShards.get(key);
    }

    /**
     * Creates an iterator over the shard registry.
     *
     * @param iterationResumePoint passed through to {@link LoopingResumingIterator} together with
     *                             the shard registry
     * @return a new iterator over the shards
     */
    LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>> getIterator(Optional<Object> iterationResumePoint) {
        return new LoopingResumingIterator<>(iterationResumePoint, this.getShards());
    }

    /**
     * Derives the shard key for a record from the configured ordering mode.
     *
     * @param rec the record to classify
     * @return the record's key for {@code KEY} ordering (may be {@code null} if the record has no
     *         key), otherwise a {@link TopicPartition} built from the record's topic and partition
     */
    Object computeShardKey(ConsumerRecord<K, V> rec) {
        switch (this.options.getOrdering()) {
            case KEY:
                return rec.key();
            case PARTITION:
            case UNORDERED:
                return new TopicPartition(rec.topic(), rec.partition());
            default:
                // Only reachable if an ordering constant is added without updating this switch.
                throw new IncompatibleClassChangeError("Unhandled processing order: " + this.options.getOrdering());
        }
    }

    /**
     * Finds the work container queued for the given record.
     *
     * @param rec the record whose work container is wanted
     * @return the container stored under the record's offset in its shard, or {@code null} when
     *         either the shard or the offset entry does not exist
     */
    public WorkContainer<K, V> getWorkContainerForRecord(ConsumerRecord<K, V> rec) {
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(this.computeShardKey(rec));
        if (shard == null) {
            // A missing shard is reported the same way as a missing offset, instead of an NPE.
            return null;
        }
        return shard.get(rec.offset());
    }

    /**
     * @return the total number of work containers across all shards
     */
    public int getWorkQueuedInShardsCount() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> shard : this.processingShards.values()) {
            total += shard.size();
        }
        return total;
    }

    /**
     * @return {@code true} if at least one shard holds at least one work container
     */
    public boolean workIsWaitingToBeProcessed() {
        for (NavigableMap<Long, WorkContainer<K, V>> shard : this.processingShards.values()) {
            if (!shard.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Removes every work container in the given queue from its shard.
     *
     * @param oldWorkPartitionQueue the work containers to remove, keyed by offset
     */
    void removeShardsFoundIn(NavigableMap<Long, WorkContainer<K, V>> oldWorkPartitionQueue) {
        for (WorkContainer<K, V> work : oldWorkPartitionQueue.values()) {
            this.removeWorkFromShard(work);
        }
    }

    /**
     * Removes a single work container from its shard, and drops the shard once it becomes empty.
     *
     * <p>Only the entry at the work's own offset is removed. A shard can hold work belonging to
     * other records that map to the same shard key (for example equal record keys under
     * {@code KEY} ordering), and that work must stay queued.
     *
     * @param work the work container to remove
     */
    void removeWorkFromShard(WorkContainer<K, V> work) {
        Object shardKey = this.computeShardKey(work.getCr());
        log.debug("Removing expired work {} for shard key: {}", work, shardKey);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(shardKey);
        if (shard == null) {
            // Shard was never created or has already been removed; nothing to do.
            return;
        }
        shard.remove(work.offset());
        if (shard.isEmpty()) {
            this.processingShards.remove(shardKey);
        }
    }

    /**
     * Queues a work container in the shard for its record, creating the shard if needed.
     *
     * @param wc the work container to add; stored under its offset, replacing any previous entry
     *           at the same offset in that shard
     */
    public void addWorkContainer(WorkContainer<K, V> wc) {
        Object shardKey = this.computeShardKey(wc.getCr());
        this.processingShards
                .computeIfAbsent(shardKey, ignore -> new TreeMap<>())
                .put(wc.offset(), wc);
    }

    /**
     * Drops an entire shard, including any work still queued in it.
     *
     * @param key the key of the shard to drop; unknown keys are ignored
     */
    void removeShard(Object key) {
        this.getShards().remove(key);
    }
}

