/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.minicluster.SometimesExceptionSender;
import org.apache.flink.runtime.minicluster.SometimesInstantiationErrorSender;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that execute small {@link JobGraph}s on a locally started {@link MiniCluster}.
 *
 * <p>Every test builds its own {@link MiniClusterConfiguration} (always with random ports), starts
 * a fresh cluster and closes it again through try-with-resources.
 */
class MiniClusterITCase {

    /** Runs a simple job on a cluster whose components share one RPC service. */
    @Test
    void runJobWithSingleRpcService() throws Exception {
        runSimpleJobWithRpcServiceSharing(RpcServiceSharing.SHARED);
    }

    /** Runs a simple job on a cluster whose components use dedicated RPC services. */
    @Test
    void runJobWithMultipleRpcServices() throws Exception {
        runSimpleJobWithRpcServiceSharing(RpcServiceSharing.DEDICATED);
    }

    /**
     * Requests more slots than the cluster offers with a 100 ms slot request / resource wait
     * timeout and a requirements check delay of {@code Long.MAX_VALUE} nanoseconds, then expects a
     * {@link NoResourceAvailableException} in the failure's cause chain.
     */
    @Test
    void testHandlingNotEnoughSlotsThroughTimeout() throws Exception {
        final Configuration config = new Configuration();

        final Duration slotRequestTimeout = Duration.ofMillis(100L);
        config.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis());
        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout);
        // The requirements check is pushed out as far as possible so that it cannot be the
        // mechanism that fails the job in this test.
        config.set(
                ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofNanos(Long.MAX_VALUE));

        tryRunningJobWithoutEnoughSlots(config);
    }

    /**
     * Requests more slots than the cluster offers with timeouts of {@code Long.MAX_VALUE}
     * nanoseconds but a 20 ms requirements check delay, then expects a {@link
     * NoResourceAvailableException} in the failure's cause chain.
     */
    @Test
    @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
    void testHandlingNotEnoughSlotsThroughEarlyAbortRequest() throws Exception {
        final Configuration config = new Configuration();

        // The timeouts are pushed out as far as possible so that they cannot be the mechanism
        // that fails the job in this test.
        final Duration slotRequestTimeout = Duration.ofNanos(Long.MAX_VALUE);
        config.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout.toMillis());
        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, slotRequestTimeout);
        config.set(ResourceManagerOptions.REQUIREMENTS_CHECK_DELAY, Duration.ofMillis(20L));
        config.set(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME, 1L);

        tryRunningJobWithoutEnoughSlots(config);
    }

    /**
     * Submits a two-vertex pipelined job (one subtask per vertex) to a cluster with a single slot
     * and asserts that execution fails with a {@link JobExecutionException} whose cause chain
     * contains a {@link NoResourceAvailableException}.
     *
     * @param configuration cluster configuration under test
     */
    private static void tryRunningJobWithoutEnoughSlots(Configuration configuration)
            throws Exception {
        final JobVertex vertex1 = new JobVertex("Test Vertex1");
        vertex1.setParallelism(1);
        vertex1.setMaxParallelism(1);
        vertex1.setInvokableClass(BlockingNoOpInvokable.class);

        final JobVertex vertex2 = new JobVertex("Test Vertex2");
        vertex2.setParallelism(1);
        vertex2.setMaxParallelism(1);
        vertex2.setInvokableClass(BlockingNoOpInvokable.class);

        vertex2.connectNewDataSetAsInput(
                vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex1, vertex2);

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .withRandomPorts()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(1)
                        .setConfiguration(configuration)
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasMessageContaining("Job execution failed")
                    .extracting(Throwable::getCause)
                    .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                    .anySatisfy(
                            cause ->
                                    Assertions.assertThat(cause)
                                            .isInstanceOf(NoResourceAvailableException.class));
        }
    }

    /** Runs a pointwise-connected sender/receiver pair and expects the job to succeed. */
    @Test
    void testForwardJob() throws Exception {
        final int parallelism = 31;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(2 * parallelism))) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Same topology as {@link #testForwardJob()} but with {@link Tasks.AgnosticReceiver} as the
     * receiving invokable.
     */
    @Test
    void testBipartiteJob() throws Exception {
        final int parallelism = 31;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(2 * parallelism))) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticReceiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Connects only two inputs to a receiver running {@link Tasks.AgnosticTertiaryReceiver} and
     * expects the job to fail with an {@link ArrayIndexOutOfBoundsException} root cause whose
     * message contains {@code "2"}.
     */
    @Test
    void testTwoInputJobFailingEdgeMismatch() throws Exception {
        final int parallelism = 1;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(6 * parallelism))) {
            miniCluster.start();

            final JobVertex sender1 = new JobVertex("Sender1");
            sender1.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender1.setParallelism(parallelism);

            final JobVertex sender2 = new JobVertex("Sender2");
            sender2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender2.setParallelism(2 * parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticTertiaryReceiver.class);
            receiver.setParallelism(3 * parallelism);

            receiver.connectNewDataSetAsInput(
                    sender1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(
                    sender2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph =
                    JobGraphTestUtils.streamingJobGraph(sender1, receiver, sender2);

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class)
                    .rootCause()
                    .hasMessageContaining("2");
        }
    }

    /**
     * Runs a receiver with one pointwise and one all-to-all input and expects the job to succeed.
     */
    @Test
    void testTwoInputJob() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(6 * parallelism))) {
            miniCluster.start();

            final JobVertex sender1 = new JobVertex("Sender1");
            sender1.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender1.setParallelism(parallelism);

            final JobVertex sender2 = new JobVertex("Sender2");
            sender2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender2.setParallelism(2 * parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticBinaryReceiver.class);
            receiver.setParallelism(3 * parallelism);

            receiver.connectNewDataSetAsInput(
                    sender1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(
                    sender2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph =
                    JobGraphTestUtils.streamingJobGraph(sender1, receiver, sender2);

            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Runs a sender/forwarder/receiver chain in one slot sharing group on a cluster that offers
     * exactly {@code parallelism} slots and expects the job to succeed.
     */
    @Test
    void testSchedulingAllAtOnce() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(parallelism))) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex forwarder = new JobVertex("Forwarder");
            forwarder.setInvokableClass(Tasks.Forwarder.class);
            forwarder.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticReceiver.class);
            receiver.setParallelism(parallelism);

            final SlotSharingGroup sharingGroup = new SlotSharingGroup();
            sender.setSlotSharingGroup(sharingGroup);
            forwarder.setSlotSharingGroup(sharingGroup);
            receiver.setSlotSharingGroup(sharingGroup);

            forwarder.connectNewDataSetAsInput(
                    sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(
                    forwarder, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph =
                    JobGraphTestUtils.streamingJobGraph(sender, forwarder, receiver);

            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Uses {@link Tasks.ExceptionSender} as the sender and expects the job to fail with a root
     * cause whose message contains {@code "Test exception"}.
     */
    @Test
    void testJobWithAFailingSenderVertex() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(2 * parallelism))) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(Tasks.ExceptionSender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(Exception.class)
                    .rootCause()
                    .hasMessageContaining("Test exception");
        }
    }

    /**
     * Uses {@link SometimesExceptionSender} (configured via {@code configFailingSenders}) as the
     * sender, with sender and receiver in one slot sharing group, and expects the job to fail with
     * a root cause whose message contains {@code "Test exception"}.
     */
    @Test
    void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(parallelism))) {
            miniCluster.start();

            final SlotSharingGroup group = new SlotSharingGroup();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(SometimesExceptionSender.class);
            sender.setParallelism(parallelism);
            sender.setSlotSharingGroup(group);

            SometimesExceptionSender.configFailingSenders(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);
            receiver.setSlotSharingGroup(group);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(Exception.class)
                    .rootCause()
                    .hasMessageContaining("Test exception");
        }
    }

    /**
     * Uses {@link Tasks.ExceptionReceiver} as the receiver and expects the job to fail with a root
     * cause whose message contains {@code "Test exception"}.
     */
    @Test
    void testJobWithAFailingReceiverVertex() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(2 * parallelism))) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.ExceptionReceiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(Exception.class)
                    .rootCause()
                    .hasMessageContaining("Test exception");
        }
    }

    /**
     * Uses {@link Tasks.InstantiationErrorSender} as the sender and expects the job to fail with a
     * root cause whose message contains {@code "Test exception in constructor"}.
     */
    @Test
    void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(2 * parallelism))) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(Tasks.InstantiationErrorSender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(Exception.class)
                    .rootCause()
                    .hasMessageContaining("Test exception in constructor");
        }
    }

    /**
     * Uses {@link SometimesInstantiationErrorSender} (configured via {@code configFailingSenders})
     * as the sender, with sender and receiver in one slot sharing group, and expects the job to
     * fail with a root cause whose message contains {@code "Test exception in constructor"}.
     */
    @Test
    void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(parallelism))) {
            miniCluster.start();

            final SlotSharingGroup group = new SlotSharingGroup();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(SometimesInstantiationErrorSender.class);
            sender.setParallelism(parallelism);
            sender.setSlotSharingGroup(group);

            SometimesInstantiationErrorSender.configFailingSenders(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);
            receiver.setSlotSharingGroup(group);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            Assertions.assertThatThrownBy(() -> miniCluster.executeJobBlocking(jobGraph))
                    .isInstanceOf(JobExecutionException.class)
                    .hasCauseInstanceOf(Exception.class)
                    .rootCause()
                    .hasMessageContaining("Test exception in constructor");
        }
    }

    /**
     * Submits a job whose sink vertex sleeps in {@code finalizeOnMaster} before setting a flag, and
     * asserts that the flag is already set once the job result has been obtained.
     */
    @Test
    void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
        final int parallelism = 11;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(2 * parallelism))) {
            miniCluster.start();

            final JobVertex source = new JobVertex("Source");
            source.setInvokableClass(WaitingNoOpInvokable.class);
            source.setParallelism(parallelism);

            // The flag is static, so clear whatever an earlier run may have left behind.
            WaitOnFinalizeJobVertex.resetFinalizedOnMaster();

            final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 20L);
            sink.setInvokableClass(NoOpInvokable.class);
            sink.setParallelism(parallelism);

            sink.connectNewDataSetAsInput(
                    source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, sink);

            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster
                            .submitJob(jobGraph)
                            .thenCompose(
                                    ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));

            jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader());

            Assertions.assertThat(WaitOnFinalizeJobVertex.finalizedOnMaster).isTrue();
        }
    }

    /**
     * Submits a job whose vertex throws {@code OutOfMemoryError("Java heap space")} from {@code
     * finalizeOnMaster} and asserts that the reported root cause carries the extended heap-space
     * message.
     */
    @Test
    void testOutOfMemoryErrorMessageEnrichmentInJobVertexFinalization() throws Exception {
        final int parallelism = 1;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(parallelism))) {
            miniCluster.start();

            final JobVertex failingJobVertex = new OutOfMemoryInFinalizationJobVertex();
            failingJobVertex.setInvokableClass(NoOpInvokable.class);
            failingJobVertex.setParallelism(parallelism);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(failingJobVertex);

            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster
                            .submitJob(jobGraph)
                            .thenCompose(
                                    ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));

            Assertions.assertThatThrownBy(
                            () ->
                                    jobResultFuture
                                            .get()
                                            .toJobExecutionResult(getClass().getClassLoader()))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(OutOfMemoryError.class)
                    .rootCause()
                    .hasMessageContaining(
                            "Java heap space. A heap space-related out-of-memory error has occurred.");
        }
    }

    /**
     * Submits a job whose vertex throws {@code OutOfMemoryError("Java heap space")} from {@code
     * initializeOnMaster} and asserts that the reported root cause is an {@link OutOfMemoryError}
     * whose message contains {@code "Java heap space"}.
     */
    @Test
    void testOutOfMemoryErrorMessageEnrichmentInJobVertexInitialization() throws Exception {
        final int parallelism = 1;

        try (MiniCluster miniCluster = new MiniCluster(singleTaskManagerConfig(parallelism))) {
            miniCluster.start();

            final JobVertex failingJobVertex = new OutOfMemoryInInitializationVertex();
            failingJobVertex.setInvokableClass(NoOpInvokable.class);
            failingJobVertex.setParallelism(parallelism);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(failingJobVertex);

            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster
                            .submitJob(jobGraph)
                            .thenCompose(
                                    ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));

            Assertions.assertThatThrownBy(
                            () ->
                                    jobResultFuture
                                            .get()
                                            .toJobExecutionResult(getClass().getClassLoader()))
                    .isInstanceOf(JobExecutionException.class)
                    .hasRootCauseInstanceOf(OutOfMemoryError.class)
                    .rootCause()
                    .hasMessageContaining("Java heap space");
        }
    }

    /**
     * Starts a cluster with 3 TaskManagers of 7 slots each and runs a simple job that uses every
     * slot.
     *
     * @param rpcServiceSharing RPC service sharing mode to configure on the cluster
     */
    private static void runSimpleJobWithRpcServiceSharing(RpcServiceSharing rpcServiceSharing)
            throws Exception {
        final int numOfTMs = 3;
        final int slotsPerTM = 7;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .withRandomPorts()
                        .setNumTaskManagers(numOfTMs)
                        .setNumSlotsPerTaskManager(slotsPerTM)
                        .setRpcServiceSharing(rpcServiceSharing)
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();
            miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
        }
    }

    /**
     * Builds a configuration for a cluster with random ports and a single TaskManager.
     *
     * @param numSlotsPerTaskManager number of slots offered by the TaskManager
     * @return the built configuration
     */
    private static MiniClusterConfiguration singleTaskManagerConfig(int numSlotsPerTaskManager) {
        return new MiniClusterConfiguration.Builder()
                .withRandomPorts()
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
                .build();
    }

    /**
     * Creates a single-vertex {@link NoOpInvokable} job with a fixed-delay restart strategy.
     *
     * @param parallelism parallelism and max parallelism of the vertex
     * @return the job graph
     * @throws IOException declared by the original method; presumably raised while storing the
     *     execution config in the job graph
     */
    private static JobGraph getSimpleJob(int parallelism) throws IOException {
        final JobVertex task = new JobVertex("Test task");
        task.setParallelism(parallelism);
        task.setMaxParallelism(parallelism);
        task.setInvokableClass(NoOpInvokable.class);

        final JobGraph jg = JobGraphTestUtils.streamingJobGraph(task);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        jg.setExecutionConfig(executionConfig);

        return jg;
    }

    /** A {@link JobVertex} whose {@code initializeOnMaster} always throws an OutOfMemoryError. */
    private static class OutOfMemoryInInitializationVertex extends JobVertex {

        OutOfMemoryInInitializationVertex() {
            super("FailingInInitialization");
        }

        @Override
        public void initializeOnMaster(JobVertex.InitializeOnMasterContext context) {
            throw new OutOfMemoryError("Java heap space");
        }
    }

    /** A {@link JobVertex} whose {@code finalizeOnMaster} always throws an OutOfMemoryError. */
    private static class OutOfMemoryInFinalizationJobVertex extends JobVertex {

        private OutOfMemoryInFinalizationJobVertex() {
            super("FailingInFinalization");
        }

        @Override
        public void finalizeOnMaster(JobVertex.FinalizeOnMasterContext context) {
            throw new OutOfMemoryError("Java heap space");
        }
    }

    /**
     * A {@link JobVertex} whose {@code finalizeOnMaster} sleeps for a configurable time and then
     * records in a static flag that it ran.
     */
    private static class WaitOnFinalizeJobVertex extends JobVertex {

        private static final long serialVersionUID = -1179547322468530299L;

        /** Set to {@code true} at the end of {@link #finalizeOnMaster}. */
        private static final AtomicBoolean finalizedOnMaster = new AtomicBoolean(false);

        /** Time in milliseconds to sleep before setting the flag. */
        private final long waitingTime;

        WaitOnFinalizeJobVertex(String name, long waitingTime) {
            super(name);
            this.waitingTime = waitingTime;
        }

        @Override
        public void finalizeOnMaster(JobVertex.FinalizeOnMasterContext context) throws Exception {
            Thread.sleep(waitingTime);
            finalizedOnMaster.set(true);
        }

        /** Clears the flag so that a following test run starts from a known state. */
        static void resetFinalizedOnMaster() {
            finalizedOnMaster.set(false);
        }
    }
}

