/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.SupplierUtils;
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
import io.confluent.parallelconsumer.PCRetriableException;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractParallelEoSStreamProcessor<K, V>
implements ParallelConsumer<K, V>,
ConsumerRebalanceListener,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(AbstractParallelEoSStreamProcessor.class);
    public static final String MDC_INSTANCE_ID = "pcId";
    private static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";
    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30L);
    public static final Duration GRACE_PERIOD_FOR_OVERALL_SHUTDOWN = Duration.ofSeconds(10L);
    protected final ParallelConsumerOptions<K, V> options;
    private Clock clock = TimeUtils.getClock();
    private Instant lastCommitCheckTime = Instant.now();
    private final Optional<ProducerManager<K, V>> producerManager;
    private final Consumer<K, V> consumer;
    protected final Supplier<ThreadPoolExecutor> workerThreadPool;
    private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();
    protected WorkManager<K, V> wm;
    private final BlockingQueue<ControllerEventMessage<K, V>> workMailBox = new LinkedBlockingQueue<ControllerEventMessage<K, V>>();
    private final AtomicBoolean isRebalanceInProgress = new AtomicBoolean(false);
    private final BrokerPollSystem<K, V> brokerPollSubsystem;
    private final List<Runnable> controlLoopHooks = new ArrayList<Runnable>();
    private Thread blockableControlThread;
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
    private final AtomicBoolean awaitingInflightProcessingCompletionOnShutdown = new AtomicBoolean();
    private final OffsetCommitter committer;
    private final AtomicBoolean commitCommand = new AtomicBoolean(false);
    protected final DynamicLoadFactor dynamicExtraLoadFactor;
    private Exception failureReason;
    private Instant lastCommitTime;
    private State state = State.UNUSED;
    private Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener = Optional.empty();
    private int numberOfAssignedPartitions;
    private final RateLimiter queueStatsLimiter = new RateLimiter();
    PCModule<K, V> module;
    private boolean lastWorkRequestWasFulfilled = false;
    private Timer userProcessingTimer;
    private Gauge loadFactorGauge;
    private Gauge statusGauge;
    private Duration shutdownTimeout;
    private Duration drainTimeout;
    private PCMetrics pcMetrics;
    private Optional<String> myId = Optional.empty();

    @Deprecated
    public void setTimeBetweenCommits(Duration timeBetweenCommits) {
        this.options.setCommitInterval(timeBetweenCommits);
    }

    @Deprecated
    public Duration getTimeBetweenCommits() {
        return this.options.getCommitInterval();
    }

    @Override
    public boolean isClosedOrFailed() {
        boolean closed = this.state == State.CLOSED;
        boolean doneOrCancelled = false;
        if (this.controlThreadFuture.isPresent()) {
            Future<Boolean> threadFuture = this.controlThreadFuture.get();
            doneOrCancelled = threadFuture.isDone() || threadFuture.isCancelled();
        }
        return closed || doneOrCancelled;
    }

    public Exception getFailureCause() {
        return this.failureReason;
    }

    protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
        this(newOptions, new PCModule<K, V>(newOptions));
    }

    protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
        Objects.requireNonNull(newOptions, "Options must be supplied");
        this.module = module;
        this.options = newOptions;
        this.shutdownTimeout = this.options.getShutdownTimeout();
        this.drainTimeout = this.options.getDrainTimeout();
        this.consumer = this.options.getConsumer();
        this.validateConfiguration();
        module.setParallelEoSStreamProcessor(this);
        log.info("Confluent Parallel Consumer initialise... groupId: {}, Options: {}", (Object)newOptions.getConsumer().groupMetadata().groupId(), newOptions);
        this.pcMetrics = module.pcMetrics();
        this.dynamicExtraLoadFactor = module.dynamicExtraLoadFactor();
        this.workerThreadPool = SupplierUtils.memoize(() -> this.setupWorkerPool(newOptions.getMaxConcurrency()));
        this.wm = module.workManager();
        this.brokerPollSubsystem = module.brokerPoller(this);
        if (this.options.isProducerSupplied()) {
            this.producerManager = Optional.of(module.producerManager());
            this.committer = this.options.isUsingTransactionalProducer() ? (OffsetCommitter)this.producerManager.get() : this.brokerPollSubsystem;
        } else {
            this.producerManager = Optional.empty();
            this.committer = this.brokerPollSubsystem;
        }
        this.initMetrics();
    }

    private void initMetrics() {
        this.userProcessingTimer = this.pcMetrics.getTimerFromMetricDef(PCMetricsDef.USER_FUNCTION_PROCESSING_TIME, new Tag[0]);
        this.loadFactorGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.DYNAMIC_EXTRA_LOAD_FACTOR, this.dynamicExtraLoadFactor, DynamicLoadFactor::getCurrentFactor, new Tag[0]);
        this.statusGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.PC_STATUS, this, pc -> pc.state.getValue(), new Tag[0]);
        new ExecutorServiceMetrics((ExecutorService)this.getWorkerThreadPool().get(), "pc-user-function-executor", "pc.user.function.", this.pcMetrics.getCommonTags()).bindTo(this.pcMetrics.getMeterRegistry());
    }

    private void validateConfiguration() {
        this.options.validate();
        this.checkGroupIdConfigured(this.consumer);
        this.checkNotSubscribed(this.consumer);
        this.checkAutoCommitIsDisabled(this.consumer);
    }

    private void checkGroupIdConfigured(Consumer<K, V> consumer) {
        try {
            consumer.groupMetadata();
        }
        catch (RuntimeException e) {
            throw new IllegalArgumentException("Error validating Consumer configuration - no group metadata - missing a configured GroupId on your Consumer?", e);
        }
    }

    protected ThreadPoolExecutor setupWorkerPool(int poolSize) {
        ThreadFactory defaultFactory;
        try {
            defaultFactory = (ThreadFactory)InitialContext.doLookup(this.options.getManagedThreadFactory());
        }
        catch (NamingException e) {
            log.debug("Using Java SE Thread", (Throwable)e);
            defaultFactory = Executors.defaultThreadFactory();
        }
        ThreadFactory finalDefaultFactory = defaultFactory;
        ThreadFactory namingThreadFactory = r -> {
            Thread thread = finalDefaultFactory.newThread(r);
            String name = thread.getName();
            thread.setName("pc-" + name);
            this.getMyId().ifPresent(id -> thread.setName("pc-" + name + "-" + id));
            return thread;
        };
        ThreadPoolExecutor.AbortPolicy rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, workQueue, namingThreadFactory, rejectionHandler);
    }

    private void checkNotSubscribed(Consumer<K, V> consumerToCheck) {
        if (consumerToCheck instanceof MockConsumer) {
            return;
        }
        Set subscription = consumerToCheck.subscription();
        Set assignment = consumerToCheck.assignment();
        if (!subscription.isEmpty() || !assignment.isEmpty()) {
            throw new IllegalStateException("Consumer subscription must be managed by the Parallel Consumer. Use " + this.getClass().getName() + "#subcribe methods instead.");
        }
    }

    @Override
    public void subscribe(Collection<String> topics) {
        log.debug("Subscribing to {}", topics);
        this.consumer.subscribe(topics, (ConsumerRebalanceListener)this);
    }

    @Override
    public void subscribe(Pattern pattern) {
        log.debug("Subscribing to {}", (Object)pattern);
        this.consumer.subscribe(pattern, (ConsumerRebalanceListener)this);
    }

    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", topics);
        this.usersConsumerRebalanceListener = Optional.of(callback);
        this.consumer.subscribe(topics, (ConsumerRebalanceListener)this);
    }

    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", (Object)pattern);
        this.usersConsumerRebalanceListener = Optional.of(callback);
        this.consumer.subscribe(pattern, (ConsumerRebalanceListener)this);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("Partitions revoked {}, state: {}", partitions, (Object)this.state);
        this.isRebalanceInProgress.set(true);
        while (this.isTransactionCommittingInProgress()) {
            Thread.sleep(100L);
        }
        this.numberOfAssignedPartitions -= partitions.size();
        try {
            this.commitOffsetsThatAreReady();
            this.wm.onPartitionsRevoked(partitions);
        }
        catch (Exception e) {
            throw new InternalRuntimeException("onPartitionsRevoked event error", e);
        }
        finally {
            this.isRebalanceInProgress.set(false);
        }
        try {
            this.usersConsumerRebalanceListener.ifPresent(listener -> listener.onPartitionsRevoked(partitions));
        }
        catch (Exception e) {
            throw new ExceptionInUserFunctionException("Error from rebalance listener function after #onPartitionsRevoked", e);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.numberOfAssignedPartitions += partitions.size();
        log.info("Assigned {} total ({} new) partition(s) {}", new Object[]{this.numberOfAssignedPartitions, partitions.size(), partitions});
        this.wm.onPartitionsAssigned(partitions);
        this.usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsAssigned(partitions));
        this.notifySomethingToDo();
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.numberOfAssignedPartitions -= partitions.size();
        this.wm.onPartitionsLost(partitions);
        this.usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsLost(partitions));
    }

    private void checkAutoCommitIsDisabled(Consumer<K, V> consumer) {
        Optional<Boolean> isAutoCommitEnabled;
        try {
            isAutoCommitEnabled = AbstractParallelEoSStreamProcessor.getAutoCommitEnabled(consumer);
        }
        catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException | NullPointerException e) {
            if (!this.options.isIgnoreReflectiveAccessExceptionsForAutoCommitDisabledCheck()) {
                throw new ParallelConsumerException("Failed to check whether auto commit is enabled for consumer type " + consumer.getClass() + ". This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.");
            }
            log.warn("Failed to check whether auto commit is enabled for consumer type {}. Ignoring because ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck is enabled.", consumer.getClass(), (Object)e);
            return;
        }
        if (isAutoCommitEnabled.isPresent() && isAutoCommitEnabled.get().booleanValue()) {
            throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library.");
        }
        if (!isAutoCommitEnabled.isPresent()) {
            if (this.options.isIgnoreReflectiveAccessExceptionsForAutoCommitDisabledCheck()) {
                log.warn("Unable to check whether auto commit is enabled for consumer type {}. Ignoring because ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck is enabled.", consumer.getClass());
            } else {
                throw new ParallelConsumerException("Unable to check whether auto commit is enabled for consumer type " + consumer.getClass() + ". This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.");
            }
        }
    }

    private static Optional<Boolean> getAutoCommitEnabled(Consumer<?, ?> consumer) throws ClassNotFoundException, IllegalAccessException, NoSuchFieldException {
        Field delegateField;
        if (consumer instanceof MockConsumer) {
            log.debug("Detected MockConsumer class which doesn't do auto commits");
            return Optional.of(false);
        }
        if (!(consumer instanceof KafkaConsumer)) {
            log.warn("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: {}", consumer.getClass());
            return Optional.of(false);
        }
        KafkaConsumer kafkaConsumer = (KafkaConsumer)consumer;
        try {
            delegateField = KafkaConsumer.class.getDeclaredField("delegate");
            delegateField.setAccessible(true);
        }
        catch (NoSuchFieldException ignored) {
            delegateField = null;
        }
        if (delegateField != null) {
            Consumer delegate = (Consumer)delegateField.get(kafkaConsumer);
            Objects.requireNonNull(delegate, "Consumer delegate must not be null");
            if ("org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer".equals(delegate.getClass().getName())) {
                boolean autoCommitEnabled = AbstractParallelEoSStreamProcessor.getAutoCommitEnabledFromCoordinator(delegate.getClass(), delegate);
                return Optional.of(autoCommitEnabled);
            }
            if ("org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer".equals(delegate.getClass().getName())) {
                Field autoCommitEnabledField = delegate.getClass().getDeclaredField("autoCommitEnabled");
                autoCommitEnabledField.setAccessible(true);
                boolean autoCommitEnabled = (Boolean)autoCommitEnabledField.get(delegate);
                return Optional.of(autoCommitEnabled);
            }
            log.warn("Encountered unknown consumer delegate {}", consumer.getClass());
            return Optional.empty();
        }
        boolean autoCommitEnabled = AbstractParallelEoSStreamProcessor.getAutoCommitEnabledFromCoordinator(kafkaConsumer.getClass(), kafkaConsumer);
        return Optional.of(autoCommitEnabled);
    }

    private static <T extends Consumer, U extends Consumer<?, ?>> boolean getAutoCommitEnabledFromCoordinator(Class<T> consumerClass, U consumer) throws IllegalAccessException, NoSuchFieldException {
        Field coordinatorField = consumerClass.getDeclaredField("coordinator");
        coordinatorField.setAccessible(true);
        ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
        Objects.requireNonNull(coordinator, "Consumer coordinator must not be null. Ensure that group.id is configured for this consumer.");
        Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled");
        autoCommitEnabledField.setAccessible(true);
        return (Boolean)autoCommitEnabledField.get(coordinator);
    }

    @Override
    public void close() {
        this.closeDontDrainFirst();
    }

    @Override
    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        this.shutdownTimeout = timeout;
        this.close(drainMode);
    }

    @Override
    public void close(DrainingCloseable.DrainingMode drainMode) {
        if (this.state == State.CLOSED) {
            log.info("Already closed, checking end state..");
        } else {
            log.info("Signaling to close...");
            switch (drainMode) {
                case DRAIN: {
                    log.info("Will wait for all in flight to complete before");
                    this.transitionToDraining();
                    this.waitForClose(this.drainTimeout.plus(this.shutdownTimeout).plus(GRACE_PERIOD_FOR_OVERALL_SHUTDOWN));
                    break;
                }
                case DONT_DRAIN: {
                    log.info("Not waiting for remaining queued to complete, will finish in flight, then close...");
                    this.transitionToClosing();
                    this.waitForClose(this.shutdownTimeout.plus(GRACE_PERIOD_FOR_OVERALL_SHUTDOWN));
                }
            }
        }
        if (this.controlThreadFuture.isPresent()) {
            log.debug("Checking for control thread exception...");
            Future<Boolean> future = this.controlThreadFuture.get();
            future.get(this.shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        log.info("Close complete.");
    }

    private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.info("Waiting on closed state...");
        while (!this.state.equals((Object)State.CLOSED)) {
            try {
                Future<Boolean> booleanFuture = this.controlThreadFuture.get();
                log.debug("Blocking on control future, for duration {} seconds", (Object)BackportUtils.toSeconds(timeout));
                boolean signaled = booleanFuture.get(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                if (!signaled) {
                    throw new TimeoutException("Timeout waiting for system to close (" + timeout + ")");
                }
            }
            catch (InterruptedException e) {
                log.trace("Interrupted", (Throwable)e);
            }
            catch (ExecutionException | TimeoutException e) {
                log.error("Execution or timeout exception while waiting for the control thread to close cleanly (state was {}). Try increasing your time-out to allow the system to drain, or close without draining.", (Object)this.state, (Object)e);
                throw e;
            }
            log.trace("Still waiting for system to close...");
        }
    }

    private void doClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException {
        log.debug("Starting close process (state: {})...", (Object)this.state);
        this.brokerPollSubsystem.drain();
        log.debug("Shutting down execution pool...");
        this.workerThreadPool.get().getQueue().clear();
        this.workerThreadPool.get().shutdown();
        if (this.workerThreadPool.get().getActiveCount() > 0) {
            log.info("Inflight work in execution pool: {}, letting to finish on shutdown with timeout: {}", (Object)this.workerThreadPool.get().getActiveCount(), (Object)timeout);
        }
        log.debug("Awaiting worker pool termination...");
        this.awaitingInflightProcessingCompletionOnShutdown.getAndSet(true);
        boolean awaitingInflightCompletion = true;
        while (awaitingInflightCompletion) {
            log.debug("Still awaiting completion of inflight work");
            try {
                boolean terminationFinishedWithoutTimeout = this.workerThreadPool.get().awaitTermination(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                awaitingInflightCompletion = false;
                if (terminationFinishedWithoutTimeout) continue;
                log.warn("Thread execution pool termination await timeout ({})! Were any processing jobs dead locked (test latch locks?) or otherwise stuck? Forcing shutdown of workers.", (Object)timeout);
                this.workerThreadPool.get().shutdownNow();
                this.workerThreadPool.get().awaitTermination(BackportUtils.toSeconds(Duration.ofSeconds(1L)), TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                log.error("InterruptedException", (Throwable)e);
                awaitingInflightCompletion = true;
            }
        }
        this.awaitingInflightProcessingCompletionOnShutdown.getAndSet(false);
        if (this.workerThreadPool.get().getActiveCount() > 0) {
            log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", (Object)this.workerThreadPool.get().getActiveCount());
        }
        log.debug("Worker pool terminated.");
        this.processWorkCompleteMailBox(Duration.ZERO);
        if (Thread.currentThread().isInterrupted()) {
            log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition");
        }
        this.commitOffsetsThatAreReady();
        log.debug("Closing and waiting for broker poll system...");
        this.brokerPollSubsystem.closeAndWait();
        this.maybeCloseConsumer();
        this.producerManager.ifPresent(x -> x.close(timeout));
        this.deregisterMeters();
        this.pcMetrics.close();
        log.debug("Close complete.");
        this.state = State.CLOSED;
        if (this.getFailureCause() != null) {
            log.error("PC closed due to error: {}", (Object)this.getFailureCause(), null);
        }
    }

    private void deregisterMeters() {
        this.pcMetrics.removeMetersByPrefixAndCommonTags("pc.user.function.");
    }

    private void maybeCloseConsumer() {
        if (this.isResponsibleForCommits()) {
            this.consumer.close();
        }
    }

    private boolean isResponsibleForCommits() {
        return this.committer instanceof ProducerManager;
    }

    private boolean isRecordsAwaitingProcessing() {
        boolean isRecordsAwaitingProcessing = this.wm.isRecordsAwaitingProcessing();
        boolean threadsDone = this.areMyThreadsDone();
        log.trace("isRecordsAwaitingProcessing {} || threadsDone {}", (Object)isRecordsAwaitingProcessing, (Object)threadsDone);
        return isRecordsAwaitingProcessing || threadsDone;
    }

    private void transitionToDraining() {
        log.debug("Transitioning to draining...");
        this.state = State.DRAINING;
        this.notifySomethingToDo();
    }

    private void interruptControlThread() {
        if (this.blockableControlThread != null) {
            log.debug("Interrupting {} thread in case it's waiting for work", (Object)this.blockableControlThread.getName());
            this.blockableControlThread.interrupt();
        }
    }

    private boolean areMyThreadsDone() {
        if (BackportUtils.isEmpty(this.controlThreadFuture)) {
            return false;
        }
        return this.controlThreadFuture.get().isDone();
    }

    protected <R> void supervisorLoop(Function<PollContextInternal<K, V>, List<R>> userFunctionWrapped, java.util.function.Consumer<R> callback) {
        ExecutorService executorService;
        if (this.state != State.UNUSED) {
            throw new IllegalStateException(StringUtils.msg("Invalid state - you cannot call the poll* or pollAndProduce* methods more than once (they are asynchronous) (current state is {})", new Object[]{this.state}));
        }
        this.state = State.RUNNING;
        this.brokerPollSubsystem.start(this.options.getManagedExecutorService());
        try {
            executorService = (ExecutorService)InitialContext.doLookup(this.options.getManagedExecutorService());
        }
        catch (NamingException e) {
            log.debug("Using Java SE Thread", (Throwable)e);
            executorService = Executors.newSingleThreadExecutor();
        }
        Callable<Boolean> controlTask = () -> {
            this.addInstanceMDC();
            log.info("Control loop starting up...");
            Thread controlThread = Thread.currentThread();
            controlThread.setName("pc-control");
            this.getMyId().ifPresent(id -> controlThread.setName("pc-control-" + id));
            this.blockableControlThread = controlThread;
            while (this.state != State.CLOSED) {
                log.debug("Control loop start");
                try {
                    this.controlLoop(userFunctionWrapped, callback);
                }
                catch (InterruptedException e) {
                    log.debug("Control loop interrupted, closing");
                    Thread.interrupted();
                    this.doClose(this.shutdownTimeout);
                }
                catch (Exception e) {
                    if (Thread.interrupted()) {
                        log.debug("Thread interrupted flag cleared in control loop error handling");
                    }
                    log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e.getMessage(), (Throwable)e);
                    this.failureReason = new RuntimeException("Error from poll control thread: " + e.getMessage(), e);
                    this.doClose(this.shutdownTimeout);
                    throw this.failureReason;
                }
            }
            log.info("Control loop ending clean (state:{})...", (Object)this.state);
            return true;
        };
        Future<Boolean> controlTaskFutureResult = executorService.submit(controlTask);
        this.controlThreadFuture = Optional.of(controlTaskFutureResult);
    }

    private void addInstanceMDC() {
        this.myId.ifPresent(id -> MDC.put((String)MDC_INSTANCE_ID, (String)id));
    }

    protected <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) throws TimeoutException, ExecutionException, InterruptedException {
        this.maybeWakeupPoller();
        boolean shouldTryCommitNow = this.maybeAcquireCommitLock();
        Duration timeToBlockFor = shouldTryCommitNow ? Duration.ZERO : this.getTimeToBlockFor();
        this.processWorkCompleteMailBox(timeToBlockFor);
        if (shouldTryCommitNow) {
            this.commitOffsetsThatAreReady();
        }
        this.retrieveAndDistributeNewWork(userFunction, callback);
        log.trace("Loop: Running {} loop end plugin(s)", (Object)this.controlLoopHooks.size());
        this.controlLoopHooks.forEach(Runnable::run);
        log.trace("Current state: {}", (Object)this.state);
        switch (this.state) {
            case DRAINING: {
                this.drain();
                break;
            }
            case CLOSING: {
                this.doClose(this.shutdownTimeout);
            }
        }
        this.brokerPollSubsystem.supervise();
        Duration duration = Duration.ofMillis(1L);
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException e) {
            log.trace("Woke up", (Throwable)e);
        }
        if (log.isTraceEnabled()) {
            log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}", new Object[]{this.wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), this.wm.getNumberOfIncompleteOffsets(), this.wm.getNumberRecordsOutForProcessing(), this.state});
        }
    }

    private void maybeWakeupPoller() {
        if (this.state == State.RUNNING && !this.wm.isSufficientlyLoaded() && this.brokerPollSubsystem.isPausedForThrottling()) {
            log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs target: {})", (Object)this.wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), (Object)this.options.getTargetAmountOfRecordsInFlight());
            this.brokerPollSubsystem.wakeupIfPaused();
        }
    }

    private boolean maybeAcquireCommitLock() throws TimeoutException, InterruptedException {
        boolean shouldTryCommitNow;
        boolean bl = shouldTryCommitNow = this.isTimeToCommitNow() && this.wm.isDirty() && !this.isRebalanceInProgress.get();
        if (shouldTryCommitNow && this.options.isUsingTransactionCommitMode()) {
            log.debug("Acquiring commit lock pessimistically, before we try to collect offsets for committing");
            this.producerManager.get().preAcquireOffsetsToCommit();
        }
        return shouldTryCommitNow;
    }

    private <R> int retrieveAndDistributeNewWork(Function<PollContextInternal<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) {
        this.checkPipelinePressure();
        int gotWorkCount = 0;
        if (this.state == State.RUNNING || this.state == State.DRAINING) {
            int delta = this.calculateQuantityToRequest();
            List<WorkContainer<K, V>> records = this.wm.getWorkIfAvailable(delta);
            gotWorkCount = records.size();
            this.lastWorkRequestWasFulfilled = gotWorkCount >= delta;
            log.trace("Loop: Submit to pool");
            this.submitWorkToPool(userFunction, callback, records);
        }
        this.queueStatsLimiter.performIfNotLimited(() -> {
            int queueSize = this.getNumberOfUserFunctionsQueued();
            log.debug("Stats: \n- pool active: {} queued:{} \n- queue size: {} target: {} loading factor: {}", new Object[]{this.workerThreadPool.get().getActiveCount(), queueSize, queueSize, this.getPoolLoadTarget(), this.dynamicExtraLoadFactor.getCurrentFactor()});
        });
        return gotWorkCount;
    }

    protected <R> void submitWorkToPool(Function<PollContextInternal<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> workToProcess) {
        if (this.state.equals((Object)State.CLOSING) || this.state.equals((Object)State.CLOSED)) {
            log.debug("Not submitting new work as Parallel Consumer is in {} state, incoming work: {}, Pool stats: {}", new Object[]{this.state, workToProcess.size(), this.workerThreadPool.get()});
        }
        if (!workToProcess.isEmpty()) {
            log.debug("New work incoming: {}, Pool stats: {}", (Object)workToProcess.size(), (Object)this.workerThreadPool.get());
            List<List<WorkContainer<K, V>>> batches = this.makeBatches(workToProcess);
            if (log.isDebugEnabled()) {
                List sizes = batches.stream().map(List::size).sorted().collect(Collectors.toList());
                log.debug("Number batches: {}, smallest {}, sizes {}", new Object[]{batches.size(), sizes.stream().findFirst().get(), sizes});
                List integerStream = sizes.stream().filter(x -> x < this.options.getBatchSize()).collect(Collectors.toList());
                if (integerStream.size() > 1) {
                    log.warn("More than one batch isn't target size: {}. Input number of batches: {}", integerStream, (Object)batches.size());
                }
            }
            for (List<WorkContainer<K, V>> batch : batches) {
                this.submitWorkToPoolInner(usersFunction, callback, batch);
            }
        }
    }

    private <R> void submitWorkToPoolInner(Function<PollContextInternal<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> batch) {
        log.trace("Sending work ({}) to pool", batch);
        Future<List<?>> outputRecordFuture = this.workerThreadPool.get().submit(() -> {
            this.addInstanceMDC();
            return this.runUserFunction(usersFunction, callback, batch);
        });
        for (WorkContainer<K, V> workContainer : batch) {
            workContainer.setFuture(outputRecordFuture);
        }
    }

    private List<List<WorkContainer<K, V>>> makeBatches(List<WorkContainer<K, V>> workToProcess) {
        int maxBatchSize = this.options.getBatchSize();
        return AbstractParallelEoSStreamProcessor.partition(workToProcess, maxBatchSize);
    }

    private static <T> List<List<T>> partition(Collection<T> sourceCollection, int maxBatchSize) {
        ArrayList<List<T>> listOfBatches = new ArrayList<List<T>>();
        ArrayList<T> batchInConstruction = new ArrayList<T>();
        for (T item : sourceCollection) {
            batchInConstruction.add(item);
            if (batchInConstruction.size() != maxBatchSize) continue;
            listOfBatches.add(batchInConstruction);
            batchInConstruction = new ArrayList();
        }
        if (!batchInConstruction.isEmpty()) {
            listOfBatches.add(batchInConstruction);
        }
        if (log.isDebugEnabled()) {
            log.debug("sourceCollection.size() {}, batches: {}, batch sizes {}", new Object[]{sourceCollection.size(), listOfBatches.size(), listOfBatches.stream().map(List::size).collect(Collectors.toList())});
        }
        return listOfBatches;
    }

    protected int calculateQuantityToRequest() {
        int batchSize;
        int modulo;
        int target = this.getTargetOutForProcessing();
        int current = this.wm.getNumberRecordsOutForProcessing();
        int delta = target - current;
        if (this.options.isUsingBatching() && (modulo = delta % (batchSize = this.options.getBatchSize().intValue())) > 0) {
            int extraToFillBatch = target - modulo;
            delta += extraToFillBatch;
        }
        log.debug("Will try to get work - target: {}, current queue size: {}, requesting: {}, loading factor: {}", new Object[]{target, current, delta, this.dynamicExtraLoadFactor.getCurrentFactor()});
        return delta;
    }

    protected int getTargetOutForProcessing() {
        return this.getQueueTargetLoaded();
    }

    protected int getQueueTargetLoaded() {
        return this.getPoolLoadTarget() * this.dynamicExtraLoadFactor.getCurrentFactor();
    }

    protected void checkPipelinePressure() {
        if (log.isTraceEnabled()) {
            log.trace("Queue pressure check: (current size: {}, loaded target: {}, factor: {}) if (isPoolQueueLow() {} && lastWorkRequestWasFulfilled {}))", new Object[]{this.getNumberOfUserFunctionsQueued(), this.getQueueTargetLoaded(), this.dynamicExtraLoadFactor.getCurrentFactor(), this.isPoolQueueLow(), this.lastWorkRequestWasFulfilled});
        }
        if (this.isPoolQueueLow() && this.lastWorkRequestWasFulfilled) {
            boolean steppedUp = this.dynamicExtraLoadFactor.maybeStepUp();
            if (steppedUp) {
                log.debug("isPoolQueueLow(): Executor pool queue is not loaded with enough work (queue: {} vs target: {}), stepped up loading factor to {}", new Object[]{this.getNumberOfUserFunctionsQueued(), this.getPoolLoadTarget(), this.dynamicExtraLoadFactor.getCurrentFactor()});
            } else if (this.dynamicExtraLoadFactor.isMaxReached()) {
                log.warn("isPoolQueueLow(): Max loading factor steps reached: {}/{}", (Object)this.dynamicExtraLoadFactor.getCurrentFactor(), (Object)this.dynamicExtraLoadFactor.getMaxFactor());
            }
        }
    }

    private int getPoolLoadTarget() {
        return this.options.getTargetAmountOfRecordsInFlight();
    }

    private boolean isPoolQueueLow() {
        int queueTarget;
        int queueSize = this.getNumberOfUserFunctionsQueued();
        boolean workAmountBelowTarget = queueSize <= (queueTarget = this.getPoolLoadTarget());
        log.debug("isPoolQueueLow()? workAmountBelowTarget {} {} vs {};", new Object[]{workAmountBelowTarget, queueSize, queueTarget});
        return workAmountBelowTarget;
    }

    private void drain() {
        log.debug("Signaling to drain...");
        this.brokerPollSubsystem.drain();
        if (!this.isRecordsAwaitingProcessing()) {
            this.transitionToClosing();
        } else {
            log.debug("Records still waiting processing, won't transition to closing.");
        }
    }

    private void transitionToClosing() {
        log.debug("Transitioning to closing...");
        this.state = this.state == State.UNUSED ? State.CLOSED : State.CLOSING;
        this.notifySomethingToDo();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processWorkCompleteMailBox(Duration timeToBlockFor) {
        log.trace("Processing mailbox (might block waiting for results)...");
        ArrayDeque<ControllerEventMessage<K, V>> results = new ArrayDeque<ControllerEventMessage<K, V>>();
        if (timeToBlockFor.toMillis() > 0L) {
            this.currentlyPollingWorkCompleteMailBox.getAndSet(true);
            if (log.isDebugEnabled()) {
                log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}", new Object[]{timeToBlockFor, this.workerThreadPool.get().getActiveCount(), this.getNumberOfUserFunctionsQueued()});
            }
            log.trace("Blocking poll {}", (Object)timeToBlockFor);
            try {
                ControllerEventMessage<K, V> firstBlockingPoll = this.workMailBox.poll(timeToBlockFor.toMillis(), TimeUnit.MILLISECONDS);
                if (firstBlockingPoll == null) {
                    log.debug("Mailbox results returned null, indicating timeToBlockFor elapsed (which was set as {})", (Object)timeToBlockFor);
                } else {
                    log.debug("Work arrived in mailbox during blocking poll. (Timeout was set as {})", (Object)timeToBlockFor);
                    results.add(firstBlockingPoll);
                }
            }
            catch (InterruptedException e) {
                log.debug("Interrupted waiting on work results");
            }
            finally {
                this.currentlyPollingWorkCompleteMailBox.getAndSet(false);
            }
            log.trace("Blocking poll finish");
        }
        int size = this.workMailBox.size();
        log.trace("Draining {} more, got {} already...", (Object)size, (Object)results.size());
        this.workMailBox.drainTo(results, size);
        log.trace("Processing drained work {}...", (Object)results.size());
        for (ControllerEventMessage controllerEventMessage : results) {
            if (controllerEventMessage.isNewConsumerRecords()) {
                this.wm.registerWork(controllerEventMessage.getConsumerRecords());
                continue;
            }
            WorkContainer work = controllerEventMessage.getWorkContainer();
            MDC.put((String)MDC_WORK_CONTAINER_DESCRIPTOR, (String)work.toString());
            this.wm.handleFutureResult(work);
            MDC.remove((String)MDC_WORK_CONTAINER_DESCRIPTOR);
        }
    }

    private Duration getTimeToBlockFor() {
        Optional<Duration> lowestScheduledOpt;
        if (!this.wm.isWorkInFlightMeetingTarget() && (lowestScheduledOpt = this.wm.getLowestRetryTime()).isPresent()) {
            Duration retryDelay = this.options.getDefaultMessageRetryDelay();
            Duration lowestScheduled = lowestScheduledOpt.get();
            Duration timeBetweenCommits = this.getTimeBetweenCommits();
            Duration effectiveRetryDelay = lowestScheduled.toMillis() < retryDelay.toMillis() ? retryDelay : lowestScheduled;
            Duration result = timeBetweenCommits.toMillis() < effectiveRetryDelay.toMillis() ? timeBetweenCommits : effectiveRetryDelay;
            log.debug("Not enough work in flight, while work is waiting to be retried - so will only sleep until next retry time of {} (lowestScheduled = {})", (Object)result, (Object)lowestScheduled);
            return result;
        }
        Duration effectiveCommitAttemptDelay = this.getTimeToNextCommitCheck();
        log.debug("Calculated next commit time in {}", (Object)effectiveCommitAttemptDelay);
        return effectiveCommitAttemptDelay;
    }

    private boolean isIdlingOrRunning() {
        return this.state == State.RUNNING || this.state == State.DRAINING || this.state == State.PAUSED;
    }

    protected boolean isTimeToCommitNow() {
        boolean shouldCommitNow;
        this.updateLastCommitCheckTime();
        Duration elapsedSinceLastCommit = this.lastCommitTime == null ? Duration.ofDays(1L) : Duration.between(this.lastCommitTime, Instant.now());
        boolean commitFrequencyOK = elapsedSinceLastCommit.compareTo(this.getTimeBetweenCommits()) > 0;
        boolean isCommandedToCommit = this.isCommandedToCommit();
        boolean bl = shouldCommitNow = commitFrequencyOK || isCommandedToCommit;
        if (log.isDebugEnabled()) {
            log.debug("Should commit this cycle? shouldCommitNow? " + shouldCommitNow + " : commitFrequencyOK? " + commitFrequencyOK + ", isCommandedToCommit? " + isCommandedToCommit);
        }
        return shouldCommitNow;
    }

    private int getNumberOfUserFunctionsQueued() {
        return this.workerThreadPool.get().getQueue().size();
    }

    private Duration getTimeToNextCommitCheck() {
        if (this.isIdlingOrRunning()) {
            Duration timeSinceLastCommit = this.getTimeSinceLastCheck();
            Duration timeBetweenCommits = this.getTimeBetweenCommits();
            Duration minus = timeBetweenCommits.minus(timeSinceLastCommit);
            return minus;
        }
        log.debug("System not {} (state: {}), so don't wait to commit, only a small thread yield time", (Object)State.RUNNING, (Object)this.state);
        return Duration.ZERO;
    }

    private Duration getTimeSinceLastCheck() {
        Instant now = this.clock.instant();
        return Duration.between(this.lastCommitCheckTime, now);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void commitOffsetsThatAreReady() throws TimeoutException, InterruptedException {
        log.trace("Synchronizing on commitCommand...");
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            log.debug("Committing offsets that are ready...");
            this.committer.retrieveOffsetsAndCommit();
            this.clearCommitCommand();
            this.lastCommitTime = Instant.now();
        }
    }

    private void updateLastCommitCheckTime() {
        this.lastCommitCheckTime = Instant.now();
    }

    protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> workContainerBatch) {
        if (log.isDebugEnabled()) {
            MDC.put((String)MDC_WORK_CONTAINER_DESCRIPTOR, (String)(workContainerBatch.get(0).offset() + ""));
        }
        log.trace("Pool received: {}", workContainerBatch);
        Map<Boolean, List<WorkContainer>> splitContainersMap = workContainerBatch.stream().collect(Collectors.groupingBy(this.wm::checkIfWorkIsStale));
        List staleWorkContainers = splitContainersMap.getOrDefault(Boolean.TRUE, new ArrayList());
        List activeWorkContainers = splitContainersMap.getOrDefault(Boolean.FALSE, new ArrayList());
        this.handleStaleWork(staleWorkContainers);
        PollContextInternal context = new PollContextInternal(activeWorkContainers);
        try {
            if (!activeWorkContainers.isEmpty()) {
                ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> arrayList = this.runUserFunctionInternal(usersFunction, context, callback, activeWorkContainers);
                return arrayList;
            }
            List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> list = Collections.emptyList();
            return list;
        }
        catch (Exception e) {
            Throwable cause = e.getCause();
            String msg = StringUtils.msg("Exception caught in user function running stage, registering WC as failed, returning to mailbox. Context: {}", context, e);
            if (cause instanceof PCRetriableException) {
                log.debug("Explicit " + PCRetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, (Throwable)e);
            } else {
                log.error(msg, (Throwable)e);
            }
            for (WorkContainer<K, V> wc : workContainerBatch) {
                wc.onUserFunctionFailure(e);
                this.addToMailbox(context, wc);
            }
            throw e;
        }
        finally {
            this.cleanUpContext(context);
        }
    }

    protected void handleStaleWork(List<WorkContainer<K, V>> staleWorkContainers) {
        PollContextInternal<K, V> internalContext = new PollContextInternal<K, V>(staleWorkContainers);
        try {
            if (!staleWorkContainers.isEmpty()) {
                log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", staleWorkContainers);
                staleWorkContainers.forEach(wc -> this.addToMailbox(internalContext, (WorkContainer<K, V>)wc));
            }
        }
        finally {
            this.cleanUpContext(internalContext);
        }
    }

    protected <R> ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunctionInternal(Function<PollContextInternal<K, V>, List<R>> usersFunction, PollContextInternal<K, V> context, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> activeWorkContainers) {
        List resultsFromUserFunction = (List)this.userProcessingTimer.record(() -> (List)usersFunction.apply(context));
        for (WorkContainer<K, V> kvWorkContainer : activeWorkContainers) {
            this.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
        }
        ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> intermediateResults = new ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>>();
        for (Object e : resultsFromUserFunction) {
            log.trace("Running users call back...");
            callback.accept(e);
        }
        for (WorkContainer workContainer : activeWorkContainers) {
            this.addToMailBoxOnUserFunctionSuccess(context, workContainer, resultsFromUserFunction);
        }
        log.trace("User function future registered");
        return intermediateResults;
    }

    private void cleanUpContext(PollContextInternal<K, V> context) {
        context.getProducingLock().ifPresent(ProducerManager.ProducingLock::unlock);
    }

    protected void addToMailBoxOnUserFunctionSuccess(PollContextInternal<K, V> context, WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        this.addToMailbox(context, wc);
    }

    protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        log.trace("User function success");
        wc.onUserFunctionSuccess();
    }

    protected void addToMailbox(PollContextInternal<K, V> pollContext, WorkContainer<K, V> wc) {
        String state = wc.isUserFunctionSucceeded() ? "succeeded" : "FAILED";
        log.trace("Adding {} {} to mailbox...", (Object)state, wc);
        this.workMailBox.add(ControllerEventMessage.of(wc));
        wc.onPostAddToMailBox(pollContext, this.producerManager);
    }

    public void registerWork(EpochAndRecordsMap<K, V> polledRecords) {
        log.trace("Adding {} to mailbox...", polledRecords);
        this.workMailBox.add(ControllerEventMessage.of(polledRecords));
    }

    public void notifySomethingToDo() {
        boolean noTransactionInProgress;
        boolean bl = noTransactionInProgress = this.producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false) == false;
        if (noTransactionInProgress && !this.awaitingInflightProcessingCompletionOnShutdown.get()) {
            log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
            this.interruptControlThread();
        } else {
            log.trace("Would have interrupted control thread, but TX in progress");
        }
    }

    @Override
    public long workRemaining() {
        return this.wm.getNumberOfIncompleteOffsets();
    }

    public void addLoopEndCallBack(Runnable r) {
        this.controlLoopHooks.add(r);
    }

    public void setLongPollTimeout(Duration ofMillis) {
        BrokerPollSystem.setLongPollTimeout(ofMillis);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void requestCommitAsap() {
        log.debug("Registering command to commit next chance");
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            this.commitCommand.set(true);
        }
        this.notifySomethingToDo();
    }

    private boolean isTransactionCommittingInProgress() {
        return this.options.isUsingTransactionCommitMode() && this.producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false) != false;
    }

    @Override
    public void pauseIfRunning() {
        if (this.state == State.RUNNING) {
            log.info("Transitioning parallel consumer to state paused.");
            this.state = State.PAUSED;
        } else {
            log.debug("Skipping transition of parallel consumer to state paused. Current state is {}.", (Object)this.state);
        }
    }

    @Override
    public void resumeIfPaused() {
        if (this.state == State.PAUSED) {
            log.info("Transitioning parallel consumer to state running.");
            this.state = State.RUNNING;
            this.notifySomethingToDo();
        } else {
            log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", (Object)this.state);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isCommandedToCommit() {
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            return this.commitCommand.get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearCommitCommand() {
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            if (this.commitCommand.get()) {
                log.debug("Command to commit asap received, clearing");
                this.commitCommand.set(false);
            }
        }
    }

    protected ParallelConsumerOptions<K, V> getOptions() {
        return this.options;
    }

    void setClock(Clock clock) {
        this.clock = clock;
    }

    protected Optional<ProducerManager<K, V>> getProducerManager() {
        return this.producerManager;
    }

    protected Supplier<ThreadPoolExecutor> getWorkerThreadPool() {
        return this.workerThreadPool;
    }

    public WorkManager<K, V> getWm() {
        return this.wm;
    }

    protected BlockingQueue<ControllerEventMessage<K, V>> getWorkMailBox() {
        return this.workMailBox;
    }

    public void setState(State state) {
        this.state = state;
    }

    public int getNumberOfAssignedPartitions() {
        return this.numberOfAssignedPartitions;
    }

    protected PCModule<K, V> getModule() {
        return this.module;
    }

    public void setMyId(Optional<String> myId) {
        this.myId = myId;
    }

    public Optional<String> getMyId() {
        return this.myId;
    }

    static final class ControllerEventMessage<K, V> {
        private final WorkContainer<K, V> workContainer;
        private final EpochAndRecordsMap<K, V> consumerRecords;

        private boolean isWorkResult() {
            return this.workContainer != null;
        }

        private boolean isNewConsumerRecords() {
            return !this.isWorkResult();
        }

        private static <K, V> ControllerEventMessage<K, V> of(EpochAndRecordsMap<K, V> polledRecords) {
            return new ControllerEventMessage<K, V>(null, polledRecords);
        }

        public static <K, V> ControllerEventMessage<K, V> of(WorkContainer<K, V> work) {
            return new ControllerEventMessage<K, V>(work, null);
        }

        public WorkContainer<K, V> getWorkContainer() {
            return this.workContainer;
        }

        public EpochAndRecordsMap<K, V> getConsumerRecords() {
            return this.consumerRecords;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ControllerEventMessage)) {
                return false;
            }
            ControllerEventMessage other = (ControllerEventMessage)o;
            WorkContainer<K, V> this$workContainer = this.getWorkContainer();
            WorkContainer<K, V> other$workContainer = other.getWorkContainer();
            if (this$workContainer == null ? other$workContainer != null : !((Object)this$workContainer).equals(other$workContainer)) {
                return false;
            }
            EpochAndRecordsMap<K, V> this$consumerRecords = this.getConsumerRecords();
            EpochAndRecordsMap<K, V> other$consumerRecords = other.getConsumerRecords();
            return !(this$consumerRecords == null ? other$consumerRecords != null : !((Object)this$consumerRecords).equals(other$consumerRecords));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            WorkContainer<K, V> $workContainer = this.getWorkContainer();
            result = result * 59 + ($workContainer == null ? 43 : ((Object)$workContainer).hashCode());
            EpochAndRecordsMap<K, V> $consumerRecords = this.getConsumerRecords();
            result = result * 59 + ($consumerRecords == null ? 43 : ((Object)$consumerRecords).hashCode());
            return result;
        }

        public String toString() {
            return "AbstractParallelEoSStreamProcessor.ControllerEventMessage(workContainer=" + this.getWorkContainer() + ", consumerRecords=" + this.getConsumerRecords() + ")";
        }

        private ControllerEventMessage(WorkContainer<K, V> workContainer, EpochAndRecordsMap<K, V> consumerRecords) {
            this.workContainer = workContainer;
            this.consumerRecords = consumerRecords;
        }
    }
}

