/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.BrokerPollSystem;
import io.confluent.parallelconsumer.ConsumerManager;
import io.confluent.parallelconsumer.DrainingCloseable;
import io.confluent.parallelconsumer.InternalError;
import io.confluent.parallelconsumer.OffsetCommitter;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.ProducerManager;
import io.confluent.parallelconsumer.UserFunctions;
import io.confluent.parallelconsumer.WorkContainer;
import io.confluent.parallelconsumer.WorkManager;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import pl.tlinkowski.unij.api.UniLists;

public class ParallelEoSStreamProcessor<K, V>
implements ParallelStreamProcessor<K, V>,
ConsumerRebalanceListener,
Closeable {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessor.class);
    /** Options handed to the constructor; validated there before anything else is built from them. */
    private final ParallelConsumerOptions options;
    /** Time source used to measure the time elapsed since the last commit; replaceable through setClock. */
    private WallClock clock = new WallClock();
    /** Target interval between offset commits; compared against the elapsed time in commitOffsetsMaybe. */
    private Duration timeBetweenCommits = Duration.ofSeconds(1L);
    /** Instant recorded after the most recent commit performed by commitOffsetsMaybe. */
    private Instant lastCommit = Instant.now();
    /** Present only when the options report that a producer was supplied. */
    private final Optional<ProducerManager<K, V>> producerManager;
    /** The consumer obtained from the options; subscriptions are registered on it by the subscribe methods. */
    private final Consumer<K, V> consumer;
    /** Fixed-size pool (sized from the options) on which user functions are executed. */
    private final ExecutorService workerPool;
    /** Future of the control task; empty until supervisorLoop has submitted it. */
    private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();
    /** Work manager: supplies work to run and is told about successes, failures and rebalance events. */
    protected WorkManager<K, V> wm;
    /** Queue into which worker threads put finished work containers; read by processWorkCompleteMailBox. */
    private final BlockingQueue<WorkContainer<K, V>> workMailBox = new LinkedBlockingQueue<WorkContainer<K, V>>();
    /** Broker polling subsystem; also used as the offset committer unless a transactional producer is in use. */
    private final BrokerPollSystem<K, V> brokerPollSubsystem;
    /** Callbacks run once per control loop iteration; registered through addLoopEndCallBack. */
    private final List<Runnable> controlLoopHooks = new ArrayList<Runnable>();
    /** The thread running the control task; interrupted to wake it from a blocking mailbox poll. */
    private Thread blockableControlThread;
    /** Set while the control thread is inside the blocking mailbox poll. */
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
    /** Performs the actual offset commit; chosen in the constructor. */
    private final OffsetCommitter committer;
    /** Raised by requestCommitAsap, consumed (and cleared) by isCommandedToCommit. */
    private final AtomicBoolean commitCommand = new AtomicBoolean(false);
    /** Current life-cycle state; starts as unused. NOTE(review): read and written from several threads without volatile - confirm this is intended. */
    private State state = State.unused;
    /** Optional user listener that rebalance events are forwarded to after the work manager has handled them. */
    private Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener = Optional.empty();

    /**
     * Reports whether this processor has reached the {@code closed} state, or whether its control
     * task (if one was ever started) is already done or cancelled.
     *
     * @return {@code true} when closed, or when the control future is done or cancelled
     */
    public boolean isClosedOrFailed() {
        if (this.state == State.closed) {
            return true;
        }
        // No control future means the control task was never started, so it cannot have ended.
        return this.controlThreadFuture
                .map(future -> future.isDone() || future.isCancelled())
                .orElse(false);
    }

    /**
     * Builds the processor from the supplied options: validates them, checks the consumer, and
     * creates the worker pool, work manager, broker poll system and (optionally) producer manager.
     *
     * @param newOptions configuration for this instance; must not be {@code null}
     * @throws NullPointerException  if {@code newOptions} is {@code null}
     * @throws IllegalStateException if the consumer is already subscribed/assigned or has auto commit enabled
     */
    public ParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) {
        Objects.requireNonNull(newOptions, "Options must be supplied");
        log.info("Confluent Parallel Consumer initialise... Options: {}", (Object)newOptions);
        this.options = newOptions;
        this.options.validate();

        // The consumer must be untouched: subscriptions and commits are managed by this class.
        this.consumer = this.options.getConsumer();
        this.checkNotSubscribed(this.consumer);
        this.checkAutoCommitIsDisabled(this.consumer);

        this.workerPool = Executors.newFixedThreadPool(newOptions.getNumberOfThreads());
        this.wm = new WorkManager<K, V>(newOptions, this.consumer);
        ConsumerManager<K, V> consumerManager = new ConsumerManager<K, V>(this.consumer);
        this.brokerPollSubsystem = new BrokerPollSystem<K, V>(consumerManager, this.wm, this, newOptions);

        if (!this.options.isProducerSupplied()) {
            // Consumer-only mode: offsets are committed through the broker poll system.
            this.producerManager = Optional.empty();
            this.committer = this.brokerPollSubsystem;
            return;
        }

        this.producerManager = Optional.of(new ProducerManager(this.options.getProducer(), consumerManager, this.wm, this.options));
        if (this.options.isUsingTransactionalProducer()) {
            // With a transactional producer the producer manager commits the offsets.
            this.committer = (OffsetCommitter)this.producerManager.get();
        } else {
            this.committer = this.brokerPollSubsystem;
        }
    }

    /**
     * Verifies that the supplied consumer has neither a subscription nor an assignment, because
     * subscriptions have to be registered through this class so that it receives rebalance events.
     * {@link MockConsumer} instances are exempt from the check.
     *
     * @param consumerToCheck the consumer taken from the options
     * @throws IllegalStateException if the consumer already has a subscription or an assignment
     */
    private void checkNotSubscribed(Consumer<K, V> consumerToCheck) {
        if (consumerToCheck instanceof MockConsumer) {
            return;
        }
        Set<String> subscription = consumerToCheck.subscription();
        Set<TopicPartition> assignment = consumerToCheck.assignment();
        boolean alreadyInUse = !subscription.isEmpty() || !assignment.isEmpty();
        if (alreadyInUse) {
            // Message previously pointed at a misspelled "#subcribe" method name.
            throw new IllegalStateException("Consumer subscription must be managed by this class. Use " + this.getClass().getName() + "#subscribe methods instead.");
        }
    }

    /**
     * Subscribes the underlying consumer to the given topics, with this instance registered as the
     * rebalance listener.
     *
     * @param topics topic names to subscribe to
     */
    @Override
    public void subscribe(Collection<String> topics) {
        log.debug("Subscribing to {}", topics);
        ConsumerRebalanceListener rebalanceListener = this;
        this.consumer.subscribe(topics, rebalanceListener);
    }

    /**
     * Subscribes the underlying consumer to all topics matching the pattern, with this instance
     * registered as the rebalance listener.
     *
     * @param pattern topic name pattern to subscribe to
     */
    @Override
    public void subscribe(Pattern pattern) {
        log.debug("Subscribing to {}", (Object)pattern);
        ConsumerRebalanceListener rebalanceListener = this;
        this.consumer.subscribe(pattern, rebalanceListener);
    }

    /**
     * Subscribes to the given topics and remembers the user's rebalance listener. This instance is
     * the listener actually registered with the consumer; it forwards events to {@code callback}.
     *
     * @param topics   topic names to subscribe to
     * @param callback user listener to forward rebalance events to; must not be {@code null}
     */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", topics);
        this.usersConsumerRebalanceListener = Optional.of(callback);
        ConsumerRebalanceListener rebalanceListener = this;
        this.consumer.subscribe(topics, rebalanceListener);
    }

    /**
     * Subscribes to all topics matching the pattern and remembers the user's rebalance listener.
     * This instance is the listener actually registered with the consumer; it forwards events to
     * {@code callback}.
     *
     * @param pattern  topic name pattern to subscribe to
     * @param callback user listener to forward rebalance events to; must not be {@code null}
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", (Object)pattern);
        this.usersConsumerRebalanceListener = Optional.of(callback);
        ConsumerRebalanceListener rebalanceListener = this;
        this.consumer.subscribe(pattern, rebalanceListener);
    }

    /**
     * Rebalance hook: commits whatever offsets are ready, tells the work manager about the
     * revocation, then forwards the event to the user's listener if one was registered.
     *
     * @param partitions the partitions being revoked
     * @throws InternalError wrapping any exception raised while handling the event
     */
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            log.debug("Partitions revoked (onPartitionsRevoked), state: {}", (Object)this.state);
            // Commit first, while the partitions are still handled by the work manager.
            this.commitOffsetsThatAreReady();
            this.wm.onPartitionsRevoked(partitions);
            if (this.usersConsumerRebalanceListener.isPresent()) {
                this.usersConsumerRebalanceListener.get().onPartitionsRevoked(partitions);
            }
        }
        catch (Exception failure) {
            throw new InternalError("onPartitionsRevoked event error", failure);
        }
    }

    /**
     * Rebalance hook: informs the work manager of the new assignment, then forwards the event to
     * the user's listener if one was registered.
     *
     * @param partitions the partitions that were assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.wm.onPartitionsAssigned(partitions);
        if (this.usersConsumerRebalanceListener.isPresent()) {
            this.usersConsumerRebalanceListener.get().onPartitionsAssigned(partitions);
        }
    }

    /**
     * Rebalance hook: informs the work manager of the lost partitions, then forwards the event to
     * the user's listener if one was registered.
     *
     * @param partitions the partitions that were lost
     */
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.wm.onPartitionsLost(partitions);
        if (this.usersConsumerRebalanceListener.isPresent()) {
            this.usersConsumerRebalanceListener.get().onPartitionsLost(partitions);
        }
    }

    /**
     * Rejects a {@link KafkaConsumer} that has auto commit enabled, since offsets are committed by
     * this library. The setting is not exposed publicly, so it is read reflectively from the
     * consumer's private {@code coordinator} field and that coordinator's {@code autoCommitEnabled}
     * field. Consumers that are not {@code KafkaConsumer} instances are not checked.
     *
     * <p>The fields are looked up on the declaring classes rather than on the runtime class, so a
     * subclass of {@code KafkaConsumer} is inspected correctly ({@code getDeclaredField} does not
     * search superclasses).
     *
     * @param consumer the consumer taken from the options
     * @throws IllegalStateException if auto commit is enabled, or if the setting cannot be read reflectively
     */
    private void checkAutoCommitIsDisabled(Consumer<K, V> consumer) {
        if (!(consumer instanceof KafkaConsumer)) {
            return;
        }
        Boolean isAutoCommitEnabled;
        try {
            Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");
            coordinatorField.setAccessible(true);
            ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
            Field autoCommitEnabledField = ConsumerCoordinator.class.getDeclaredField("autoCommitEnabled");
            autoCommitEnabledField.setAccessible(true);
            isAutoCommitEnabled = (Boolean)autoCommitEnabledField.get(coordinator);
        }
        catch (ReflectiveOperationException e) {
            // NoSuchFieldException / IllegalAccessException: the client internals differ from what is expected here.
            throw new IllegalStateException("Unable to determine whether consumer auto commit is enabled", e);
        }
        if (isAutoCommitEnabled.booleanValue()) {
            throw new IllegalStateException("Consumer auto commit must be disabled, as commits are handled by the library.");
        }
    }

    /**
     * Starts processing with a user function that consumes each record and produces nothing.
     * The user function is wrapped so that it fits the generic control loop: the wrapper runs it
     * through {@code UserFunctions.carefullyRun} and returns an empty result list, and the result
     * callback only logs.
     *
     * @param usersVoidConsumptionFunction function invoked for every consumed record
     */
    @Override
    public void poll(java.util.function.Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction) {
        // Typed explicitly: a raw Function would erase the lambda parameter to Object.
        Function<ConsumerRecord<K, V>, List<Object>> wrappedUserFunc = record -> {
            log.trace("asyncPoll - Consumed a record ({}), executing void function...", (Object)record.offset());
            UserFunctions.carefullyRun(usersVoidConsumptionFunction, record);
            log.trace("asyncPoll - user function finished ok.");
            return UniLists.of();
        };
        java.util.function.Consumer<Object> voidCallBack = ignore -> log.trace("Void callback applied.");
        this.supervisorLoop(wrappedUserFunc, voidCallBack);
    }

    /**
     * Starts processing with a user function that maps each consumed record to a list of records
     * to produce. Every returned record is sent through the producer manager, and the resulting
     * consume/produce pairing is passed to {@code callback} by the control loop machinery.
     *
     * @param userFunction maps a consumed record to the records that should be produced
     * @param callback     receives one result per produced record
     * @throws IllegalArgumentException if no producer was supplied in the options
     */
    @Override
    public void pollAndProduceMany(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction, java.util.function.Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> callback) {
        if (!this.options.isProducerSupplied()) {
            throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
        }
        // Fully typed: the raw List/ProducerRecord variant cannot be iterated as ProducerRecord.
        Function<ConsumerRecord<K, V>, List<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>>> wrappedUserFunc = consumedRecord -> {
            List<ProducerRecord<K, V>> recordListToProduce = UserFunctions.carefullyRun(userFunction, consumedRecord);
            if (recordListToProduce.isEmpty()) {
                log.warn("No result returned from function to send.");
            }
            log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", consumedRecord, (Object)recordListToProduce);
            List<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> results = new ArrayList<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>>();
            for (ProducerRecord<K, V> toProduce : recordListToProduce) {
                RecordMetadata produceResultMeta = this.producerManager.get().produceMessage(toProduce);
                results.add(new ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>(consumedRecord, toProduce, produceResultMeta));
            }
            return results;
        };
        this.supervisorLoop(wrappedUserFunc, callback);
    }

    /**
     * Variant of {@code pollAndProduceMany} for callers that do not need the produce results:
     * delegates with a callback that only logs.
     *
     * @param userFunction maps a consumed record to the records that should be produced
     */
    @Override
    public void pollAndProduceMany(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction) {
        java.util.function.Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> noOpCallback =
                record -> log.trace("No-op user callback");
        this.pollAndProduceMany(userFunction, noOpCallback);
    }

    /**
     * Variant of {@code pollAndProduce} for callers that do not need the produce result:
     * delegates with a callback that only logs.
     *
     * @param userFunction maps a consumed record to the single record that should be produced
     */
    @Override
    public void pollAndProduce(Function<ConsumerRecord<K, V>, ProducerRecord<K, V>> userFunction) {
        java.util.function.Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> noOpCallback =
                record -> log.trace("No-op user callback");
        this.pollAndProduce(userFunction, noOpCallback);
    }

    /**
     * Starts processing with a user function that maps each consumed record to exactly one record
     * to produce. Implemented on top of {@code pollAndProduceMany} by wrapping the single result
     * in a one-element list.
     *
     * @param userFunction maps a consumed record to the record that should be produced
     * @param callback     receives the result of each produce
     */
    @Override
    public void pollAndProduce(Function<ConsumerRecord<K, V>, ProducerRecord<K, V>> userFunction, java.util.function.Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> callback) {
        // No Object cast: the list element type must stay ProducerRecord<K, V> to match the target function type.
        Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> singleToList =
                record -> UniLists.of(userFunction.apply(record));
        this.pollAndProduceMany(singleToList, callback);
    }

    /**
     * Closes via {@code closeDontDrainFirst}, allowing twice
     * {@code DrainingCloseable.DEFAULT_TIMEOUT}.
     */
    @Override
    public void close() {
        this.closeDontDrainFirst(DrainingCloseable.DEFAULT_TIMEOUT.multipliedBy(2L));
    }

    /**
     * Signals the control loop to shut down and waits for it. Unless already closed, the state is
     * moved to draining or closing depending on {@code drainMode} and the call blocks in
     * {@code waitForClose}. Afterwards, if a control task was started, its future is awaited so
     * that a failure of the control thread surfaces to the caller.
     *
     * @param timeout   how long to wait for the control task (applied per wait, converted to seconds)
     * @param drainMode whether in-flight work should be allowed to complete first
     */
    @Override
    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        if (this.state != State.closed) {
            log.info("Signaling to close...");
            switch (drainMode) {
                case DRAIN:
                    log.info("Will wait for all in flight to complete before");
                    this.transitionToDraining();
                    break;
                case DONT_DRAIN:
                    log.info("Not waiting for in flight to complete, will transition directly to closing");
                    this.transitionToClosing();
                    break;
                default:
                    break;
            }
            this.waitForClose(timeout);
        } else {
            log.info("Already closed, checking end state..");
        }
        if (this.controlThreadFuture.isPresent()) {
            log.debug("Checking for control thread exception...");
            // get() rethrows whatever the control task failed with.
            this.controlThreadFuture.get().get(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
        }
        log.info("Close complete.");
    }

    /**
     * Blocks until the state becomes {@code closed}, by repeatedly waiting on the control task's
     * future. An interruption of the waiting thread is logged and the wait is retried.
     *
     * @param timeout maximum time for each wait on the control future (converted to seconds)
     * @throws TimeoutException   if the future does not complete in time, or completes with {@code false}
     * @throws ExecutionException if the control task failed
     */
    private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.info("Waiting on closed state...");
        while (this.state != State.closed) {
            try {
                Future<Boolean> controlFuture = this.controlThreadFuture.get();
                log.debug("Blocking on control future");
                boolean endedCleanly = controlFuture.get(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                if (!endedCleanly) {
                    throw new TimeoutException("Timeout waiting for system to close (" + timeout + ")");
                }
            }
            catch (InterruptedException interruption) {
                log.trace("Interrupted", (Throwable)interruption);
            }
            catch (ExecutionException | TimeoutException failure) {
                log.error("Execution or timeout exception while waiting for the control thread to close cleanly (state was {}). Try increasing your time-out to allow the system to drain, or close withing draining.", (Object)this.state, (Object)failure);
                throw failure;
            }
            log.trace("Still waiting for system to close...");
        }
    }

    /**
     * Performs the actual shutdown: closes the broker poll system, the consumer (when this class
     * is responsible for it) and the producer manager, then shuts the worker pool down and waits
     * for it to terminate. Finally moves the state to {@code closed}.
     *
     * <p>The wait for pool termination always uses {@code DrainingCloseable.DEFAULT_TIMEOUT};
     * {@code timeout} is only passed to the producer manager. If the wait is interrupted it is
     * retried; if it times out a warning is logged and shutdown continues.
     *
     * @param timeout time allowed for the producer manager to close
     * @throws TimeoutException   declared for callers; not thrown directly by this body
     * @throws ExecutionException declared for callers; not thrown directly by this body
     */
    private void doClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.debug("Doing closing state: {}...", (Object)this.state);
        log.debug("Closing and waiting for broker poll system...");
        this.brokerPollSubsystem.closeAndWait();
        this.maybeCloseConsumer();
        this.producerManager.ifPresent(x -> x.close(timeout));
        log.debug("Shutting down execution pool...");
        List<Runnable> unfinished = this.workerPool.shutdownNow();
        if (!unfinished.isEmpty()) {
            log.warn("Threads not done: {}", unfinished);
        }
        log.trace("Awaiting worker pool termination...");
        // Starts as true so the loop body runs at least once; only an interruption repeats it.
        boolean interrupted = true;
        while (interrupted) {
            log.debug("Still interrupted");
            try {
                boolean terminationFinishedWithoutTimeout = this.workerPool.awaitTermination(BackportUtils.toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), TimeUnit.SECONDS);
                interrupted = false;
                if (!terminationFinishedWithoutTimeout) {
                    log.warn("workerPool await timeout!");
                }
            }
            catch (InterruptedException e) {
                log.error("InterruptedException", (Throwable)e);
                interrupted = true;
            }
        }
        log.debug("Close complete.");
        this.state = State.closed;
    }

    /**
     * Closes the consumer, but only when {@code isResponsibleForCommits()} reports {@code true}.
     */
    private void maybeCloseConsumer() {
        if (!this.isResponsibleForCommits()) {
            return;
        }
        this.consumer.close();
    }

    /**
     * @return {@code true} when the configured committer is the {@link ProducerManager}
     */
    private boolean isResponsibleForCommits() {
        boolean committerIsProducerManager = this.committer instanceof ProducerManager;
        return committerIsProducerManager;
    }

    /**
     * Blocks, checking every 100 ms, until the work manager reports that no records are awaiting
     * commit.
     *
     * @param timeout maximum time to wait; a {@link TimeoutException} is raised once it has expired
     */
    public void waitForProcessedNotCommitted(Duration timeout) {
        log.debug("Waiting processed but not committed...");
        Timer deadline = Time.SYSTEM.timer(timeout);
        while (this.wm.isRecordsAwaitingToBeCommitted()) {
            log.trace("Waiting for no in processing...");
            Thread.sleep(100L);
            // The timer only notices elapsed time when updated explicitly.
            deadline.update();
            if (deadline.isExpired()) {
                throw new TimeoutException("Waiting for no more records in processing");
            }
        }
        log.debug("No longer anything in flight.");
    }

    /**
     * Combines the work manager's "records awaiting processing" flag with the completion status of
     * the control task.
     *
     * @return {@code true} if the work manager still has records awaiting processing, or if the
     *         control task is done
     */
    private boolean isRecordsAwaitingProcessing() {
        boolean recordsAwaiting = this.wm.isRecordsAwaitingProcessing();
        boolean controlTaskDone = this.areMyThreadsDone();
        log.trace("isRecordsAwaitingProcessing {} || threadsDone {}", (Object)recordsAwaiting, (Object)controlTaskDone);
        if (recordsAwaiting) {
            return true;
        }
        return controlTaskDone;
    }

    /**
     * Moves the state to {@code draining} and interrupts the control thread so that it notices the
     * change even if it is blocked.
     */
    private void transitionToDraining() {
        log.debug("Transitioning to draining...");
        this.state = State.draining;
        this.interruptControlThread();
    }

    /**
     * Interrupts the control thread, if one has been recorded yet, so that a blocking wait on the
     * work mailbox returns early.
     */
    private void interruptControlThread() {
        if (this.blockableControlThread == null) {
            // Control task has not started; nothing to wake.
            return;
        }
        log.debug("Interrupting {} thread in case it's waiting for work", (Object)this.blockableControlThread.getName());
        this.blockableControlThread.interrupt();
    }

    /**
     * @return {@code true} if a control task was started and its future is done; {@code false} if
     *         it is still running or was never started
     */
    private boolean areMyThreadsDone() {
        boolean controlTaskStarted = !BackportUtils.isEmpty(this.controlThreadFuture);
        return controlTaskStarted && this.controlThreadFuture.get().isDone();
    }

    /**
     * Starts the control task on its own thread. The task names its thread {@code "control"},
     * records it so it can be interrupted, and then runs {@code controlLoop} repeatedly until the
     * state becomes {@code closed}. If an iteration throws, a shutdown is attempted via
     * {@code doClose} and the error is rethrown wrapped, so it surfaces through the control future.
     *
     * <p>The broker poll system is started before the control task is submitted.
     *
     * @param userFunction function applied to every consumed record
     * @param callback     receives each result returned by {@code userFunction}
     * @param <R>          result type of the user function
     * @throws IllegalStateException if this instance has already been started
     */
    protected <R> void supervisorLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) {
        log.info("Control loop starting up...");
        if (this.state != State.unused) {
            throw new IllegalStateException(StringUtils.msg("Invalid state - the consumer cannot be used more than once (current state is {})", new Object[]{this.state}));
        }
        this.state = State.running;
        Callable<Boolean> controlTask = () -> {
            Thread controlThread = Thread.currentThread();
            controlThread.setName("control");
            log.trace("Control task scheduled");
            this.blockableControlThread = controlThread;
            while (this.state != State.closed) {
                try {
                    this.controlLoop(userFunction, callback);
                }
                catch (Exception e) {
                    log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e.getMessage(), (Throwable)e);
                    this.doClose(DrainingCloseable.DEFAULT_TIMEOUT);
                    throw new RuntimeException("Error from poll control thread: " + e.getMessage(), e);
                }
            }
            log.info("Control loop ending clean (state:{})...", (Object)this.state);
            return true;
        };
        this.brokerPollSubsystem.start();
        ExecutorService controlExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> controlTaskFutureResult = controlExecutor.submit(controlTask);
            this.controlThreadFuture = Optional.of(controlTaskFutureResult);
        }
        finally {
            // Orderly shutdown: the already-submitted control task still runs to completion, but the
            // executor's worker thread is released afterwards instead of idling forever.
            controlExecutor.shutdown();
        }
    }

    /**
     * One iteration of the control loop. In order: fetch work and submit it to the pool (while
     * running or draining); wake the broker poller if running and not sufficiently loaded; process
     * the completed-work mailbox; maybe commit offsets (only while running); run the registered
     * loop hooks; react to a draining or closing state; and finally let the broker poll system
     * supervise itself.
     *
     * @param userFunction function applied to every consumed record
     * @param callback     receives each result returned by {@code userFunction}
     * @param <R>          result type of the user function
     */
    private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) throws TimeoutException, ExecutionException, InterruptedException {
        if (this.state == State.running || this.state == State.draining) {
            log.trace("Loop: Get work");
            List<WorkContainer<K, V>> newWork = this.wm.maybeGetWork();
            log.trace("Loop: Submit to pool");
            this.submitWorkToPool(userFunction, callback, newWork);
        }

        if (this.state == State.running && !this.wm.isSufficientlyLoaded()) {
            log.debug("Found not enough messages queued up, ensuring poller is awake");
            this.brokerPollSubsystem.wakeup();
        }

        log.trace("Loop: Process mailbox");
        this.processWorkCompleteMailBox();

        if (this.state == State.running) {
            log.trace("Loop: Maybe commit");
            this.commitOffsetsMaybe();
        }

        log.trace("Loop: Running {} loop end plugin(s)", (Object)this.controlLoopHooks.size());
        this.controlLoopHooks.forEach(hook -> hook.run());

        log.debug("Current state: {}", (Object)this.state);
        // Single read of the state, so that a transition made by drain() is only acted on next iteration.
        State stateNow = this.state;
        if (stateNow == State.draining) {
            this.drain();
        } else if (stateNow == State.closing) {
            this.doClose(DrainingCloseable.DEFAULT_TIMEOUT);
        }

        this.brokerPollSubsystem.supervise();
        log.trace("End of control loop, {} remaining in work manager. In state: {}", (Object)this.wm.getPartitionWorkRemainingCount(), (Object)this.state);
    }

    /**
     * Tells the broker poll system to drain and, once {@code isRecordsAwaitingProcessing()}
     * reports {@code false}, moves on to the closing state.
     */
    private void drain() {
        log.debug("Signaling to drain...");
        this.brokerPollSubsystem.drain();
        boolean stillAwaiting = this.isRecordsAwaitingProcessing();
        if (stillAwaiting) {
            return;
        }
        this.transitionToClosing();
    }

    /**
     * Moves the state towards shutdown and interrupts the control thread. An instance that was
     * never started goes straight to {@code closed}; otherwise the state becomes {@code closing}.
     */
    private void transitionToClosing() {
        log.debug("Transitioning to closing...");
        if (this.state == State.unused) {
            this.state = State.closed;
        } else {
            this.state = State.closing;
        }
        this.interruptControlThread();
    }

    /**
     * Collects completed work from the mailbox and hands each item to {@code handleFutureResult}.
     *
     * <p>The first poll blocks for up to the time remaining until the next scheduled commit, and
     * can be cut short by an interrupt of the control thread. After that, as many further items as
     * were queued at that moment are taken without blocking.
     *
     * <p>The "currently polling" flag is reset in a {@code finally} block, so it is cleared even
     * when the blocking poll is interrupted. Otherwise it would stay {@code true} and
     * {@code notifyNewWorkRegistered} would keep interrupting the control thread while it is no
     * longer blocked on the mailbox.
     */
    private void processWorkCompleteMailBox() {
        log.trace("Processing mailbox (might block waiting or results)...");
        Set<WorkContainer<K, V>> results = new HashSet<WorkContainer<K, V>>();
        Duration timeout = this.getTimeToNextCommit();
        WorkContainer<K, V> firstBlockingPoll = null;
        try {
            log.debug("Blocking poll on work until next scheduled offset commit attempt for {}", (Object)timeout);
            this.currentlyPollingWorkCompleteMailBox.set(true);
            firstBlockingPoll = this.workMailBox.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            log.debug("Interrupted waiting on work results");
        }
        finally {
            this.currentlyPollingWorkCompleteMailBox.set(false);
        }
        if (firstBlockingPoll == null) {
            log.debug("Mailbox results returned null, indicating timeout (which was set as {}) or interruption during a blocking wait for returned work results", (Object)timeout);
        } else {
            results.add(firstBlockingPoll);
        }
        // Snapshot of the queue size: only drain what is there now, so the loop cannot run unbounded.
        int size = this.workMailBox.size();
        log.trace("Draining {} more, got {} already...", (Object)size, (Object)results.size());
        for (Integer ignored : Range.range(size)) {
            WorkContainer<K, V> secondPollNonBlocking = null;
            try {
                secondPollNonBlocking = this.workMailBox.poll(0L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                log.debug("Interrupted waiting on work results", (Throwable)e);
            }
            if (secondPollNonBlocking != null) {
                results.add(secondPollNonBlocking);
            }
        }
        log.trace("Processing drained work {}...", (Object)results.size());
        for (WorkContainer<K, V> workContainer : results) {
            MDC.put("offset", workContainer.toString());
            try {
                this.handleFutureResult(workContainer);
            }
            finally {
                // Always clear, so a failing handler does not leave a stale offset in the logging context.
                MDC.clear();
            }
        }
    }

    /**
     * Commits ready offsets if it is time to. A commit happens when enough whole seconds have
     * elapsed since the last one, or when lingering would not be beneficial, or when a commit has
     * been explicitly requested. Otherwise nothing is committed and the reason is logged.
     */
    private void commitOffsetsMaybe() {
        Duration elapsedSinceLast = this.getTimeSinceLastCommit();
        boolean commitFrequencyOK = BackportUtils.toSeconds(elapsedSinceLast) >= BackportUtils.toSeconds(this.timeBetweenCommits);
        // Short-circuit order matters: isCommandedToCommit() clears the request flag when it is consulted.
        boolean shouldCommitNow = commitFrequencyOK
                || !this.lingeringOnCommitWouldBeBeneficial()
                || this.isCommandedToCommit();

        if (!shouldCommitNow) {
            if (log.isDebugEnabled()) {
                if (this.wm.hasCommittableOffsets()) {
                    log.debug("Have offsets to commit, but not enough time elapsed ({}), waiting for at least {}...", (Object)elapsedSinceLast, (Object)this.timeBetweenCommits);
                } else {
                    log.trace("Could commit now, but no offsets commitable");
                }
            }
            return;
        }

        if (!commitFrequencyOK) {
            log.debug("Commit too frequent, but no benefit in lingering");
        }
        this.commitOffsetsThatAreReady();
        this.lastCommit = Instant.now();
    }

    /**
     * Decides whether it is worth delaying a commit.
     *
     * @return {@code true} if the work manager reports work waiting to be completed successfully,
     *         or no work is in flight, or the mailbox still holds results
     */
    private boolean lingeringOnCommitWouldBeBeneficial() {
        boolean workWaitingForSuccess = this.wm.workIsWaitingToBeCompletedSuccessfully();
        boolean nothingInFlight = !this.wm.hasWorkInFlight();
        boolean mailboxHasResults = !this.workMailBox.isEmpty();
        log.trace("workIsWaitingToBeCompletedSuccessfully {} || noWorkInFlight {} || workWaitingInMailbox {};", new Object[]{workWaitingForSuccess, nothingInFlight, mailboxHasResults});
        if (workWaitingForSuccess || nothingInFlight) {
            return true;
        }
        return mailboxHasResults;
    }

    /**
     * Computes how long the control thread may block before the next commit is due.
     *
     * @return the commit interval minus the time since the last commit while running or draining;
     *         {@link Duration#ZERO} in any other state
     */
    private Duration getTimeToNextCommit() {
        boolean active = this.state == State.running || this.state == State.draining;
        if (!active) {
            log.debug("System not {} (state: {}), so don't wait to commit, only a small thread yield time", (Object)State.running, (Object)this.state);
            return Duration.ZERO;
        }
        Duration interval = this.getTimeBetweenCommits();
        return interval.minus(this.getTimeSinceLastCommit());
    }

    /**
     * @return the time between the last recorded commit and "now" as reported by the clock
     */
    private Duration getTimeSinceLastCommit() {
        return Duration.between(this.lastCommit, this.clock.getNow());
    }

    /**
     * Delegates to the configured committer's {@code retrieveOffsetsAndCommit()}.
     */
    private void commitOffsetsThatAreReady() {
        OffsetCommitter activeCommitter = this.committer;
        activeCommitter.retrieveOffsetsAndCommit();
    }

    /**
     * Routes a completed work container to the success or failure handler according to its
     * recorded user-function outcome.
     *
     * @param wc completed work taken from the mailbox; its outcome must already be set
     */
    protected void handleFutureResult(WorkContainer<K, V> wc) {
        boolean succeeded = wc.getUserFunctionSucceeded().get().booleanValue();
        if (!succeeded) {
            this.onFailure(wc);
            return;
        }
        this.onSuccess(wc);
    }

    /**
     * Reports failed work to the work manager.
     *
     * @param wc the work whose user function failed
     */
    private void onFailure(WorkContainer<K, V> wc) {
        WorkManager<K, V> workManager = this.wm;
        workManager.failed(wc);
    }

    /**
     * Reports successfully completed work to the work manager.
     *
     * @param wc the work whose user function succeeded
     */
    protected void onSuccess(WorkContainer<K, V> wc) {
        log.trace("Processing success...");
        WorkManager<K, V> workManager = this.wm;
        workManager.success(wc);
    }

    /**
     * Submits each work container to the worker pool, where {@code userFunctionRunner} executes
     * the user function for it, and stores the resulting future on the container.
     *
     * @param usersFunction function applied to every consumed record
     * @param callback      receives each result returned by {@code usersFunction}
     * @param workToProcess work to submit; nothing happens when empty
     * @param <R>           result type of the user function
     */
    private <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> workToProcess) {
        if (workToProcess.isEmpty()) {
            return;
        }
        log.debug("New work incoming: {}, Pool stats: {}", (Object)workToProcess.size(), (Object)this.workerPool);
        for (WorkContainer<K, V> work : workToProcess) {
            log.trace("Sending work ({}) to pool", work);
            Future<List<Object>> outputRecordFuture = this.workerPool.submit(() -> this.userFunctionRunner(usersFunction, callback, work));
            work.setFuture(outputRecordFuture);
        }
    }

    /**
     * Runs the user function for one work container on a pool thread. On success the container is
     * marked successful, each result is passed to {@code callback}, and the container is put in
     * the mailbox. On any exception the container is marked failed, put in the mailbox, and the
     * exception is rethrown.
     *
     * @param usersFunction function applied to the container's record
     * @param callback      receives each result returned by {@code usersFunction}
     * @param wc            the work to run
     * @param <R>           result type of the user function
     * @return a list that this implementation always leaves empty
     */
    protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> userFunctionRunner(Function<ConsumerRecord<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, WorkContainer<K, V> wc) {
        try {
            MDC.put("offset", wc.toString());
            log.trace("Pool received: {}", wc);

            List<R> userResults = usersFunction.apply(wc.getCr());
            this.onUserFunctionSuccess(wc, userResults);

            List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> intermediateResults = new ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>>();
            for (R userResult : userResults) {
                log.trace("Running users's call back...");
                callback.accept(userResult);
            }
            log.trace("User function future registered");
            this.addToMailBoxOnUserFunctionSuccess(wc, userResults);
            return intermediateResults;
        }
        catch (Exception failure) {
            log.debug("Error processing record", (Throwable)failure);
            // Record the failure and still deliver the container so the control loop can handle it.
            wc.onUserFunctionFailure();
            this.addToMailbox(wc);
            throw failure;
        }
    }

    /**
     * Hook invoked after a successful user function and its callbacks; this implementation simply
     * puts the work container in the mailbox and ignores the results.
     *
     * @param wc                      the completed work
     * @param resultsFromUserFunction results of the user function (unused here)
     */
    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        addToMailbox(wc);
    }

    /**
     * Hook invoked right after the user function returns; this implementation marks the work
     * container as successful and ignores the results.
     *
     * @param wc                      the work that just ran
     * @param resultsFromUserFunction results of the user function (unused here)
     */
    protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        final String traceMessage = "User function success";
        log.trace(traceMessage);
        wc.onUserFunctionSuccess();
    }

    /**
     * Puts a finished work container on the mailbox queue for the control thread to pick up.
     *
     * @param wc the finished work
     */
    protected void addToMailbox(WorkContainer<K, V> wc) {
        log.trace("Adding {} to mailbox...", wc);
        BlockingQueue<WorkContainer<K, V>> mailbox = this.workMailBox;
        mailbox.add(wc);
        log.trace("Finished adding. {}", wc);
    }

    /**
     * Wakes the control thread when new work has been registered, if it is currently blocked
     * polling the mailbox. The interrupt is skipped while a producer transaction is in progress.
     *
     * <p>When no producer is configured there can be no transaction in progress, so the control
     * thread is interrupted. Previously an absent producer was treated like an in-progress
     * transaction, so consumer-only setups were never woken and the log wrongly blamed a
     * transaction.
     */
    void notifyNewWorkRegistered() {
        if (!this.currentlyPollingWorkCompleteMailBox.get()) {
            log.trace("Work box not being polled currently, so thread not blocked, will come around to the bail box in the next looop.");
            return;
        }
        boolean transactionInProgress = this.producerManager.isPresent() && this.producerManager.get().isTransactionInProgress();
        if (transactionInProgress) {
            log.trace("Would have interrupted control thread, but TX in progress");
            return;
        }
        log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
        Thread controlThread = this.blockableControlThread;
        if (controlThread != null) {
            controlThread.interrupt();
        }
    }

    /**
     * @return the work manager's partition work remaining count
     */
    @Override
    public int workRemaining() {
        int remaining = this.wm.getPartitionWorkRemainingCount();
        return remaining;
    }

    /**
     * Registers a hook that is run on every iteration of the control loop.
     *
     * @param r the hook to run
     */
    void addLoopEndCallBack(Runnable r) {
        List<Runnable> hooks = this.controlLoopHooks;
        hooks.add(r);
    }

    /**
     * Forwards the given timeout to the static {@code BrokerPollSystem.setLongPollTimeout}.
     *
     * @param ofMillis the long poll timeout to apply
     */
    public void setLongPollTimeout(Duration ofMillis) {
        final Duration longPollTimeout = ofMillis;
        BrokerPollSystem.setLongPollTimeout(longPollTimeout);
    }

    /**
     * Raises the commit-request flag. The flag is consumed (and cleared) by
     * {@code isCommandedToCommit()}; both sides synchronize on the flag object.
     */
    public void requestCommitAsap() {
        log.debug("Registering command to commit next chance");
        synchronized (this.commitCommand) {
            this.commitCommand.set(true);
        }
    }

    /**
     * Checks whether a commit has been requested through {@code requestCommitAsap()} and, if so,
     * clears the request. Synchronizes on the flag object, as the requesting side does.
     *
     * @return {@code true} if a commit request was pending (it is cleared by this call)
     */
    private boolean isCommandedToCommit() {
        synchronized (this.commitCommand) {
            if (!this.commitCommand.get()) {
                return false;
            }
            log.debug("Command to commit asap received, clearing");
            this.commitCommand.set(false);
            return true;
        }
    }

    /**
     * Replaces the clock used to measure the time since the last commit.
     *
     * @param clock the clock to use from now on
     */
    void setClock(WallClock clock) {
        final WallClock replacement = clock;
        this.clock = replacement;
    }

    /**
     * Sets the target interval between offset commits.
     *
     * @param timeBetweenCommits the new interval
     */
    public void setTimeBetweenCommits(Duration timeBetweenCommits) {
        final Duration interval = timeBetweenCommits;
        this.timeBetweenCommits = interval;
    }

    /**
     * @return the target interval between offset commits
     */
    public Duration getTimeBetweenCommits() {
        final Duration interval = this.timeBetweenCommits;
        return interval;
    }

    /**
     * @return the work manager used by this processor
     */
    WorkManager<K, V> getWm() {
        final WorkManager<K, V> workManager = this.wm;
        return workManager;
    }

    /**
     * Life-cycle states of the processor, in the order they are normally passed through.
     */
    enum State {
        /** Initial state, before any poll method has started the control loop. */
        unused,
        /** Set when the control loop is started. */
        running,
        /** Set when a draining close has been requested. */
        draining,
        /** Set when shutdown should proceed; the control loop then performs the close. */
        closing,
        /** Terminal state, set once the close has completed (or directly, if never started). */
        closed
    }
}

