/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SerializableFunction;
import org.assertj.core.api.AbstractByteArrayAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.OptionalAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

public class OperatorCoordinatorSchedulerTest
extends TestLogger {
    private final JobVertexID testVertexId = new JobVertexID();
    private final OperatorID testOperatorId = new OperatorID();
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
    private DefaultScheduler createdScheduler;

    @After
    public void shutdownScheduler() throws Exception {
        if (this.createdScheduler != null) {
            this.closeScheduler(this.createdScheduler);
        }
    }

    @Test
    public void testCoordinatorStartedWhenSchedulerStarts() throws Exception {
        DefaultScheduler scheduler = this.createAndStartScheduler();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        Assertions.assertThat((boolean)coordinator.isStarted()).isTrue();
    }

    @Test
    public void testCoordinatorDisposedWhenSchedulerStops() throws Exception {
        DefaultScheduler scheduler = this.createAndStartScheduler();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.closeScheduler(scheduler);
        Assertions.assertThat((boolean)coordinator.isClosed()).isTrue();
    }

    @Test
    public void testFailureToStartPropagatesExceptions() throws Exception {
        TestingOperatorCoordinator.Provider failingCoordinatorProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId, (SerializableFunction<OperatorCoordinator.Context, TestingOperatorCoordinator>)((SerializableFunction & Serializable)CoordinatorThatFailsInStart::new));
        DefaultScheduler scheduler = this.createScheduler(failingCoordinatorProvider);
        try {
            scheduler.startScheduling();
            Assertions.fail((String)"expected an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testFailureToStartClosesCoordinator() throws Exception {
        TestingOperatorCoordinator.Provider failingCoordinatorProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId, (SerializableFunction<OperatorCoordinator.Context, TestingOperatorCoordinator>)((SerializableFunction & Serializable)CoordinatorThatFailsInStart::new));
        DefaultScheduler scheduler = this.createScheduler(failingCoordinatorProvider);
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        try {
            scheduler.startScheduling();
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assertions.assertThat((boolean)coordinator.isClosed()).isTrue();
    }

    @Test
    public void deployingTaskFailureNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = this.createAndStartScheduler();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failTask(scheduler, 1);
        ((ListAssert)Assertions.assertThat(coordinator.getFailedTasks()).hasSize(1)).containsExactly((Object[])new Integer[]{1});
    }

    @Test
    public void runningTaskFailureNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failTask(scheduler, 1);
        ((ListAssert)Assertions.assertThat(coordinator.getFailedTasks()).hasSize(1)).containsExactly((Object[])new Integer[]{1});
    }

    @Test
    public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerWithAllRestartOnFailureAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failTask(scheduler, 1);
        ((ListAssert)Assertions.assertThat(coordinator.getFailedTasks()).hasSize(2)).containsExactlyInAnyOrder((Object[])new Integer[]{0, 1});
    }

    @Test
    public void taskRepeatedFailureNotifyCoordinator() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failAndRestartTask(scheduler, 0);
        this.failAndRestartTask(scheduler, 0);
        ((ListAssert)Assertions.assertThat(coordinator.getFailedTasks()).hasSize(2)).containsExactly((Object[])new Integer[]{0, 0});
    }

    @Test
    public void taskGatewayNotSetBeforeTasksRunning() throws Exception {
        DefaultScheduler scheduler = this.createAndStartScheduler();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        OperatorCoordinator.SubtaskGateway gateway = coordinator.getSubtaskGateway(0);
        Assertions.assertThat((Object)gateway).isNull();
    }

    @Test
    public void taskGatewayAvailableWhenTasksRunning() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        OperatorCoordinator.SubtaskGateway gateway = coordinator.getSubtaskGateway(0);
        Assertions.assertThat((Object)gateway).isNotNull();
    }

    @Test
    public void taskTaskManagerFailuresAreReportedBack() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks(new FailingTaskExecutorOperatorEventGateway());
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        OperatorCoordinator.SubtaskGateway gateway = coordinator.getSubtaskGateway(0);
        CompletableFuture result = gateway.sendEvent((OperatorEvent)new TestOperatorEvent());
        this.executor.triggerAll();
        Assertions.assertThatThrownBy(result::get).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TestException.class)});
    }

    @Ignore
    @Test
    public void deployingTaskCancellationNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = this.createAndStartScheduler();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.cancelTask(scheduler, 1);
        ((ListAssert)Assertions.assertThat(coordinator.getFailedTasks()).hasSize(1)).containsExactly((Object[])new Integer[]{1});
    }

    @Ignore
    @Test
    public void runningTaskCancellationNotifiesCoordinator() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.cancelTask(scheduler, 0);
        ((ListAssert)Assertions.assertThat(coordinator.getFailedTasks()).hasSize(1)).containsExactly((Object[])new Integer[]{0});
    }

    @Test
    public void testTakeCheckpoint() throws Exception {
        byte[] checkpointData = new byte[656];
        new Random().nextBytes(checkpointData);
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        CompletableFuture<CompletedCheckpoint> checkpointFuture = this.triggerCheckpoint(scheduler);
        coordinator.getLastTriggeredCheckpoint().complete(checkpointData);
        this.executor.triggerAll();
        AcknowledgeCheckpointEvent event = new AcknowledgeCheckpointEvent(coordinator.getLastTriggeredCheckpointId());
        OperatorCoordinatorHolder holder = this.getCoordinatorHolder(scheduler);
        for (int i = 0; i < holder.currentParallelism(); ++i) {
            holder.handleEventFromOperator(i, 0, (OperatorEvent)event);
        }
        this.acknowledgeCurrentCheckpoint(scheduler);
        OperatorState state = (OperatorState)checkpointFuture.get().getOperatorStates().get(this.testOperatorId);
        Assertions.assertThat((byte[])OperatorCoordinatorSchedulerTest.getStateHandleContents((StreamStateHandle)state.getCoordinatorState())).containsExactly(checkpointData);
    }

    @Test
    public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {
        TestingOperatorCoordinator.Provider failingCoordinatorProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId, (SerializableFunction<OperatorCoordinator.Context, TestingOperatorCoordinator>)((SerializableFunction & Serializable)CoordinatorThatFailsCheckpointing::new));
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks(failingCoordinatorProvider);
        CompletableFuture<CompletedCheckpoint> checkpointFuture = this.triggerCheckpoint(scheduler);
        Assertions.assertThatThrownBy(checkpointFuture::get).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TestException.class)});
    }

    @Test
    public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        CompletableFuture<CompletedCheckpoint> checkpointFuture = this.triggerCheckpoint(scheduler);
        CompletableFuture<byte[]> coordinatorStateFuture = coordinator.getLastTriggeredCheckpoint();
        coordinatorStateFuture.completeExceptionally(new TestException());
        this.waitForCompletionToPropagate(checkpointFuture);
        Assertions.assertThatThrownBy(checkpointFuture::get).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TestException.class)});
    }

    @Test
    public void testSavepointRestoresCoordinator() throws Exception {
        byte[] testCoordinatorState = new byte[123];
        new Random().nextBytes(testCoordinatorState);
        DefaultScheduler scheduler = this.createSchedulerWithRestoredSavepoint(testCoordinatorState);
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        byte[] restoredState = coordinator.getLastRestoredCheckpointState();
        Assertions.assertThat((byte[])restoredState).containsExactly(testCoordinatorState);
    }

    @Test
    public void testGlobalFailureResetsToCheckpoint() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        byte[] coordinatorState = new byte[]{7, 11, 3, 5};
        this.takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
        this.failGlobalAndRestart(scheduler, new TestException());
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])coordinator.getLastRestoredCheckpointState()).as("coordinator should have a restored checkpoint", new Object[0])).containsExactly(coordinatorState);
    }

    @Test
    public void testGlobalFailureBeforeCheckpointResetsToEmptyState() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failGlobalAndRestart(scheduler, new TestException());
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])coordinator.getLastRestoredCheckpointState()).as("coordinator should have null restored state", new Object[0])).isEqualTo((Object)TestingOperatorCoordinator.NULL_RESTORE_VALUE);
        Assertions.assertThat((long)coordinator.getLastRestoredCheckpointId()).isEqualTo(-1L);
    }

    @Test
    public void testGlobalFailureTwiceWillNotResetToCheckpointTwice() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        AtomicInteger resetToCheckpointCounter = new AtomicInteger(0);
        coordinator.setResetToCheckpointConsumer((ignore1, ignore2) -> resetToCheckpointCounter.incrementAndGet());
        scheduler.handleGlobalFailure((Throwable)new TestException());
        this.failGlobalAndRestart(scheduler, new TestException());
        Assertions.assertThat((int)resetToCheckpointCounter.get()).isEqualTo(1);
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])coordinator.getLastRestoredCheckpointState()).as("coordinator should have null restored state", new Object[0])).isEqualTo((Object)TestingOperatorCoordinator.NULL_RESTORE_VALUE);
        Assertions.assertThat((long)coordinator.getLastRestoredCheckpointId()).isEqualTo(-1L);
    }

    @Test
    public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);
        this.failGlobalAndRestart(scheduler, new TestException());
        Assertions.assertThat(coordinator.getRestoredTasks()).isEmpty();
    }

    @Test
    public void testLocalFailoverResetsTask() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        long checkpointId = this.takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);
        this.failAndRestartTask(scheduler, 1);
        Assertions.assertThat(coordinator.getRestoredTasks()).hasSize(1);
        TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0);
        Assertions.assertThat((int)restoredTask.subtaskIndex).isEqualTo(1);
        Assertions.assertThat((long)restoredTask.checkpointId).isEqualTo(checkpointId);
    }

    @Test
    public void testLocalFailoverBeforeCheckpointResetsTask() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failAndRestartTask(scheduler, 1);
        Assertions.assertThat(coordinator.getRestoredTasks()).hasSize(1);
        TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0);
        Assertions.assertThat((int)restoredTask.subtaskIndex).isEqualTo(1);
        Assertions.assertThat((long)restoredTask.checkpointId).isEqualTo(-1L);
    }

    @Test
    public void testLocalFailoverDoesNotResetToCheckpoint() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.takeCompleteCheckpoint(scheduler, coordinator, new byte[]{37, 11, 83, 4});
        this.failAndRestartTask(scheduler, 0);
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])coordinator.getLastRestoredCheckpointState()).as("coordinator should not have a restored checkpoint", new Object[0])).isNull();
    }

    @Test
    public void testConfirmCheckpointComplete() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        long checkpointId = this.takeCompleteCheckpoint(scheduler, coordinator, new byte[]{37, 11, 83, 4});
        ((AbstractLongAssert)Assertions.assertThat((long)coordinator.getLastCheckpointComplete()).as("coordinator should be notified of completed checkpoint", new Object[0])).isEqualTo(checkpointId);
    }

    @Test
    public void testBatchGlobalFailureResetsToEmptyState() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failGlobalAndRestart(scheduler, new TestException());
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])coordinator.getLastRestoredCheckpointState()).as("coordinator should have null restored state", new Object[0])).isEqualTo((Object)TestingOperatorCoordinator.NULL_RESTORE_VALUE);
        Assertions.assertThat((long)coordinator.getLastRestoredCheckpointId()).isEqualTo(-1L);
    }

    @Test
    public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failGlobalAndRestart(scheduler, new TestException());
        Assertions.assertThat(coordinator.getRestoredTasks()).isEmpty();
    }

    @Test
    public void testBatchLocalFailoverResetsTask() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failAndRestartTask(scheduler, 1);
        Assertions.assertThat(coordinator.getRestoredTasks()).hasSize(1);
        TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0);
        Assertions.assertThat((int)restoredTask.subtaskIndex).isEqualTo(1);
        Assertions.assertThat((long)restoredTask.checkpointId).isEqualTo(-1L);
    }

    @Test
    public void testBatchLocalFailoverDoesNotResetToCheckpoint() throws Exception {
        DefaultScheduler scheduler = this.createSchedulerWithoutCheckpointingAndDeployTasks();
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        this.failAndRestartTask(scheduler, 0);
        ((AbstractByteArrayAssert)Assertions.assertThat((byte[])coordinator.getLastRestoredCheckpointState()).as("coordinator should not have a restored checkpoint", new Object[0])).isNull();
    }

    @Test
    public void testDeliveringClientRequestToRequestHandler() throws Exception {
        TestingCoordinationRequestHandler.Provider provider = new TestingCoordinationRequestHandler.Provider(this.testOperatorId);
        DefaultScheduler scheduler = this.createScheduler(provider);
        String payload = "testing payload";
        TestingCoordinationRequestHandler.Request<String> request = new TestingCoordinationRequestHandler.Request<String>("testing payload");
        TestingCoordinationRequestHandler.Response response = (TestingCoordinationRequestHandler.Response)scheduler.deliverCoordinationRequestToCoordinator(this.testOperatorId, request).get();
        Assertions.assertThat((String)((String)response.getPayload())).isEqualTo("testing payload");
    }

    @Test
    public void testDeliveringClientRequestToNonRequestHandler() throws Exception {
        TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        DefaultScheduler scheduler = this.createScheduler(provider);
        String payload = "testing payload";
        TestingCoordinationRequestHandler.Request<String> request = new TestingCoordinationRequestHandler.Request<String>("testing payload");
        CommonTestUtils.assertThrows((String)"cannot handle client event", FlinkException.class, () -> scheduler.deliverCoordinationRequestToCoordinator(this.testOperatorId, (CoordinationRequest)request));
    }

    @Test
    public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception {
        TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        DefaultScheduler scheduler = this.createScheduler(provider);
        String payload = "testing payload";
        TestingCoordinationRequestHandler.Request<String> request = new TestingCoordinationRequestHandler.Request<String>("testing payload");
        CommonTestUtils.assertThrows((String)"does not exist", FlinkException.class, () -> scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), (CoordinationRequest)request));
    }

    private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
        return this.setupTestJobAndScheduler(provider);
    }

    private DefaultScheduler createAndStartScheduler() throws Exception {
        DefaultScheduler scheduler = this.setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(this.testOperatorId));
        scheduler.startScheduling();
        this.executor.triggerAll();
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, 0)).isEqualTo((Object)ExecutionState.DEPLOYING);
        return scheduler;
    }

    private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
        return this.createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(this.testOperatorId));
    }

    private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks() throws Exception {
        DefaultScheduler scheduler = this.setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(this.testOperatorId), null, null, true);
        this.scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    private DefaultScheduler createSchedulerWithoutCheckpointingAndDeployTasks() throws Exception {
        Consumer<JobGraph> noCheckpoints = jobGraph -> jobGraph.setSnapshotSettings(null);
        DefaultScheduler scheduler = this.setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(this.testOperatorId), null, noCheckpoints, false);
        Assertions.assertThat((Object)scheduler.getExecutionGraph().getCheckpointCoordinator()).isNull();
        this.scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
        DefaultScheduler scheduler = this.setupTestJobAndScheduler(provider);
        this.scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
        DefaultScheduler scheduler = this.setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(this.testOperatorId), gateway, null, false);
        this.scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState) throws Exception {
        byte[] savepointMetadata = OperatorCoordinatorSchedulerTest.serializeAsCheckpointMetadata(this.testOperatorId, coordinatorState);
        String savepointPointer = "testingSavepointPointer";
        TestingCheckpointStorageAccessCoordinatorView storage = new TestingCheckpointStorageAccessCoordinatorView();
        storage.registerSavepoint("testingSavepointPointer", savepointMetadata);
        Consumer<JobGraph> savepointConfigurer = jobGraph -> {
            SchedulerTestingUtils.enableCheckpointing(jobGraph, new ModernStateBackend(), storage.asCheckpointStorage());
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)"testingSavepointPointer"));
        };
        DefaultScheduler scheduler = this.setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(this.testOperatorId), null, savepointConfigurer, false);
        scheduler.startScheduling();
        return scheduler;
    }

    private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
        return this.setupTestJobAndScheduler(provider, null, null, false);
    }

    private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider, @Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway, @Nullable Consumer<JobGraph> jobGraphPreProcessing, boolean restartAllOnFailover) throws Exception {
        DefaultScheduler scheduler;
        DefaultSchedulerBuilder schedulerBuilder;
        OperatorIDPair opIds = OperatorIDPair.of((OperatorID)new OperatorID(), (OperatorID)provider.getOperatorId());
        JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", this.testVertexId, Collections.singletonList(opIds));
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.addOperatorCoordinator(new SerializedValue((Object)provider));
        vertex.setParallelism(2);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(vertex).build();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        if (jobGraphPreProcessing != null) {
            jobGraphPreProcessing.accept(jobGraph);
        }
        ComponentMainThreadExecutorServiceAdapter mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(this.executor, Thread.currentThread());
        DefaultSchedulerBuilder defaultSchedulerBuilder = schedulerBuilder = taskExecutorOperatorEventGateway == null ? OperatorCoordinatorSchedulerTest.createSchedulerBuilder(jobGraph, mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()) : OperatorCoordinatorSchedulerTest.createSchedulerBuilder(jobGraph, (ComponentMainThreadExecutor)mainThreadExecutor, taskExecutorOperatorEventGateway, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        if (restartAllOnFailover) {
            schedulerBuilder.setFailoverStrategyFactory((FailoverStrategy.Factory)new RestartAllFailoverStrategy.Factory());
        }
        this.createdScheduler = scheduler = schedulerBuilder.setFutureExecutor(this.executor).setDelayExecutor(this.executor).build();
        return scheduler;
    }

    private static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService scheduledExecutorService) {
        return OperatorCoordinatorSchedulerTest.createSchedulerBuilder(jobGraph, mainThreadExecutor, new SimpleAckingTaskManagerGateway(), scheduledExecutorService);
    }

    private static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, TaskExecutorOperatorEventGateway operatorEventGateway, ScheduledExecutorService scheduledExecutorService) {
        TaskManagerGateway gateway = operatorEventGateway instanceof TaskManagerGateway ? (TaskManagerGateway)operatorEventGateway : new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);
        return OperatorCoordinatorSchedulerTest.createSchedulerBuilder(jobGraph, mainThreadExecutor, gateway, scheduledExecutorService);
    }

    private static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, TaskManagerGateway taskManagerGateway, ScheduledExecutorService executorService) {
        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService).setSchedulingStrategyFactory((SchedulingStrategyFactory)new PipelinedRegionSchedulingStrategy.Factory()).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)).setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
    }

    private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
        scheduler.startScheduling();
        this.executor.triggerAll();
        this.executor.triggerScheduledTasks();
        SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, 0)).isEqualTo((Object)ExecutionState.RUNNING);
        this.executor.triggerAll();
    }

    private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {
        OperatorCoordinatorHolder holder = this.getCoordinatorHolder(scheduler);
        OperatorCoordinator coordinator = holder.coordinator();
        Assertions.assertThat((Object)coordinator).isInstanceOf(TestingOperatorCoordinator.class);
        return (TestingOperatorCoordinator)coordinator;
    }

    private OperatorCoordinatorHolder getCoordinatorHolder(DefaultScheduler scheduler) {
        ExecutionJobVertex vertexWithCoordinator = OperatorCoordinatorSchedulerTest.getJobVertex(scheduler, this.testVertexId);
        ((ObjectAssert)Assertions.assertThat((Object)vertexWithCoordinator).as("vertex for coordinator not found", new Object[0])).isNotNull();
        Optional<OperatorCoordinatorHolder> coordinatorOptional = vertexWithCoordinator.getOperatorCoordinators().stream().filter(holder -> holder.operatorId().equals((Object)this.testOperatorId)).findFirst();
        ((OptionalAssert)Assertions.assertThat(coordinatorOptional).as("vertex does not contain coordinator", new Object[0])).isPresent();
        return coordinatorOptional.get();
    }

    private void failTask(DefaultScheduler scheduler, int subtask) {
        SchedulerTestingUtils.failExecution(scheduler, this.testVertexId, subtask);
        this.executor.triggerAll();
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtask)).isEqualTo((Object)ExecutionState.FAILED);
    }

    private void failAndRedeployTask(DefaultScheduler scheduler, int subtask) {
        this.failTask(scheduler, subtask);
        this.executor.triggerAll();
        this.executor.triggerScheduledTasks();
        this.executor.triggerAll();
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtask)).isEqualTo((Object)ExecutionState.DEPLOYING);
    }

    private void failAndRestartTask(DefaultScheduler scheduler, int subtask) {
        this.failAndRedeployTask(scheduler, subtask);
        SchedulerTestingUtils.setExecutionToState(ExecutionState.INITIALIZING, scheduler, this.testVertexId, subtask);
        SchedulerTestingUtils.setExecutionToState(ExecutionState.RUNNING, scheduler, this.testVertexId, subtask);
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtask)).isEqualTo((Object)ExecutionState.RUNNING);
    }

    private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) throws InterruptedException {
        scheduler.handleGlobalFailure(reason);
        SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
        CheckpointCoordinator checkpointCoordinator = scheduler.getExecutionGraph().getCheckpointCoordinator();
        while (checkpointCoordinator != null && checkpointCoordinator.isTriggering()) {
            Thread.sleep(1L);
        }
        this.executor.triggerAll();
        this.executor.triggerScheduledTasks();
        this.executor.triggerAll();
        SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
        this.executor.triggerAll();
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, 0)).isEqualTo((Object)ExecutionState.RUNNING);
    }

    private void cancelTask(DefaultScheduler scheduler, int subtask) {
        SchedulerTestingUtils.canceledExecution(scheduler, this.testVertexId, subtask);
        this.executor.triggerAll();
        Assertions.assertThat((Comparable)SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtask)).isEqualTo((Object)ExecutionState.CANCELED);
    }

    private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
        CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
        TestingOperatorCoordinator coordinator = this.getCoordinator(scheduler);
        while (!coordinator.hasTriggeredCheckpoint() && !future.isDone()) {
            this.executor.triggerAll();
            Thread.sleep(1L);
        }
        return future;
    }

    private void waitForCompletionToPropagate(CompletableFuture<?> checkpointFuture) {
        while (!checkpointFuture.isDone()) {
            this.executor.triggerAll();
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                throw new Error(e);
            }
        }
    }

    private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
        this.executor.triggerAll();
        SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);
        this.executor.triggerAll();
    }

    private long takeCompleteCheckpoint(DefaultScheduler scheduler, TestingOperatorCoordinator testingOperatorCoordinator, byte[] coordinatorState) throws Exception {
        CompletableFuture<CompletedCheckpoint> checkpointFuture = this.triggerCheckpoint(scheduler);
        testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(coordinatorState);
        this.executor.triggerAll();
        AcknowledgeCheckpointEvent event = new AcknowledgeCheckpointEvent(testingOperatorCoordinator.getLastTriggeredCheckpointId());
        OperatorCoordinatorHolder holder = this.getCoordinatorHolder(scheduler);
        for (int i = 0; i < holder.currentParallelism(); ++i) {
            holder.handleEventFromOperator(i, 0, (OperatorEvent)event);
        }
        this.acknowledgeCurrentCheckpoint(scheduler);
        long checkpointId = checkpointFuture.get().getCheckpointID();
        while (!testingOperatorCoordinator.hasCompleteCheckpoint()) {
            this.executor.triggerAll();
            Thread.sleep(1L);
        }
        return checkpointId;
    }

    private void closeScheduler(DefaultScheduler scheduler) throws Exception {
        CompletableFuture closeFuture = scheduler.closeAsync();
        this.executor.triggerAll();
        closeFuture.get();
    }

    private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
        ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
        return scheduler.getExecutionVertex(id).getJobVertex();
    }

    private static OperatorState createOperatorState(OperatorID id, byte[] coordinatorState) {
        OperatorState state = new OperatorState(id, 10, 16384);
        state.setCoordinatorState(new ByteStreamStateHandle("name", coordinatorState));
        return state;
    }

    private static byte[] serializeAsCheckpointMetadata(OperatorID id, byte[] coordinatorState) throws IOException {
        OperatorState state = OperatorCoordinatorSchedulerTest.createOperatorState(id, coordinatorState);
        CheckpointMetadata metadata = new CheckpointMetadata(1337L, Collections.singletonList(state), Collections.emptyList());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Checkpoints.storeCheckpointMetadata((CheckpointMetadata)metadata, (OutputStream)out);
        return out.toByteArray();
    }

    private static byte[] getStateHandleContents(StreamStateHandle stateHandle) {
        if (stateHandle instanceof ByteStreamStateHandle) {
            return ((ByteStreamStateHandle)stateHandle).getData();
        }
        Assertions.fail((String)"other state handles not implemented");
        return null;
    }

    private static final class TaskExecutorOperatorEventGatewayAdapter
    extends SimpleAckingTaskManagerGateway {
        private final TaskExecutorOperatorEventGateway operatorGateway;

        private TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway operatorGateway) {
            this.operatorGateway = operatorGateway;
        }

        @Override
        public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
            return this.operatorGateway.sendOperatorEventToTask(task, operator, evt);
        }
    }

    private static class ModernStateBackend
    implements StateBackend {
        private ModernStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            throw new UnsupportedOperationException();
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    private static final class FailingTaskExecutorOperatorEventGateway
    implements TaskExecutorOperatorEventGateway {
        private FailingTaskExecutorOperatorEventGateway() {
        }

        public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
            return FutureUtils.completedExceptionally((Throwable)new TestException());
        }
    }

    private static final class CoordinatorThatFailsCheckpointing
    extends TestingOperatorCoordinator {
        public CoordinatorThatFailsCheckpointing(OperatorCoordinator.Context context) {
            super(context);
        }

        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
            throw new Error(new TestException());
        }
    }

    private static final class CoordinatorThatFailsInStart
    extends TestingOperatorCoordinator {
        public CoordinatorThatFailsInStart(OperatorCoordinator.Context context) {
            super(context);
        }

        @Override
        public void start() throws Exception {
            throw new Exception("test failure");
        }
    }

    private static final class TestException
    extends Exception {
        private TestException() {
        }
    }

    private static final class TestOperatorEvent
    implements OperatorEvent {
        private TestOperatorEvent() {
        }
    }
}

