/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

public class ConsumerManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
    private final Consumer<K, V> consumer;
    private final AtomicBoolean pollingBroker = new AtomicBoolean(false);
    private int erroneousWakups = 0;
    private int correctPollWakeups = 0;
    private int noWakeups = 0;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    ConsumerRecords<K, V> poll(Duration thisLongPollTimeout) {
        ConsumerRecords records;
        try {
            this.pollingBroker.set(true);
            records = this.consumer.poll(thisLongPollTimeout);
            log.debug("Poll completed normally and returned {}...", (Object)records.count());
        }
        catch (WakeupException w) {
            ++this.correctPollWakeups;
            log.debug("Awoken from broker poll", (Throwable)w);
            records = new ConsumerRecords(UniMaps.of());
        }
        finally {
            this.pollingBroker.set(false);
        }
        return records;
    }

    public void wakeup() {
        if (this.pollingBroker.get()) {
            log.debug("Waking up consumer");
            this.consumer.wakeup();
        }
    }

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        boolean inProgress = true;
        ++this.noWakeups;
        while (inProgress) {
            try {
                this.consumer.commitSync(offsetsToSend);
                inProgress = false;
            }
            catch (WakeupException w) {
                log.debug("Got woken up, retry. errors: " + this.erroneousWakups + " none: " + this.noWakeups + " correct:" + this.correctPollWakeups, (Throwable)w);
                ++this.erroneousWakups;
            }
        }
    }

    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        boolean inProgress = true;
        ++this.noWakeups;
        while (inProgress) {
            try {
                this.consumer.commitAsync(offsets, callback);
                inProgress = false;
            }
            catch (WakeupException w) {
                log.debug("Got woken up, retry. errors: " + this.erroneousWakups + " none: " + this.noWakeups + " correct:" + this.correctPollWakeups, (Throwable)w);
                ++this.erroneousWakups;
            }
        }
    }

    public ConsumerGroupMetadata groupMetadata() {
        return this.consumer.groupMetadata();
    }

    public void close(Duration defaultTimeout) {
        this.consumer.close(defaultTimeout);
    }

    public Set<TopicPartition> assignment() {
        return this.consumer.assignment();
    }

    public void pause(Set<TopicPartition> assignment) {
        this.consumer.pause(assignment);
    }

    public Set<TopicPartition> paused() {
        return this.consumer.paused();
    }

    public void resume(Set<TopicPartition> pausedTopics) {
        this.consumer.resume(pausedTopics);
    }

    public ConsumerManager(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }
}

