/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.ConsumerManager;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractParallelEoSStreamProcessor<K, V>
implements ParallelConsumer<K, V>,
ConsumerRebalanceListener,
Closeable {
    /** Logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(AbstractParallelEoSStreamProcessor.class);
    /** MDC key under which {@code addInstanceMDC()} publishes this instance's id when {@code myId} is set. */
    public static final String MDC_INSTANCE_ID = "pcId";
    /** Options supplied to, and validated by, the constructor. */
    private final ParallelConsumerOptions options;
    /** Time source read by {@code getTimeSinceLastCommit()}. */
    private WallClock clock = new WallClock();
    // Same numeric value (in milliseconds) that timeBetweenCommits below is initialised with.
    private static final int KAFKA_DEFAULT_AUTO_COMMIT_FREQUENCY = 5000;
    /** Elapsed time since the last commit after which {@code commitOffsetsMaybe()} treats a commit as due. */
    private Duration timeBetweenCommits = Duration.ofMillis(5000L);
    /** Refreshed by {@code updateLastCommitCheckTime()} after a commit that was not skipped. */
    private Instant lastCommitCheckTime = Instant.now();
    /** Present only when the options report that a producer was supplied. */
    private final Optional<ProducerManager<K, V>> producerManager;
    /** The consumer obtained from the options. */
    private final Consumer<K, V> consumer;
    /** Pool that runs the user function; built by {@code setupWorkerPool(int)}. */
    private final ThreadPoolExecutor workerPool;
    /** Future of the control task; empty until {@code supervisorLoop} has submitted it. */
    private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();
    /** Work manager created in the constructor. */
    protected WorkManager<K, V> wm;
    /** Queue of work containers that {@code processWorkCompleteMailBox()} polls and drains. */
    private final BlockingQueue<WorkContainer<K, V>> workMailBox = new LinkedBlockingQueue<WorkContainer<K, V>>();
    /** Created in the constructor; also serves as the committer unless a transactional producer is used. */
    private final BrokerPollSystem<K, V> brokerPollSubsystem;
    /** Hooks executed once per {@code controlLoop} iteration. */
    private final List<Runnable> controlLoopHooks = new ArrayList<Runnable>();
    /** The thread running the control task; set by that task and interrupted by {@code interruptControlThread()}. */
    private Thread blockableControlThread;
    /** Set to {@code true} while {@code processWorkCompleteMailBox()} is in its blocking poll. */
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
    /** The producer manager when a transactional producer is used, otherwise the broker poll subsystem. */
    private final OffsetCommitter committer;
    // NOTE(review): presumably consulted by isCommandedToCommit(), which is outside this section - confirm.
    private final AtomicBoolean commitCommand = new AtomicBoolean(false);
    /** Factor that {@code getQueueTargetLoaded()} multiplies the pool queue target by. */
    protected final DynamicLoadFactor dynamicExtraLoadFactor = new DynamicLoadFactor();
    /** Set by the control task when the control loop throws; returned by {@code getFailureCause()}. */
    private Exception failureReason;
    /** Lifecycle state; starts as {@code unused}. */
    private State state = State.unused;
    /** User callback registered through the {@code subscribe} overloads that accept a listener. */
    private Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener = Optional.empty();
    /** Running count adjusted by the rebalance callbacks of this class. */
    private int numberOfAssignedPartitions;
    /** Limits how often {@code handleWork} emits its stats log line. */
    private final RateLimiter queueStatsLimiter = new RateLimiter();
    /** Whether the last call to {@code wm.maybeGetWork} returned at least the requested amount. */
    private boolean lastWorkRequestWasFulfilled = false;
    /** Optional id published to the MDC by {@code addInstanceMDC()}. */
    private Optional<String> myId = Optional.empty();

    /**
     * Reports whether this instance has stopped.
     *
     * @return {@code true} if the state is {@code closed}, or if a control task has been submitted and its
     *         future is done or cancelled
     */
    public boolean isClosedOrFailed() {
        boolean controlTaskFinished = this.controlThreadFuture
                .map(future -> future.isDone() || future.isCancelled())
                .orElse(false);
        return this.state == State.closed || controlTaskFinished;
    }

    /**
     * Returns the failure recorded by the control task.
     *
     * @return the stored failure, or {@code null} if none has been recorded
     */
    public Exception getFailureCause() {
        return this.failureReason;
    }

    /**
     * Validates the options and the consumer they carry, then wires up the worker pool, work manager,
     * broker poll subsystem and (when a producer is supplied) the producer manager.
     *
     * @param newOptions the configuration for this instance; must not be {@code null}
     * @throws NullPointerException if {@code newOptions} is {@code null}
     */
    public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) {
        Objects.requireNonNull(newOptions, "Options must be supplied");
        log.info("Confluent Parallel Consumer initialise... Options: {}", (Object)newOptions);
        this.options = newOptions;
        this.options.validate();
        this.consumer = this.options.getConsumer();
        // Consumer sanity checks run before any subsystem is created.
        this.checkGroupIdConfigured(this.consumer);
        this.checkNotSubscribed(this.consumer);
        this.checkAutoCommitIsDisabled(this.consumer);
        this.workerPool = this.setupWorkerPool(newOptions.getMaxConcurrency());
        this.wm = new WorkManager<K, V>(newOptions, this.consumer, this.dynamicExtraLoadFactor);
        ConsumerManager<K, V> consumerMgr = new ConsumerManager<K, V>(this.consumer);
        this.brokerPollSubsystem = new BrokerPollSystem<K, V>(consumerMgr, this.wm, this, newOptions);
        if (this.options.isProducerSupplied()) {
            this.producerManager = Optional.of(new ProducerManager(this.options.getProducer(), consumerMgr, this.wm, this.options));
            // With a transactional producer the producer manager commits; otherwise the poll subsystem does.
            this.committer = this.options.isUsingTransactionalProducer() ? (OffsetCommitter)this.producerManager.get() : this.brokerPollSubsystem;
        } else {
            this.producerManager = Optional.empty();
            this.committer = this.brokerPollSubsystem;
        }
    }

    /**
     * Verifies that group metadata can be obtained from the consumer.
     *
     * @param consumer the consumer to probe
     * @throws IllegalArgumentException if {@code consumer.groupMetadata()} throws a {@link RuntimeException}
     */
    private void checkGroupIdConfigured(Consumer<K, V> consumer) {
        try {
            // Only whether the call succeeds matters; the returned metadata is discarded.
            consumer.groupMetadata();
        } catch (RuntimeException groupMetadataFailure) {
            String message = "Error validating Consumer configuration - no group metadata - missing a configured GroupId on your Consumer?";
            throw new IllegalArgumentException(message, groupMetadataFailure);
        }
    }

    /**
     * Builds the fixed-size pool that runs the user function.
     * <p>
     * The thread factory is looked up via JNDI using the name from the options; if the lookup throws a
     * {@link NamingException}, {@link Executors#defaultThreadFactory()} is used. Every thread name is
     * prefixed with {@code "pc-"}.
     *
     * @param poolSize used as both the core and the maximum pool size
     * @return a pool backed by an unbounded {@link LinkedBlockingQueue} with an abort rejection policy
     */
    protected ThreadPoolExecutor setupWorkerPool(int poolSize) {
        ThreadFactory baseFactory;
        try {
            baseFactory = (ThreadFactory)InitialContext.doLookup(this.options.getManagedThreadFactory());
        } catch (NamingException e) {
            log.debug("Using Java SE Thread", (Throwable)e);
            baseFactory = Executors.defaultThreadFactory();
        }
        // The lambda needs an effectively final reference.
        ThreadFactory delegate = baseFactory;
        ThreadFactory prefixingFactory = runnable -> {
            Thread created = delegate.newThread(runnable);
            created.setName("pc-" + created.getName());
            return created;
        };
        return new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                prefixingFactory,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Rejects a consumer that already has a subscription or an assignment.
     * <p>
     * {@link MockConsumer} instances are exempt from this check.
     *
     * @param consumerToCheck the consumer supplied through the options
     * @throws IllegalStateException if the consumer is already subscribed to topics or has partitions assigned
     */
    private void checkNotSubscribed(Consumer<K, V> consumerToCheck) {
        if (consumerToCheck instanceof MockConsumer) {
            return;
        }
        Set<String> subscription = consumerToCheck.subscription();
        Set<TopicPartition> assignment = consumerToCheck.assignment();
        if (!subscription.isEmpty() || !assignment.isEmpty()) {
            // Points the user at this class's own subscribe(...) overloads.
            throw new IllegalStateException("Consumer subscription must be managed by this class. Use " + this.getClass().getName() + "#subscribe methods instead.");
        }
    }

    /**
     * Subscribes the underlying consumer to the given topics, registering this instance as the
     * rebalance listener.
     *
     * @param topics the topic names to subscribe to
     */
    @Override
    public void subscribe(Collection<String> topics) {
        log.debug("Subscribing to {}", topics);
        ConsumerRebalanceListener listener = this;
        this.consumer.subscribe(topics, listener);
    }

    /**
     * Subscribes the underlying consumer to topics matching the pattern, registering this instance as
     * the rebalance listener.
     *
     * @param pattern the topic name pattern to subscribe to
     */
    @Override
    public void subscribe(Pattern pattern) {
        log.debug("Subscribing to {}", (Object)pattern);
        ConsumerRebalanceListener listener = this;
        this.consumer.subscribe(pattern, listener);
    }

    /**
     * Subscribes the underlying consumer to the given topics and remembers a user listener that the
     * rebalance callbacks of this class forward to.
     *
     * @param topics   the topic names to subscribe to
     * @param callback the user's rebalance listener; must not be {@code null}
     */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", topics);
        // Store the user's listener first; this instance is what the consumer actually calls.
        this.usersConsumerRebalanceListener = Optional.of(callback);
        ConsumerRebalanceListener listener = this;
        this.consumer.subscribe(topics, listener);
    }

    /**
     * Subscribes the underlying consumer to topics matching the pattern and remembers a user listener
     * that the rebalance callbacks of this class forward to.
     *
     * @param pattern  the topic name pattern to subscribe to
     * @param callback the user's rebalance listener; must not be {@code null}
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", (Object)pattern);
        // Store the user's listener first; this instance is what the consumer actually calls.
        this.usersConsumerRebalanceListener = Optional.of(callback);
        ConsumerRebalanceListener listener = this;
        this.consumer.subscribe(pattern, listener);
    }

    /**
     * Rebalance callback for revoked partitions: lowers the assigned-partition count, commits whatever
     * is ready, informs the work manager and then the user's listener, if one was registered.
     *
     * @param partitions the partitions being revoked
     * @throws InternalRuntimeError wrapping any exception raised while committing or notifying
     */
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("Partitions revoked {}, state: {}", partitions, (Object)this.state);
        this.numberOfAssignedPartitions = this.numberOfAssignedPartitions - partitions.size();
        try {
            this.commitOffsetsThatAreReady();
            this.wm.onPartitionsRevoked(partitions);
            if (this.usersConsumerRebalanceListener.isPresent()) {
                this.usersConsumerRebalanceListener.get().onPartitionsRevoked(partitions);
            }
        } catch (Exception revokeFailure) {
            throw new InternalRuntimeError("onPartitionsRevoked event error", revokeFailure);
        }
    }

    /**
     * Rebalance callback for newly assigned partitions: raises the assigned-partition count, informs
     * the work manager and the user's listener (if any), then calls {@code notifySomethingToDo()}.
     *
     * @param partitions the partitions that were assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        int newlyAssigned = partitions.size();
        this.numberOfAssignedPartitions += newlyAssigned;
        log.info("Assigned {} total ({} new) partition(s) {}", this.numberOfAssignedPartitions, newlyAssigned, partitions);
        this.wm.onPartitionsAssigned(partitions);
        if (this.usersConsumerRebalanceListener.isPresent()) {
            this.usersConsumerRebalanceListener.get().onPartitionsAssigned(partitions);
        }
        this.notifySomethingToDo();
    }

    /**
     * Rebalance callback for lost partitions: lowers the assigned-partition count and informs the work
     * manager and the user's listener, if one was registered.
     *
     * @param partitions the partitions that were lost
     */
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.numberOfAssignedPartitions = this.numberOfAssignedPartitions - partitions.size();
        this.wm.onPartitionsLost(partitions);
        if (this.usersConsumerRebalanceListener.isPresent()) {
            this.usersConsumerRebalanceListener.get().onPartitionsLost(partitions);
        }
    }

    /**
     * Verifies, via reflection, that a real {@link KafkaConsumer} has auto commit disabled.
     * Consumers of any other type are not inspected.
     * <p>
     * The private fields are resolved on the declaring classes ({@code KafkaConsumer} and
     * {@code ConsumerCoordinator}) rather than on the runtime class of the instance, because
     * {@link Class#getDeclaredField(String)} does not search superclasses and would therefore fail for
     * a subclass of {@code KafkaConsumer}.
     *
     * @param consumer the consumer supplied through the options
     * @throws IllegalArgumentException if auto commit is enabled on the consumer
     * @throws IllegalStateException    if the consumer has no coordinator, or the reflective lookup fails
     */
    private void checkAutoCommitIsDisabled(Consumer<K, V> consumer) {
        if (!(consumer instanceof KafkaConsumer)) {
            return;
        }
        try {
            Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");
            coordinatorField.setAccessible(true);
            ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
            if (coordinator == null) {
                throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");
            }
            Field autoCommitEnabledField = ConsumerCoordinator.class.getDeclaredField("autoCommitEnabled");
            autoCommitEnabledField.setAccessible(true);
            Boolean isAutoCommitEnabled = (Boolean)autoCommitEnabledField.get(coordinator);
            if (isAutoCommitEnabled.booleanValue()) {
                throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library.");
            }
        } catch (ReflectiveOperationException e) {
            // NoSuchFieldException / IllegalAccessException: the client's internals differ from what is expected here.
            throw new IllegalStateException("Unable to verify through reflection that Consumer auto commit is disabled", e);
        }
    }

    /**
     * Closes without draining first, allowing twice {@code DrainingCloseable.DEFAULT_TIMEOUT}.
     */
    @Override
    public void close() {
        this.closeDontDrainFirst(DrainingCloseable.DEFAULT_TIMEOUT.multipliedBy(2L));
    }

    /**
     * Closes this instance, optionally letting in-flight work finish first.
     * <p>
     * Unless the state is already {@code closed}, the requested transition is triggered and this call
     * blocks in {@code waitForClose(timeout)}. Afterwards, if a control task future exists, it is
     * awaited so that a failure of the control task surfaces to the caller.
     *
     * @param timeout   passed to {@code waitForClose} and used for the final wait on the control future
     * @param drainMode {@code DRAIN} transitions to draining; {@code DONT_DRAIN} transitions straight to closing
     */
    @Override
    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        if (this.state == State.closed) {
            log.info("Already closed, checking end state..");
        } else {
            log.info("Signaling to close...");
            switch (drainMode) {
                case DRAIN: {
                    log.info("Will wait for all in flight to complete before");
                    this.transitionToDraining();
                    break;
                }
                case DONT_DRAIN: {
                    log.info("Not waiting for in flight to complete, will transition directly to closing");
                    this.transitionToClosing();
                }
            }
            this.waitForClose(timeout);
        }
        if (this.controlThreadFuture.isPresent()) {
            log.debug("Checking for control thread exception...");
            Future<Boolean> future = this.controlThreadFuture.get();
            // Future#get rethrows a control-task failure wrapped in an ExecutionException.
            future.get(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
        }
        log.info("Close complete.");
    }

    /**
     * Blocks on the control task's future until the state becomes {@code closed}.
     *
     * @param timeout how long each wait on the control future may take
     * @throws TimeoutException   if the wait times out, or the control task completes with {@code false}
     * @throws ExecutionException if the control task terminated with an exception
     */
    private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.info("Waiting on closed state...");
        while (!this.state.equals((Object)State.closed)) {
            try {
                // NOTE(review): Optional#get is unguarded - if this loop is entered before supervisorLoop has
                // stored a future it would throw NoSuchElementException; confirm callers rule that out.
                Future<Boolean> booleanFuture = this.controlThreadFuture.get();
                log.debug("Blocking on control future");
                boolean signaled = booleanFuture.get(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                if (!signaled) {
                    throw new TimeoutException("Timeout waiting for system to close (" + timeout + ")");
                }
            }
            catch (InterruptedException e) {
                // An interrupt only causes another pass of the loop.
                log.trace("Interrupted", (Throwable)e);
            }
            catch (ExecutionException | TimeoutException e) {
                log.error("Execution or timeout exception while waiting for the control thread to close cleanly (state was {}). Try increasing your time-out to allow the system to drain, or close without draining.", (Object)this.state, (Object)e);
                throw e;
            }
            log.trace("Still waiting for system to close...");
        }
    }

    /**
     * Performs the shutdown sequence: closes the broker poll subsystem, possibly the consumer, the
     * producer manager (if present) and the worker pool, then sets the state to {@code closed}.
     * <p>
     * Waiting for the worker pool to terminate is retried for as long as the wait is interrupted.
     *
     * @param timeout passed to the producer manager's close and used for the pool termination wait
     * @throws TimeoutException   declared for callers; not thrown directly by the code in this method
     * @throws ExecutionException declared for callers; not thrown directly by the code in this method
     */
    private void doClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.debug("Doing closing state: {}...", this.state);
        log.debug("Closing and waiting for broker poll system...");
        this.brokerPollSubsystem.closeAndWait();
        this.maybeCloseConsumer();
        this.producerManager.ifPresent(manager -> manager.close(timeout));

        log.debug("Shutting down execution pool...");
        List<Runnable> neverStarted = this.workerPool.shutdownNow();
        if (!neverStarted.isEmpty()) {
            log.warn("Threads not done count: {}", neverStarted.size());
        }

        log.trace("Awaiting worker pool termination...");
        boolean awaitCompleted = false;
        while (!awaitCompleted) {
            log.debug("Still interrupted");
            try {
                boolean terminatedInTime = this.workerPool.awaitTermination(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                awaitCompleted = true;
                if (!terminatedInTime) {
                    log.warn("Thread execution pool termination await timeout ({})! Were any processing jobs dead locked or otherwise stuck?", timeout);
                }
            } catch (InterruptedException e) {
                // Leave awaitCompleted false so the wait is retried.
                log.error("InterruptedException", e);
            }
        }
        log.debug("Close complete.");
        this.state = State.closed;
    }

    /**
     * Closes the consumer, but only when {@code isResponsibleForCommits()} returns {@code true}.
     */
    private void maybeCloseConsumer() {
        // NOTE(review): in the other case the consumer is presumably closed elsewhere - confirm.
        if (this.isResponsibleForCommits()) {
            this.consumer.close();
        }
    }

    /**
     * @return {@code true} when the configured committer is a {@code ProducerManager}
     */
    private boolean isResponsibleForCommits() {
        return this.committer instanceof ProducerManager;
    }

    /**
     * Polls every 100 ms until the work manager no longer reports records awaiting commit.
     *
     * @param timeout the maximum time to wait; once it has expired a {@link TimeoutException} is thrown
     */
    public void waitForProcessedNotCommitted(Duration timeout) {
        log.debug("Waiting processed but not committed...");
        Timer deadline = Time.SYSTEM.timer(timeout);
        while (this.wm.isRecordsAwaitingToBeCommitted()) {
            log.trace("Waiting for no in processing...");
            Thread.sleep(100L);
            deadline.update();
            if (deadline.isExpired()) {
                throw new TimeoutException("Waiting for no more records in processing");
            }
        }
        log.debug("No longer anything in flight.");
    }

    /**
     * @return {@code true} if the work manager reports records awaiting processing, or if
     *         {@code areMyThreadsDone()} returns {@code true}
     */
    private boolean isRecordsAwaitingProcessing() {
        boolean recordsWaiting = this.wm.isRecordsAwaitingProcessing();
        boolean controlTaskDone = this.areMyThreadsDone();
        log.trace("isRecordsAwaitingProcessing {} || threadsDone {}", recordsWaiting, controlTaskDone);
        if (recordsWaiting) {
            return true;
        }
        return controlTaskDone;
    }

    /**
     * Sets the state to {@code draining} and then calls {@code notifySomethingToDo()}.
     */
    private void transitionToDraining() {
        log.debug("Transitioning to draining...");
        this.state = State.draining;
        this.notifySomethingToDo();
    }

    /**
     * Interrupts the control thread, if the control task has already recorded one.
     */
    private void interruptControlThread() {
        if (this.blockableControlThread == null) {
            return;
        }
        log.debug("Interrupting {} thread in case it's waiting for work", this.blockableControlThread.getName());
        this.blockableControlThread.interrupt();
    }

    /**
     * @return {@code false} while no control task future exists; otherwise whether that future is done
     */
    private boolean areMyThreadsDone() {
        boolean controlTaskSubmitted = !BackportUtils.isEmpty(this.controlThreadFuture);
        return controlTaskSubmitted && this.controlThreadFuture.get().isDone();
    }

    /**
     * Starts the broker poll subsystem and submits the control task that repeatedly runs
     * {@code controlLoop} until the state becomes {@code closed}.
     * <p>
     * The control task is submitted to an {@link ExecutorService} looked up via JNDI using the name from
     * the options. If that lookup throws a {@link NamingException}, a private single-thread executor is
     * created instead; that executor is shut down right after submission so that its worker thread ends
     * once the control task completes. {@link ExecutorService#shutdown()} still runs already-submitted
     * tasks. An executor obtained from JNDI is never shut down here.
     * <p>
     * If {@code controlLoop} throws, the control task runs {@code doClose}, records the failure in
     * {@code failureReason} and rethrows it, so it surfaces through the control future.
     *
     * @param userFunction the function applied to each record
     * @param callback     invoked with every result produced by {@code userFunction}
     * @param <R>          the result type of the user function
     * @throws IllegalStateException if this instance has been used before (state is not {@code unused})
     */
    protected <R> void supervisorLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) {
        log.info("Control loop starting up...");
        if (this.state != State.unused) {
            throw new IllegalStateException(StringUtils.msg("Invalid state - the consumer cannot be used more than once (current state is {})", new Object[]{this.state}));
        }
        this.state = State.running;
        Callable<Boolean> controlTask = () -> {
            Thread controlThread = Thread.currentThread();
            this.addInstanceMDC();
            controlThread.setName("pc-control");
            log.trace("Control task scheduled");
            // Published so interruptControlThread() can interrupt this thread.
            this.blockableControlThread = controlThread;
            while (this.state != State.closed) {
                try {
                    this.controlLoop(userFunction, callback);
                }
                catch (Exception e) {
                    log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e.getMessage(), (Throwable)e);
                    this.doClose(DrainingCloseable.DEFAULT_TIMEOUT);
                    this.failureReason = new RuntimeException("Error from poll control thread: " + e.getMessage(), e);
                    throw this.failureReason;
                }
            }
            log.info("Control loop ending clean (state:{})...", (Object)this.state);
            return true;
        };
        this.brokerPollSubsystem.start(this.options.getManagedExecutorService());
        ExecutorService executorService;
        boolean executorCreatedHere = false;
        try {
            executorService = (ExecutorService)InitialContext.doLookup(this.options.getManagedExecutorService());
        }
        catch (NamingException e) {
            log.debug("Using Java SE Thread", (Throwable)e);
            executorService = Executors.newSingleThreadExecutor();
            executorCreatedHere = true;
        }
        Future<Boolean> controlTaskFutureResult = executorService.submit(controlTask);
        this.controlThreadFuture = Optional.of(controlTaskFutureResult);
        if (executorCreatedHere) {
            // Orderly shutdown: the submitted control task still runs; the thread is released when it finishes.
            executorService.shutdown();
        }
    }

    /**
     * Publishes this instance's id to the logging MDC under {@code MDC_INSTANCE_ID}, if an id is set.
     */
    private void addInstanceMDC() {
        if (this.myId.isPresent()) {
            MDC.put(MDC_INSTANCE_ID, this.myId.get());
        }
    }

    /**
     * Runs one iteration of the control loop: hands work to the pool, wakes the poller if it is paused
     * while under-loaded, processes the completed-work mailbox, possibly commits, runs the registered
     * hooks, acts on a {@code draining} or {@code closing} state, supervises the poll subsystem and
     * finally sleeps for one millisecond.
     *
     * @param userFunction the function applied to each record
     * @param callback     invoked with every result produced by {@code userFunction}
     * @param <R>          the result type of the user function
     * @throws TimeoutException     propagated from {@code doClose}
     * @throws ExecutionException   propagated from {@code doClose}
     * @throws InterruptedException declared for callers; the sleep at the end handles its own interrupt
     */
    private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) throws TimeoutException, ExecutionException, InterruptedException {
        int newWork = this.handleWork(userFunction, callback);
        // Short-circuit evaluation: the paused check only matters when the work manager is under-loaded.
        if (this.state == State.running && !this.wm.isSufficientlyLoaded() && this.brokerPollSubsystem.isPaused()) {
            log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs concurrency: {})", (Object)this.wm.getWorkQueuedInMailboxCount(), (Object)this.options.getMaxConcurrency());
            this.brokerPollSubsystem.wakeupIfPaused();
        }
        log.trace("Loop: Process mailbox");
        this.processWorkCompleteMailBox();
        if (this.state == State.running) {
            log.trace("Loop: Maybe commit");
            this.commitOffsetsMaybe();
        }
        log.trace("Loop: Running {} loop end plugin(s)", (Object)this.controlLoopHooks.size());
        this.controlLoopHooks.forEach(Runnable::run);
        log.trace("Current state: {}", (Object)this.state);
        switch (this.state) {
            case draining: {
                this.drain();
                break;
            }
            case closing: {
                this.doClose(DrainingCloseable.DEFAULT_TIMEOUT);
            }
        }
        this.brokerPollSubsystem.supervise();
        Duration duration = Duration.ofMillis(1L);
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException e) {
            // The interrupt flag is deliberately left cleared; the loop simply carries on.
            log.trace("Woke up", (Throwable)e);
        }
        log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}", new Object[]{this.wm.getTotalWorkWaitingProcessing(), this.wm.getNumberOfEntriesInPartitionQueues(), this.wm.getNumberRecordsOutForProcessing(), this.state});
    }

    /**
     * Checks queue pressure and, while running or draining, requests work from the work manager and
     * submits it to the worker pool. Rate-limited pool statistics are logged afterwards.
     *
     * @param userFunction the function applied to each record
     * @param callback     invoked with every result produced by {@code userFunction}
     * @param <R>          the result type of the user function
     * @return the number of work containers obtained in this call ({@code 0} when no work was requested)
     */
    private <R> int handleWork(Function<ConsumerRecord<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) {
        this.checkPressure();
        int retrieved = 0;
        boolean acceptingWork = this.state == State.running || this.state == State.draining;
        if (acceptingWork) {
            int requested = this.calculateQuantityToRequest();
            List<WorkContainer<K, V>> batch = this.wm.maybeGetWork(requested);
            retrieved = batch.size();
            // Remembered for the next checkPressure() call.
            this.lastWorkRequestWasFulfilled = retrieved >= requested;
            log.trace("Loop: Submit to pool");
            this.submitWorkToPool(userFunction, callback, batch);
        }
        this.queueStatsLimiter.performIfNotLimited(() -> {
            int queued = this.getWorkerQueueSize();
            log.debug("Stats: \n- pool active: {} queued:{} \n- queue size: {} target: {} loading factor: {}", this.workerPool.getActiveCount(), queued, queued, this.getPoolQueueTarget(), this.dynamicExtraLoadFactor.getCurrentFactor());
        });
        return retrieved;
    }

    /**
     * Computes how much work to request: the loaded queue target minus the worker pool's current queue size.
     *
     * @return the difference between target and current queue size
     */
    protected int calculateQuantityToRequest() {
        int loadedTarget = this.getQueueTargetLoaded();
        int queuedNow = this.workerPool.getQueue().size();
        int toRequest = loadedTarget - queuedNow;
        log.debug("Loop: Will try to get work - target: {}, current queue size: {}, requesting: {}, loading factor: {}", loadedTarget, queuedNow, toRequest, this.dynamicExtraLoadFactor.getCurrentFactor());
        return toRequest;
    }

    /**
     * @return the pool queue target multiplied by the current dynamic load factor
     */
    private int getQueueTargetLoaded() {
        return this.getPoolQueueTarget() * this.dynamicExtraLoadFactor.getCurrentFactor();
    }

    /**
     * Tries to step up the dynamic load factor when the pool queue is low, the work manager's mailbox
     * count exceeds the maximum concurrency and the previous work request was fulfilled.
     * A warning is logged if no step was taken because the maximum factor has been reached.
     */
    protected void checkPressure() {
        boolean unpulledWorkAvailable = this.wm.getWorkQueuedInMailboxCount() > this.options.getMaxConcurrency();
        if (log.isTraceEnabled()) {
            log.trace("Queue pressure check: (current size: {}, loaded target: {}, factor: {}) if (isPoolQueueLow() {} && dynamicExtraLoadFactor.isWarmUpPeriodOver() {} && moreWorkInQueuesAvailableThatHaveNotBeenPulled {})", this.getWorkerQueueSize(), this.getQueueTargetLoaded(), this.dynamicExtraLoadFactor.getCurrentFactor(), this.isPoolQueueLow(), this.dynamicExtraLoadFactor.isWarmUpPeriodOver(), unpulledWorkAvailable);
        }
        boolean underPressure = this.isPoolQueueLow() && unpulledWorkAvailable && this.lastWorkRequestWasFulfilled;
        if (!underPressure) {
            return;
        }
        if (this.dynamicExtraLoadFactor.maybeStepUp()) {
            log.debug("isPoolQueueLow(): Executor pool queue is not loaded with enough work (queue: {} vs target: {}), stepped up loading factor to {}", this.getWorkerQueueSize(), this.getPoolQueueTarget(), this.dynamicExtraLoadFactor.getCurrentFactor());
            return;
        }
        if (this.dynamicExtraLoadFactor.isMaxReached()) {
            log.warn("isPoolQueueLow(): Max loading factor steps reached: {}/{}", this.dynamicExtraLoadFactor.getCurrentFactor(), this.dynamicExtraLoadFactor.getMaxFactor());
        }
    }

    /**
     * @return the configured maximum concurrency, used as the target size for the worker pool queue
     */
    private int getPoolQueueTarget() {
        return this.options.getMaxConcurrency();
    }

    /**
     * @return {@code true} when the worker queue size is at or below the pool queue target and the work
     *         manager reports work in its mailboxes
     */
    private boolean isPoolQueueLow() {
        int queued = this.getWorkerQueueSize();
        int target = this.getPoolQueueTarget();
        boolean atOrBelowTarget = queued <= target;
        boolean mailboxesHaveWork = this.wm.hasWorkInMailboxes();
        log.debug("workAmountBelowTarget {} {} vs {} && wm.hasWorkInMailboxes() {};", atOrBelowTarget, queued, target, mailboxesHaveWork);
        return atOrBelowTarget && mailboxesHaveWork;
    }

    /**
     * Tells the broker poll subsystem to drain, and transitions to closing once
     * {@code isRecordsAwaitingProcessing()} returns {@code false}.
     */
    private void drain() {
        log.debug("Signaling to drain...");
        this.brokerPollSubsystem.drain();
        if (this.isRecordsAwaitingProcessing()) {
            log.debug("Records still waiting processing, won't transition to closing.");
            return;
        }
        this.transitionToClosing();
    }

    /**
     * Moves to {@code closing}, or straight to {@code closed} if this instance was never used, and then
     * calls {@code notifySomethingToDo()}.
     */
    private void transitionToClosing() {
        log.debug("Transitioning to closing...");
        if (this.state == State.unused) {
            this.state = State.closed;
        } else {
            this.state = State.closing;
        }
        this.notifySomethingToDo();
    }

    /**
     * Collects finished work from the mailbox and hands each container to the work manager.
     * <p>
     * When the mailbox is empty, the state is {@code running} and the work manager has nothing
     * immediately available, the first poll blocks for up to the time remaining until the next commit
     * check; otherwise it is a non-blocking poll. Whatever else is queued is then drained.
     * <p>
     * {@code currentlyPollingWorkCompleteMailBox} is reset in a {@code finally} block so that it does not
     * stay {@code true} when the blocking poll is interrupted. After each container only the
     * {@code "offset"} MDC entry is removed, so other MDC entries on this thread (such as the instance id
     * under {@code MDC_INSTANCE_ID}) are kept.
     */
    private void processWorkCompleteMailBox() {
        log.trace("Processing mailbox (might block waiting for results)...");
        Set<WorkContainer<K, V>> results = new HashSet<WorkContainer<K, V>>();
        Duration timeout = this.getTimeToNextCommitCheck();
        WorkContainer<K, V> firstBlockingPoll = null;
        try {
            boolean workAvailable = this.wm.hasWorkInMailboxes() && this.wm.isSystemIdle();
            boolean noWorkToDoAndStillRunning = this.workMailBox.isEmpty() && this.state.equals((Object)State.running) && !workAvailable;
            if (noWorkToDoAndStillRunning) {
                if (log.isDebugEnabled()) {
                    log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}", new Object[]{timeout, this.workerPool.getActiveCount(), this.getWorkerQueueSize()});
                }
                this.currentlyPollingWorkCompleteMailBox.getAndSet(true);
                try {
                    log.trace("Blocking poll {}", (Object)timeout);
                    firstBlockingPoll = this.workMailBox.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
                    log.trace("Blocking poll finish");
                }
                finally {
                    // Must be cleared on interruption as well, or the flag would wrongly report a blocking poll.
                    this.currentlyPollingWorkCompleteMailBox.getAndSet(false);
                }
            } else {
                firstBlockingPoll = this.workMailBox.poll();
            }
        }
        catch (InterruptedException e) {
            log.debug("Interrupted waiting on work results");
        }
        if (firstBlockingPoll == null) {
            log.debug("Mailbox results returned null, indicating timeout (which was set as {}) or interruption during a blocking wait for returned work results", (Object)timeout);
        } else {
            results.add(firstBlockingPoll);
        }
        // Snapshot the size so that items arriving during the drain are left for the next pass.
        int size = this.workMailBox.size();
        log.trace("Draining {} more, got {} already...", (Object)size, (Object)results.size());
        this.workMailBox.drainTo(results, size);
        log.trace("Processing drained work {}...", (Object)results.size());
        for (WorkContainer<K, V> work : results) {
            MDC.put("offset", work.toString());
            try {
                this.wm.handleFutureResult(work);
            }
            finally {
                // Remove only the key set above; MDC.clear() would also drop the instance id.
                MDC.remove("offset");
            }
        }
    }

    /**
     * Commits ready offsets when enough time has passed since the last commit, when lingering would
     * bring no benefit, or when a commit has been explicitly commanded. Otherwise only logs why no
     * commit happened.
     */
    private void commitOffsetsMaybe() {
        Duration sinceLastCommit = this.getTimeSinceLastCommit();
        boolean intervalElapsed = sinceLastCommit.compareTo(this.timeBetweenCommits) > 0;
        boolean lingerWouldHelp = this.lingeringOnCommitWouldBeBeneficial();
        boolean commanded = this.isCommandedToCommit();

        if (intervalElapsed || !lingerWouldHelp || commanded) {
            log.debug("commitFrequencyOK {} || !lingerBeneficial {} || commitCommand {}", intervalElapsed, !lingerWouldHelp, commanded);
            this.commitOffsetsThatAreReady();
            return;
        }
        if (!log.isDebugEnabled()) {
            return;
        }
        if (this.wm.hasCommittableOffsets()) {
            log.debug("Have offsets to commit, but not enough time elapsed ({}), waiting for at least {}...", sinceLastCommit, this.timeBetweenCommits);
        } else {
            log.trace("Could commit now, but no offsets committable");
        }
    }

    /**
     * @return the number of tasks currently in the worker pool's queue
     */
    private int getWorkerQueueSize() {
        return this.workerPool.getQueue().size();
    }

    /**
     * Decides whether delaying the commit could pay off.
     *
     * @return {@code true} if work is waiting to be processed, is in flight, or is waiting in the mailbox,
     *         or if nothing is waiting in the commit queues
     */
    private boolean lingeringOnCommitWouldBeBeneficial() {
        boolean waitingToBeProcessed = this.wm.workIsWaitingToBeProcessed();
        boolean inFlight = this.wm.hasWorkInFlight();
        boolean waitingInMailbox = !this.workMailBox.isEmpty();
        boolean nothingToCommit = !this.wm.hasWorkInCommitQueues();
        log.trace("workIsWaitingToBeCompletedSuccessfully {} || workInFlight {} || workWaitingInMailbox {} || !workWaitingToCommit {};", waitingToBeProcessed, inFlight, waitingInMailbox, nothingToCommit);
        return waitingToBeProcessed || inFlight || waitingInMailbox || nothingToCommit;
    }

    /**
     * @return while running or draining, the configured time between commits minus the time elapsed
     *         since the last commit; in any other state, {@link Duration#ZERO}
     */
    private Duration getTimeToNextCommitCheck() {
        boolean active = this.state == State.running || this.state == State.draining;
        if (!active) {
            log.debug("System not {} (state: {}), so don't wait to commit, only a small thread yield time", (Object)State.running, (Object)this.state);
            return Duration.ZERO;
        }
        Duration elapsed = this.getTimeSinceLastCommit();
        Duration interval = this.getTimeBetweenCommits();
        return interval.minus(elapsed);
    }

    /**
     * @return the duration between {@code lastCommitCheckTime} and the current time reported by {@code clock}
     */
    private Duration getTimeSinceLastCommit() {
        Instant now = this.clock.getNow();
        return Duration.between(this.lastCommitCheckTime, now);
    }

    /**
     * Asks the committer to retrieve and commit offsets, then refreshes the last-commit timestamp.
     * Nothing is done when the work manager reports itself clean.
     */
    private void commitOffsetsThatAreReady() {
        if (!this.wm.isClean()) {
            this.committer.retrieveOffsetsAndCommit();
            this.updateLastCommitCheckTime();
        } else {
            log.debug("Nothing changed since last commit, skipping");
        }
    }

    /**
     * Records the current system time as the moment of the last commit.
     */
    private void updateLastCommitCheckTime() {
        // NOTE(review): this reads Instant.now() while getTimeSinceLastCommit() reads clock.getNow();
        // the two time sources can diverge if the clock is ever replaced - confirm this is intended.
        this.lastCommitCheckTime = Instant.now();
    }

    /**
     * Submits one task per work container to the worker pool and stores the resulting future on the
     * container. Each task sets the instance MDC and then runs {@code userFunctionRunner}.
     *
     * @param usersFunction the function applied to each record
     * @param callback      invoked with every result produced by {@code usersFunction}
     * @param workToProcess the containers to submit; nothing happens when it is empty
     * @param <R>           the result type of the user function
     */
    protected <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> workToProcess) {
        if (workToProcess.isEmpty()) {
            return;
        }
        log.debug("New work incoming: {}, Pool stats: {}", workToProcess.size(), this.workerPool);
        for (WorkContainer<K, V> container : workToProcess) {
            log.trace("Sending work ({}) to pool", container);
            Future<List<?>> pending = this.workerPool.submit(() -> {
                this.addInstanceMDC();
                return this.userFunctionRunner(usersFunction, callback, container);
            });
            container.setFuture(pending);
        }
    }

    /**
     * Runs the user function for a single work container.
     *
     * <p>Sequence on the normal path: put the container's string form into the MDC under
     * the key {@code "offset"}, skip the container if the work manager reports its epoch
     * as stale, apply the user function to the container's record, mark the container as
     * succeeded, feed every result to {@code callback}, then hand the container to the
     * mailbox.
     *
     * <p>If anything in that sequence throws an {@link Exception}, the container is
     * marked as failed, added to the mailbox, and the exception is rethrown.
     *
     * @param usersFunction the user function applied to the record
     * @param callback      invoked once per element returned by the user function
     * @param wc            the work container to process
     * @param <R>           the element type of the user function's result list
     * @return {@code null} when the container's epoch is stale; otherwise a list that is
     *         created here but never added to in this method, so it is always empty
     */
    protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> userFunctionRunner(Function<ConsumerRecord<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, WorkContainer<K, V> wc) {
        try {
            MDC.put("offset", wc.toString());
            if (this.wm.checkEpochIsStale(wc)) {
                log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", wc);
                return null;
            }
            log.trace("Pool received: {}", wc);
            List<R> userResults = usersFunction.apply(wc.getCr());
            this.onUserFunctionSuccess(wc, userResults);
            // NOTE(review): nothing is ever added to this list before it is returned.
            List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> intermediateResults = new ArrayList<>();
            for (R userResult : userResults) {
                log.trace("Running users call back...");
                callback.accept(userResult);
            }
            log.trace("User function future registered");
            this.addToMailBoxOnUserFunctionSuccess(wc, userResults);
            return intermediateResults;
        }
        catch (Exception failure) {
            log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", failure);
            wc.onUserFunctionFailure();
            this.addToMailbox(wc);
            throw failure;
        }
    }

    /**
     * Adds the work container to the mailbox after the user function has succeeded.
     *
     * <p>This implementation simply delegates to {@code addToMailbox(wc)} and ignores
     * {@code resultsFromUserFunction}. The method is {@code protected}, so presumably
     * it is meant as an extension point for subclasses - confirm against subclasses.
     *
     * @param wc                      the work container to add
     * @param resultsFromUserFunction the user function's results (unused here)
     */
    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        this.addToMailbox(wc);
    }

    /**
     * Marks the work container as successfully processed by calling
     * {@code wc.onUserFunctionSuccess()}.
     *
     * @param wc                      the work container whose user function succeeded
     * @param resultsFromUserFunction the user function's results (unused in this implementation)
     */
    protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        log.trace("User function success");
        wc.onUserFunctionSuccess();
    }

    /**
     * Adds the given work container to {@code workMailBox}.
     *
     * @param wc the work container to enqueue
     */
    protected void addToMailbox(WorkContainer<K, V> wc) {
        log.trace("Adding {} to mailbox...", wc);
        this.workMailBox.add(wc);
    }

    /**
     * Wakes the control thread when there is something new for it to do.
     *
     * <p>The control thread is interrupted only when the
     * {@code currentlyPollingWorkCompleteMailBox} flag is set and the producer manager,
     * if present, reports no transaction in progress. In every other case the method
     * only logs at trace level.
     */
    public void notifySomethingToDo() {
        if (!this.currentlyPollingWorkCompleteMailBox.get()) {
            log.trace("Work box not being polled currently, so thread not blocked, will come around to the bail box in the next looop.");
            return;
        }
        // An absent producer manager counts as "no transaction in progress".
        boolean transactionInProgress = this.producerManager.map(ProducerManager::isTransactionInProgress).orElse(false);
        if (transactionInProgress) {
            log.trace("Would have interrupted control thread, but TX in progress");
            return;
        }
        log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
        this.interruptControlThread();
    }

    /**
     * Reports the amount of remaining work, as given by
     * {@code wm.getNumberOfEntriesInPartitionQueues()}.
     *
     * @return the number of entries in the work manager's partition queues
     */
    @Override
    public long workRemaining() {
        return this.wm.getNumberOfEntriesInPartitionQueues();
    }

    /**
     * Registers a callback by adding it to {@code controlLoopHooks}.
     *
     * <p>NOTE(review): judging by the names, the hooks are presumably run at the end of
     * each control loop iteration - confirm against the control loop.
     *
     * @param r the callback to register
     */
    public void addLoopEndCallBack(Runnable r) {
        this.controlLoopHooks.add(r);
    }

    /**
     * Sets the long poll timeout by delegating to the static
     * {@code BrokerPollSystem.setLongPollTimeout}.
     *
     * <p>NOTE(review): because the target is a static method, the setting presumably
     * is not specific to this instance - confirm in {@code BrokerPollSystem}.
     *
     * @param ofMillis the timeout to apply
     */
    public void setLongPollTimeout(Duration ofMillis) {
        BrokerPollSystem.setLongPollTimeout(ofMillis);
    }

    /**
     * Raises the commit-command flag so that a commit is requested.
     *
     * <p>The flag is set while holding the monitor of {@code commitCommand}, the same
     * monitor {@code isCommandedToCommit()} holds while it reads and clears the flag.
     */
    public void requestCommitAsap() {
        log.debug("Registering command to commit next chance");
        synchronized (this.commitCommand) {
            this.commitCommand.set(true);
        }
    }

    /**
     * Checks whether a commit has been requested and, if so, clears the request.
     *
     * <p>The read and the reset both happen while holding the monitor of
     * {@code commitCommand}, so a request raised through {@code requestCommitAsap()}
     * cannot slip in between them.
     *
     * @return {@code true} if the flag was set (it is cleared before returning),
     *         {@code false} otherwise
     */
    private boolean isCommandedToCommit() {
        synchronized (this.commitCommand) {
            if (!this.commitCommand.get()) {
                return false;
            }
            log.debug("Command to commit asap received, clearing");
            this.commitCommand.set(false);
            return true;
        }
    }

    /**
     * @return the {@link ParallelConsumerOptions} held in the {@code options} field
     */
    protected ParallelConsumerOptions getOptions() {
        return this.options;
    }

    /**
     * Replaces the clock this instance reads the current time from (see
     * {@code getTimeSinceLastCommit()}). Package-private; presumably intended for
     * tests - confirm against callers.
     *
     * @param clock the clock to use from now on
     */
    void setClock(WallClock clock) {
        this.clock = clock;
    }

    /**
     * Sets the interval between commits, which {@code getTimeToNextCommitCheck()} uses
     * to compute the time remaining until the next commit check.
     *
     * @param timeBetweenCommits the new interval
     */
    public void setTimeBetweenCommits(Duration timeBetweenCommits) {
        this.timeBetweenCommits = timeBetweenCommits;
    }

    /**
     * @return the currently configured interval between commits
     */
    public Duration getTimeBetweenCommits() {
        return this.timeBetweenCommits;
    }

    /**
     * @return the optional {@link ProducerManager} held by this instance; may be empty
     */
    protected Optional<ProducerManager<K, V>> getProducerManager() {
        return this.producerManager;
    }

    /**
     * @return the {@link WorkManager} held in the {@code wm} field
     */
    public WorkManager<K, V> getWm() {
        return this.wm;
    }

    /**
     * Exposes the mailbox queue that {@code addToMailbox} appends work containers to.
     * The queue itself is returned, not a copy.
     *
     * @return the work mailbox queue
     */
    protected BlockingQueue<WorkContainer<K, V>> getWorkMailBox() {
        return this.workMailBox;
    }

    /**
     * @return the current value of the {@code numberOfAssignedPartitions} field
     */
    public int getNumberOfAssignedPartitions() {
        return this.numberOfAssignedPartitions;
    }

    /**
     * Sets the optional identifier of this instance.
     *
     * @param myId the identifier wrapped in an {@link Optional}; stored as given
     */
    public void setMyId(Optional<String> myId) {
        this.myId = myId;
    }

    /**
     * @return the optional identifier of this instance, as last stored in {@code myId}
     */
    public Optional<String> getMyId() {
        return this.myId;
    }
}

