/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.parallelconsumer.DrainingCloseable;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.WorkManager;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

public class BrokerPollSystem<K, V> {
    private static final Logger log = LoggerFactory.getLogger(BrokerPollSystem.class);
    private final Consumer<K, V> consumer;
    public ParallelEoSStreamProcessor.State state = ParallelEoSStreamProcessor.State.running;
    private Optional<Future<Boolean>> pollControlThreadFuture;
    private volatile boolean paused = false;
    private final ParallelEoSStreamProcessor<K, V> pc;
    private static Duration longPollTimeout = Duration.ofMillis(2000L);
    private final WorkManager<K, V> wm;

    public BrokerPollSystem(Consumer<K, V> consumer, WorkManager<K, V> wm, ParallelEoSStreamProcessor<K, V> pc) {
        this.consumer = consumer;
        this.wm = wm;
        this.pc = pc;
    }

    public void start() {
        this.pollControlThreadFuture = Optional.of(Executors.newSingleThreadExecutor().submit(this::controlLoop));
    }

    private boolean controlLoop() {
        Thread.currentThread().setName("broker-poll");
        log.trace("Broker poll control loop start");
        while (this.state != ParallelEoSStreamProcessor.State.closed) {
            log.trace("Loop: Poll broker");
            ConsumerRecords<K, V> polledRecords = this.pollBrokerForRecords();
            if (!polledRecords.isEmpty()) {
                log.trace("Loop: Register work");
                this.wm.registerWork(polledRecords);
                this.pc.notifyNewWorkRegistered();
            }
            switch (this.state) {
                case draining: {
                    this.doPause();
                    this.state = ParallelEoSStreamProcessor.State.closing;
                    break;
                }
                case closing: {
                    if (polledRecords.isEmpty()) {
                        this.doClose();
                        break;
                    }
                    log.info("Subscriptions are paused, but records are still being drained (count: {})", (Object)polledRecords.count());
                }
            }
        }
        log.trace("Broker poll thread returning true");
        return true;
    }

    private void doClose() {
        log.debug("Closing {}, first closing consumer...", (Object)this.getClass().getSimpleName());
        this.consumer.close(DrainingCloseable.DEFAULT_TIMEOUT);
        log.debug("Consumer closed.");
        this.state = ParallelEoSStreamProcessor.State.closed;
    }

    private ConsumerRecords<K, V> pollBrokerForRecords() {
        ConsumerRecords records;
        this.managePauseOfSubscription();
        Duration thisLongPollTimeout = this.state == ParallelEoSStreamProcessor.State.running ? longPollTimeout : Duration.ofMillis(1L);
        log.debug("Long polling broker with timeout {} seconds, might appear to sleep here if no data available on broker.", (Object)BackportUtils.toSeconds(thisLongPollTimeout));
        try {
            records = this.consumer.poll(thisLongPollTimeout);
            log.debug("Poll completed normally and returned {}...", (Object)records.count());
        }
        catch (WakeupException w) {
            log.warn("Awoken from poll. State? {}", (Object)this.state);
            records = new ConsumerRecords(UniMaps.of());
        }
        return records;
    }

    public void drain() {
        if (this.state != ParallelEoSStreamProcessor.State.draining) {
            log.debug("Poll system signaling to drain...");
            this.state = ParallelEoSStreamProcessor.State.draining;
            this.consumer.wakeup();
        }
    }

    private void doPause() {
        if (this.paused) {
            log.trace("Already paused");
        } else {
            this.paused = true;
            log.debug("Pausing subs");
            Set assignment = this.consumer.assignment();
            this.consumer.pause((Collection)assignment);
        }
    }

    public void closeAndWait() throws TimeoutException, ExecutionException {
        log.debug("Requesting broker polling system to close...");
        this.transitionToClosing();
        if (this.pollControlThreadFuture.isPresent()) {
            log.debug("Wait for loop to finish ending...");
            Future<Boolean> pollControlResult = this.pollControlThreadFuture.get();
            boolean interrupted = true;
            while (interrupted) {
                try {
                    Boolean pollShutdownSuccess = pollControlResult.get(DrainingCloseable.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                    interrupted = false;
                    if (pollShutdownSuccess.booleanValue()) continue;
                    log.warn("Broker poll control thread not closed cleanly.");
                }
                catch (InterruptedException e) {
                    log.debug("Interrupted", (Throwable)e);
                }
                catch (ExecutionException | TimeoutException e) {
                    log.error("Execution or timeout exception", (Throwable)e);
                    throw e;
                }
            }
        }
        log.debug("Broker poll system finished closing");
    }

    private void transitionToClosing() {
        this.state = ParallelEoSStreamProcessor.State.closing;
        this.consumer.wakeup();
    }

    private void managePauseOfSubscription() {
        if (this.shouldThrottle()) {
            this.doPause();
        } else {
            this.resumeIfPaused();
        }
    }

    private void resumeIfPaused() {
        if (this.paused) {
            log.debug("Resuming consumer");
            Set pausedTopics = this.consumer.paused();
            this.consumer.resume((Collection)pausedTopics);
            this.consumer.wakeup();
            this.paused = false;
        }
    }

    private boolean shouldThrottle() {
        return this.wm.shouldThrottle();
    }

    public static void setLongPollTimeout(Duration longPollTimeout) {
        BrokerPollSystem.longPollTimeout = longPollTimeout;
    }

    public static Duration getLongPollTimeout() {
        return longPollTimeout;
    }
}

