/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class OperatorCoordinatorHolderTest
extends TestLogger {
    private final Consumer<Throwable> globalFailureHandler = t -> {
        this.globalFailure = t;
    };
    private Throwable globalFailure;

    @After
    public void checkNoGlobalFailure() throws Exception {
        if (this.globalFailure != null) {
            ExceptionUtils.rethrowException((Throwable)this.globalFailure);
        }
    }

    @Test
    public void checkpointFutureInitiallyNotDone() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        CompletableFuture checkpointFuture = new CompletableFuture();
        holder.checkpointCoordinator(1L, checkpointFuture);
        Assert.assertFalse((boolean)checkpointFuture.isDone());
    }

    @Test
    public void completedCheckpointFuture() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        byte[] testData = new byte[]{11, 22, 33, 44};
        CompletableFuture checkpointFuture = new CompletableFuture();
        holder.checkpointCoordinator(9L, checkpointFuture);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
        Assert.assertTrue((boolean)checkpointFuture.isDone());
        Assert.assertArrayEquals((byte[])testData, (byte[])((byte[])checkpointFuture.get()));
    }

    @Test
    public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        holder.checkpointCoordinator(1L, new CompletableFuture());
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(1).sendEvent((OperatorEvent)new TestOperatorEvent(1));
        MatcherAssert.assertThat(tasks.getSentEventsForSubtask(1), (Matcher)Matchers.contains((Object[])new OperatorEvent[]{new TestOperatorEvent(1)}));
    }

    @Test
    public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        this.triggerAndCompleteCheckpoint(holder, 10L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(1337));
        Assert.assertEquals((long)0L, (long)tasks.getNumberOfSentEvents());
    }

    @Test
    public void abortedCheckpointReleasesBlockedEvents() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        this.triggerAndCompleteCheckpoint(holder, 123L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(1337));
        holder.abortCurrentTriggering();
        MatcherAssert.assertThat(tasks.getSentEventsForSubtask(0), (Matcher)Matchers.contains((Object[])new OperatorEvent[]{new TestOperatorEvent(1337)}));
    }

    @Test
    public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        this.triggerAndCompleteCheckpoint(holder, 1111L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(1337));
        holder.afterSourceBarrierInjection(1111L);
        MatcherAssert.assertThat(tasks.getSentEventsForSubtask(0), (Matcher)Matchers.contains((Object[])new OperatorEvent[]{new TestOperatorEvent(1337)}));
    }

    @Test
    public void restoreOpensValveEvents() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        this.triggerAndCompleteCheckpoint(holder, 1000L);
        holder.resetToCheckpoint(1L, new byte[0]);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(1).sendEvent((OperatorEvent)new TestOperatorEvent(999));
        MatcherAssert.assertThat(tasks.getSentEventsForSubtask(1), (Matcher)Matchers.contains((Object[])new OperatorEvent[]{new TestOperatorEvent(999)}));
    }

    @Test
    public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        CompletableFuture holderFuture = new CompletableFuture();
        holder.checkpointCoordinator(1000L, holderFuture);
        CompletableFuture<byte[]> future1 = OperatorCoordinatorHolderTest.getCoordinator(holder).getLastTriggeredCheckpoint();
        holder.abortCurrentTriggering();
        this.triggerAndCompleteCheckpoint(holder, 1010L);
        holder.afterSourceBarrierInjection(1010L);
        future1.complete(new byte[0]);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(123));
        MatcherAssert.assertThat(tasks.events, (Matcher)Matchers.contains((Object[])new EventReceivingTasks.EventWithSubtask[]{new EventReceivingTasks.EventWithSubtask(new TestOperatorEvent(123), 0)}));
    }

    @Test
    public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        holder.checkpointCoordinator(11L, new CompletableFuture());
        CompletableFuture future = new CompletableFuture();
        holder.checkpointCoordinator(12L, future);
        Assert.assertTrue((boolean)future.isCompletedExceptionally());
        Assert.assertNotNull((Object)this.globalFailure);
        this.globalFailure = null;
    }

    @Test
    public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(0));
        this.triggerAndCompleteCheckpoint(holder, 22L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(1));
        holder.afterSourceBarrierInjection(22L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(2));
        this.triggerAndCompleteCheckpoint(holder, 23L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(3));
        holder.afterSourceBarrierInjection(23L);
        MatcherAssert.assertThat(tasks.getSentEventsForSubtask(0), (Matcher)Matchers.contains((Object[])new OperatorEvent[]{new TestOperatorEvent(0), new TestOperatorEvent(1), new TestOperatorEvent(2), new TestOperatorEvent(3)}));
    }

    @Test
    public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(0));
        this.triggerAndCompleteCheckpoint(holder, 22L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(1));
        holder.abortCurrentTriggering();
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(2));
        this.triggerAndCompleteCheckpoint(holder, 23L);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(3));
        holder.afterSourceBarrierInjection(23L);
        MatcherAssert.assertThat(tasks.getSentEventsForSubtask(0), (Matcher)Matchers.contains((Object[])new OperatorEvent[]{new TestOperatorEvent(0), new TestOperatorEvent(1), new TestOperatorEvent(2), new TestOperatorEvent(3)}));
    }

    @Test
    public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception {
        Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorProvider = context -> new TestingOperatorCoordinator((OperatorCoordinator.Context)context, (OperatorCoordinator.Context)context){
            final /* synthetic */ OperatorCoordinator.Context val$context;
            {
                this.val$context = context2;
                super(context);
            }

            @Override
            public void handleEventFromOperator(int subtask, OperatorEvent event) {
                this.val$context.failJob((Throwable)new RuntimeException("Artificial Exception"));
            }
        };
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, coordinatorProvider);
        holder.handleEventFromOperator(0, (OperatorEvent)new TestOperatorEvent());
        Assert.assertNotNull((Object)this.globalFailure);
        Throwable firstGlobalFailure = this.globalFailure;
        holder.handleEventFromOperator(1, (OperatorEvent)new TestOperatorEvent());
        Assert.assertEquals((String)"The global failure should be the same instance because the contextshould only take the first request from the coordinator to fail the job.", (Object)firstGlobalFailure, (Object)this.globalFailure);
        holder.resetToCheckpoint(0L, new byte[0]);
        holder.handleEventFromOperator(1, (OperatorEvent)new TestOperatorEvent());
        Assert.assertNotEquals((String)"The new failures should be propagated after the coordinator is reset.", (Object)firstGlobalFailure, (Object)this.globalFailure);
        this.globalFailure = null;
    }

    @Test
    public void checkpointCompletionWaitsForEventFutures() throws Exception {
        CompletableFuture<Acknowledge> ackFuture = new CompletableFuture<Acknowledge>();
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasksWithRpcResult(ackFuture);
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(0));
        CompletableFuture<byte[]> checkpointFuture = this.triggerAndCompleteCheckpoint(holder, 22L);
        Assert.assertFalse((boolean)checkpointFuture.isDone());
        ackFuture.complete(Acknowledge.get());
        Assert.assertTrue((boolean)checkpointFuture.isDone());
    }

    @Test
    public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
        this.checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new);
    }

    @Test
    public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
        this.checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new);
    }

    private void checkpointEventValueAtomicity(Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        ComponentMainThreadExecutorServiceAdapter mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(executor, Thread.currentThread());
        EventReceivingTasks sender = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(sender, coordinatorCtor, mainThreadExecutor);
        Thread.sleep(new Random().nextInt(10));
        executor.triggerAll();
        CompletableFuture checkpointFuture = new CompletableFuture();
        holder.checkpointCoordinator(0L, checkpointFuture);
        executor.triggerAll();
        Thread.sleep(new Random().nextInt(10));
        holder.close();
        executor.triggerAll();
        Assert.assertTrue((boolean)checkpointFuture.isDone());
        int checkpointedNumber = OperatorCoordinatorHolderTest.bytesToInt((byte[])checkpointFuture.get());
        Assert.assertEquals((long)checkpointedNumber, (long)sender.getNumberOfSentEvents());
        for (int i = 0; i < checkpointedNumber; ++i) {
            Assert.assertEquals((long)i, (long)((TestOperatorEvent)sender.getAllSentEvents().get((int)i).event).getValue());
        }
    }

    @Test
    public void testCheckpointFailsIfSendingEventFailedAfterTrigger() throws Exception {
        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<Acknowledge>();
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(0));
        CompletableFuture checkpointResult = new CompletableFuture();
        holder.checkpointCoordinator(1L, checkpointResult);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
        Assert.assertTrue((boolean)checkpointResult.isCompletedExceptionally());
    }

    @Test
    public void testCheckpointFailsIfSendingEventFailedBeforeTrigger() throws Exception {
        ReorderableManualExecutorService executor = new ReorderableManualExecutorService();
        ComponentMainThreadExecutorServiceAdapter mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(executor, Thread.currentThread());
        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<Acknowledge>();
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getSubtaskGateway(0).sendEvent((OperatorEvent)new TestOperatorEvent(0));
        executor.triggerAll();
        executor.setDelayNewRunnables(true);
        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
        executor.setDelayNewRunnables(false);
        CompletableFuture checkpointResult = new CompletableFuture();
        holder.checkpointCoordinator(1L, checkpointResult);
        executor.triggerAll();
        OperatorCoordinatorHolderTest.getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
        executor.triggerAll();
        Assert.assertFalse((boolean)checkpointResult.isDone());
        executor.executeAllDelayedRunnables();
        executor.triggerAll();
        Assert.assertTrue((boolean)checkpointResult.isCompletedExceptionally());
    }

    private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(OperatorCoordinatorHolder holder, long checkpointId) throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        holder.checkpointCoordinator(checkpointId, future);
        OperatorCoordinatorHolderTest.getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
        return future;
    }

    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static int bytesToInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) {
        return (TestingOperatorCoordinator)holder.coordinator();
    }

    private OperatorCoordinatorHolder createCoordinatorHolder(SubtaskAccess.SubtaskAccessFactory eventTarget, Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
        return this.createCoordinatorHolder(eventTarget, coordinatorCtor, ComponentMainThreadExecutorServiceAdapter.forMainThread());
    }

    private OperatorCoordinatorHolder createCoordinatorHolder(SubtaskAccess.SubtaskAccessFactory eventTarget, final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor, ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
        final OperatorID opId = new OperatorID();
        OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider(){

            public OperatorID getOperatorId() {
                return opId;
            }

            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return (OperatorCoordinator)coordinatorCtor.apply(context);
            }
        };
        OperatorCoordinatorHolder holder = OperatorCoordinatorHolder.create((OperatorID)opId, (OperatorCoordinator.Provider)provider, (String)"test-coordinator-name", (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (int)3, (int)1775, (SubtaskAccess.SubtaskAccessFactory)eventTarget);
        holder.lazyInitialize(this.globalFailureHandler, mainThreadExecutor);
        holder.start();
        return holder;
    }

    private static abstract class CheckpointEventOrderTestBaseCoordinator
    implements OperatorCoordinator,
    Runnable {
        private final Thread coordinatorThread;
        protected final OperatorCoordinator.Context context;
        protected final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
        private volatile boolean closed;

        CheckpointEventOrderTestBaseCoordinator(OperatorCoordinator.Context context) {
            this.context = context;
            this.subtaskGateways = new OperatorCoordinator.SubtaskGateway[context.currentParallelism()];
            this.coordinatorThread = new Thread(this);
        }

        public void start() throws Exception {
        }

        public void close() throws Exception {
            this.closed = true;
            this.coordinatorThread.interrupt();
            this.coordinatorThread.join();
        }

        public void handleEventFromOperator(int subtask, OperatorEvent event) {
        }

        public void subtaskFailed(int subtask, @Nullable Throwable reason) {
        }

        public void subtaskReset(int subtask, long checkpointId) {
        }

        public void subtaskReady(int subtask, OperatorCoordinator.SubtaskGateway gateway) {
            this.subtaskGateways[subtask] = gateway;
            for (OperatorCoordinator.SubtaskGateway subtaskGateway : this.subtaskGateways) {
                if (subtaskGateway != null) continue;
                return;
            }
            this.coordinatorThread.start();
        }

        public abstract void checkpointCoordinator(long var1, CompletableFuture<byte[]> var3) throws Exception;

        public void notifyCheckpointComplete(long checkpointId) {
        }

        public void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception {
        }

        @Override
        public void run() {
            try {
                while (!this.closed) {
                    this.step();
                }
            }
            catch (Throwable t) {
                if (this.closed) {
                    return;
                }
                t.printStackTrace();
                System.exit(-1);
            }
        }

        protected abstract void step() throws Exception;
    }

    private static final class FutureCompletedAfterSendingEventsCoordinator
    extends CheckpointEventOrderTestBaseCoordinator {
        private final OneShotLatch checkpointCompleted = new OneShotLatch();
        @Nullable
        private volatile CompletableFuture<byte[]> checkpoint;
        private int num;

        FutureCompletedAfterSendingEventsCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            this.checkpoint = result;
        }

        @Override
        protected void step() throws Exception {
            Thread.sleep(2L);
            this.subtaskGateways[0].sendEvent((OperatorEvent)new TestOperatorEvent(this.num++));
            this.subtaskGateways[1].sendEvent((OperatorEvent)new TestOperatorEvent(this.num++));
            this.subtaskGateways[2].sendEvent((OperatorEvent)new TestOperatorEvent(this.num++));
            CompletableFuture<byte[]> chkpnt = this.checkpoint;
            if (chkpnt != null) {
                chkpnt.complete(OperatorCoordinatorHolderTest.intToBytes(this.num));
                this.checkpointCompleted.trigger();
                this.checkpoint = null;
            }
        }

        @Override
        public void close() throws Exception {
            this.checkpointCompleted.await();
            super.close();
        }
    }

    private static final class FutureCompletedInstantlyTestCoordinator
    extends CheckpointEventOrderTestBaseCoordinator {
        private final ReentrantLock lock = new ReentrantLock(true);
        private final Condition condition = this.lock.newCondition();
        @Nullable
        @GuardedBy(value="lock")
        private CompletableFuture<byte[]> checkpoint;
        private int num;

        FutureCompletedInstantlyTestCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            this.lock.lock();
            try {
                this.checkpoint = result;
                this.condition.await();
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        protected void step() throws Exception {
            this.lock.lock();
            try {
                if (this.checkpoint != null) {
                    this.checkpoint.complete(OperatorCoordinatorHolderTest.intToBytes(this.num));
                    this.checkpoint = null;
                }
                this.subtaskGateways[0].sendEvent((OperatorEvent)new TestOperatorEvent(this.num++));
                this.condition.signalAll();
            }
            finally {
                this.lock.unlock();
            }
            Thread.sleep(2L);
        }
    }

    private static class ReorderableManualExecutorService
    extends ManuallyTriggeredScheduledExecutorService {
        private boolean delayNewRunnables;
        private final Queue<Runnable> delayedRunnables = new ArrayDeque<Runnable>();

        private ReorderableManualExecutorService() {
        }

        public void setDelayNewRunnables(boolean delayNewRunnables) {
            this.delayNewRunnables = delayNewRunnables;
        }

        @Override
        public void execute(@Nonnull Runnable command) {
            if (this.delayNewRunnables) {
                this.delayedRunnables.add(command);
            } else {
                super.execute(command);
            }
        }

        public void executeAllDelayedRunnables() {
            while (!this.delayedRunnables.isEmpty()) {
                super.execute(this.delayedRunnables.poll());
            }
        }
    }
}

