/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.ConsumerManager;
import io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class BrokerPollSystem<K, V>
implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(BrokerPollSystem.class);
    private final ConsumerManager<K, V> consumerManager;
    private State state = State.running;
    private Optional<Future<Boolean>> pollControlThreadFuture;
    private volatile boolean paused = false;
    private final AbstractParallelEoSStreamProcessor<K, V> pc;
    private Optional<ConsumerOffsetCommitter<K, V>> committer = Optional.empty();
    private static Duration longPollTimeout = Duration.ofMillis(2000L);
    private final WorkManager<K, V> wm;
    private final RateLimiter pauseLimiter = new RateLimiter(1);

    public BrokerPollSystem(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm, AbstractParallelEoSStreamProcessor<K, V> pc, ParallelConsumerOptions<K, V> options) {
        this.wm = wm;
        this.pc = pc;
        this.consumerManager = consumerMgr;
        switch (options.getCommitMode()) {
            case PERIODIC_CONSUMER_SYNC: 
            case PERIODIC_CONSUMER_ASYNCHRONOUS: {
                ConsumerOffsetCommitter<K, V> consumerCommitter = new ConsumerOffsetCommitter<K, V>(consumerMgr, wm, options);
                this.committer = Optional.of(consumerCommitter);
            }
        }
    }

    public void start(String managedExecutorService) {
        ExecutorService executorService;
        try {
            executorService = (ExecutorService)InitialContext.doLookup(managedExecutorService);
        }
        catch (NamingException e) {
            log.debug("Couldn't look up an execution service, falling back to Java SE Thread", (Throwable)e);
            executorService = Executors.newSingleThreadExecutor();
        }
        Future<Boolean> submit = executorService.submit(this::controlLoop);
        this.pollControlThreadFuture = Optional.of(submit);
    }

    public void supervise() {
        Future<Boolean> booleanFuture;
        if (this.pollControlThreadFuture.isPresent() && ((booleanFuture = this.pollControlThreadFuture.get()).isCancelled() || booleanFuture.isDone())) {
            try {
                booleanFuture.get();
            }
            catch (Exception e) {
                throw new InternalRuntimeError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
            }
        }
    }

    private boolean controlLoop() {
        Thread.currentThread().setName("pc-broker-poll");
        this.pc.getMyId().ifPresent(id -> MDC.put((String)"pcId", (String)id));
        log.trace("Broker poll control loop start");
        this.committer.ifPresent(x -> x.claim());
        try {
            while (this.state != State.closed) {
                this.handlePoll();
                this.maybeDoCommit();
                switch (this.state) {
                    case draining: {
                        this.doPause();
                        break;
                    }
                    case closing: {
                        this.doClose();
                    }
                }
            }
            log.debug("Broker poller thread finished normally, returning OK (true) to future...");
            return true;
        }
        catch (Exception e) {
            log.error("Unknown error", (Throwable)e);
            throw e;
        }
    }

    private void handlePoll() {
        log.trace("Loop: Broker poller: ({})", (Object)this.state);
        if (this.state == State.running || this.state == State.draining) {
            EpochAndRecordsMap<K, V> polledRecords = this.pollBrokerForRecords();
            int count = polledRecords.count();
            log.debug("Got {} records in poll result", (Object)count);
            if (count > 0) {
                log.trace("Loop: Register work");
                this.pc.registerWork(polledRecords);
            }
        }
    }

    private void doClose() {
        log.debug("Doing close...");
        this.doPause();
        this.maybeCloseConsumerManager();
        this.state = State.closed;
    }

    private void maybeCloseConsumerManager() {
        if (this.isResponsibleForCommits()) {
            log.debug("Closing {}, first closing consumer...", (Object)this.getClass().getSimpleName());
            this.consumerManager.close(DrainingCloseable.DEFAULT_TIMEOUT);
            log.debug("Consumer closed.");
        }
    }

    private boolean isResponsibleForCommits() {
        return this.committer.isPresent();
    }

    private EpochAndRecordsMap<K, V> pollBrokerForRecords() {
        this.managePauseOfSubscription();
        log.debug("Subscriptions are paused: {}", (Object)this.paused);
        boolean pollTimeoutNormally = this.state == State.running || this.state == State.draining;
        Duration thisLongPollTimeout = pollTimeoutNormally ? longPollTimeout : Duration.ofMillis(1L);
        log.debug("Long polling broker with timeout {}, might appear to sleep here if subs are paused, or no data available on broker. Run state: {}", (Object)thisLongPollTimeout, (Object)this.state);
        ConsumerRecords<K, V> poll = this.consumerManager.poll(thisLongPollTimeout);
        log.debug("Poll completed");
        return new EpochAndRecordsMap<K, V>(poll, this.wm.getPm());
    }

    public void drain() {
        if (this.state != State.draining) {
            log.debug("Signaling poll system to drain, waking up consumer...");
            this.state = State.draining;
            this.consumerManager.wakeup();
        }
    }

    private void doPauseMaybe() {
        if (this.paused) {
            log.trace("Already paused");
        } else if (this.pauseLimiter.couldPerform()) {
            this.pauseLimiter.performIfNotLimited(() -> this.doPause());
        } else if (log.isDebugEnabled()) {
            log.debug("Should pause but pause rate limit exceeded {} vs {}.", (Object)this.pauseLimiter.getElapsedDuration(), (Object)this.pauseLimiter.getRate());
        }
    }

    private void doPause() {
        if (!this.paused) {
            this.paused = true;
            log.debug("Pausing subs");
            Set<TopicPartition> assignment = this.consumerManager.assignment();
            this.consumerManager.pause(assignment);
        } else {
            log.debug("Already paused, skipping");
        }
    }

    public void closeAndWait() throws TimeoutException, ExecutionException {
        log.debug("Requesting broker polling system to close...");
        this.transitionToClosing();
        if (this.pollControlThreadFuture.isPresent()) {
            log.debug("Wait for loop to finish ending...");
            Future<Boolean> pollControlResult = this.pollControlThreadFuture.get();
            boolean interrupted = true;
            while (interrupted) {
                try {
                    Boolean pollShutdownSuccess = pollControlResult.get(DrainingCloseable.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                    interrupted = false;
                    if (pollShutdownSuccess.booleanValue()) continue;
                    log.warn("Broker poll control thread not closed cleanly.");
                }
                catch (InterruptedException e) {
                    log.debug("Interrupted waiting for broker poller thread to finish", (Throwable)e);
                }
                catch (ExecutionException | TimeoutException e) {
                    log.error("Execution or timeout exception waiting for broker poller thread to finish", (Throwable)e);
                    throw e;
                }
            }
        }
        log.debug("Broker poll system finished closing");
    }

    private void transitionToClosing() {
        log.debug("Poller transitioning to closing, waking up consumer");
        this.state = State.closing;
        this.consumerManager.wakeup();
    }

    private void managePauseOfSubscription() {
        boolean throttle = this.shouldThrottle();
        log.trace("Need to throttle: {}", (Object)throttle);
        if (throttle) {
            this.doPauseMaybe();
        } else {
            this.resumeIfPaused();
        }
    }

    private void resumeIfPaused() {
        if (this.paused) {
            log.debug("Resuming consumer, waking up");
            Set<TopicPartition> pausedTopics = this.consumerManager.paused();
            this.consumerManager.resume(pausedTopics);
            this.consumerManager.wakeup();
            this.paused = false;
        }
    }

    private boolean shouldThrottle() {
        return this.wm.shouldThrottle();
    }

    @Override
    public void retrieveOffsetsAndCommit() {
        if (this.state != State.running && this.state != State.draining && this.state != State.closing) {
            throw new IllegalStateException(StringUtils.msg("Can't commit - not running (state: {}", new Object[]{this.state}));
        }
        ConsumerOffsetCommitter<K, V> committer = this.committer.orElseThrow(() -> {
            throw new IllegalStateException("No committer configured");
        });
        committer.commit();
    }

    private void maybeDoCommit() {
        this.committer.ifPresent(ConsumerOffsetCommitter::maybeDoCommit);
    }

    public void wakeupIfPaused() {
        if (this.paused) {
            this.consumerManager.wakeup();
        }
    }

    public boolean isPaused() {
        return this.paused;
    }

    public static void setLongPollTimeout(Duration longPollTimeout) {
        BrokerPollSystem.longPollTimeout = longPollTimeout;
    }

    public static Duration getLongPollTimeout() {
        return longPollTimeout;
    }
}

