/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
import org.apache.flink.runtime.blob.TaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.heartbeat.RecordingHeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.StartStopNotifyingLeaderRetrievalService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutor;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.curator5.com.google.common.collect.Iterators;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.TriConsumer;
import org.apache.flink.util.function.TriConsumerWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

public class TaskExecutorTest
extends TestLogger {
    // Default heartbeat services, used e.g. by testHeartbeatReporting. Both constructor
    // arguments are 1000L (presumably heartbeat interval and timeout in ms — confirm against
    // HeartbeatServicesImpl).
    public static final HeartbeatServices HEARTBEAT_SERVICES = new HeartbeatServicesImpl(1000L, 1000L);
    // Fixed resource spec: one CPU, four memory sizes (1m..4m) and an empty list as the last
    // argument. NOTE(review): which memory pool each size maps to is defined by the
    // TaskExecutorResourceSpec constructor — verify there.
    private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new TaskExecutorResourceSpec(new CPUResource(1.0), MemorySize.parse((String)"1m"), MemorySize.parse((String)"2m"), MemorySize.parse((String)"3m"), MemorySize.parse((String)"4m"), Collections.emptyList());
    // Shared scheduled executor for all tests of this class (JUnit class rule).
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    // Per-test temporary directory.
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    // Exposes the name of the currently running test method.
    @Rule
    public final TestName testName = new TestName();
    // Upper bound (10 000 ms) applied when the tests wait on futures and latches.
    private static final Time timeout = Time.milliseconds((long)10000L);
    // Heartbeat services for the "becomes unreachable" tests. Arguments (1L, 10000000L, 1) are
    // presumably interval, timeout and failed-RPC threshold — TODO confirm against
    // HeartbeatServicesImpl.
    private static final HeartbeatServices failedRpcEnabledHeartbeatServices = new HeartbeatServicesImpl(1L, 10000000L, 1);
    // Per-test fixtures; all of them are (re)created in setup().
    private TestingRpcService rpc;
    private Configuration configuration;
    private UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
    // Two job ids, each wired to its own job-master leader retriever in setup().
    private JobID jobId;
    private JobID jobId2;
    // Its recorded error, if any, is rethrown in teardown().
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService resourceManagerLeaderRetriever;
    private SettableLeaderRetrievalService jobManagerLeaderRetriever;
    private SettableLeaderRetrievalService jobManagerLeaderRetriever2;
    // Closed in teardown() when non-null.
    private NettyShuffleEnvironment nettyShuffleEnvironment;

    /**
     * Builds a fresh set of fixtures for every test: RPC service, configuration, task manager
     * location, job ids, fatal error handler, HA services with their leader retrievers, and a
     * Netty shuffle environment.
     */
    @Before
    public void setup() throws IOException {
        rpc = new TestingRpcService();

        configuration = new Configuration();
        TaskExecutorResourceUtils.adjustForLocalExecution(configuration);

        unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        jobId = new JobID();
        jobId2 = new JobID();
        testingFatalErrorHandler = new TestingFatalErrorHandler();

        // Leader retrievers are created first and then wired into the HA services.
        resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
        jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
        jobManagerLeaderRetriever2 = new SettableLeaderRetrievalService();

        haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId2, jobManagerLeaderRetriever2);

        nettyShuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
    }

    /**
     * Releases the per-test fixtures: terminates the RPC service, closes the shuffle
     * environment and finally rethrows any error recorded by the fatal error handler.
     */
    @After
    public void teardown() throws Exception {
        final TestingRpcService rpcService = rpc;
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
            rpc = null;
        }

        final NettyShuffleEnvironment shuffleEnvironment = nettyShuffleEnvironment;
        if (shuffleEnvironment != null) {
            shuffleEnvironment.close();
        }

        testingFatalErrorHandler.rethrowError();
    }

    /**
     * Starts a {@link TaskExecutor} on top of real services, terminates it again and checks
     * that the task slot table, the shuffle environment and the KvState service report being
     * closed / shut down afterwards.
     */
    @Test
    public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
        final TaskSlotTableImpl<Task> slotTable =
                TaskSlotUtils.createTaskSlotTable(1, EXECUTOR_RESOURCE.getExecutor());

        final JobLeaderService leaderService =
                new DefaultJobLeaderService(
                        unresolvedTaskManagerLocation,
                        RetryingRegistrationConfiguration.defaultConfiguration());

        final IOManager ioManager = new IOManagerAsync(tmp.newFolder().getAbsolutePath());

        final TaskExecutorLocalStateStoresManager stateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        false,
                        Reference.borrowed(ioManager.getSpillingDirectories()),
                        Executors.directExecutor());

        nettyShuffleEnvironment.start();

        final KvStateService kvService = new KvStateService(new KvStateRegistry(), null, null);
        kvService.start();

        final TaskManagerServices services =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setIoManager(ioManager)
                        .setShuffleEnvironment(nettyShuffleEnvironment)
                        .setKvStateService(kvService)
                        .setTaskSlotTable(slotTable)
                        .setJobLeaderService(leaderService)
                        .setTaskStateManager(stateStoresManager)
                        .build();

        final TaskExecutor taskExecutor = createTaskExecutor(services);

        try {
            taskExecutor.start();
        } finally {
            // Terminate on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }

        Assert.assertThat(slotTable.isClosed(), Matchers.is(true));
        Assert.assertThat(nettyShuffleEnvironment.isClosed(), Matchers.is(true));
        Assert.assertThat(kvService.isShutdown(), Matchers.is(true));
    }

    /**
     * Runs the JobManager heartbeat scenario with testing heartbeat services and triggers the
     * heartbeat timeout between the TaskExecutor and the JobManager explicitly.
     */
    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        final TestingHeartbeatServices testingHeartbeatServices = new TestingHeartbeatServices();
        final ResourceID jobManagerResourceId = ResourceID.generate();

        // Only the TaskExecutor's resource id is needed to trigger the timeout.
        final TriConsumer<ResourceID, TaskExecutorGateway, AllocationID> triggerTimeout =
                (taskExecutorResourceId, unusedGateway, unusedAllocationId) ->
                        testingHeartbeatServices.triggerHeartbeatTimeout(
                                taskExecutorResourceId, jobManagerResourceId);

        runJobManagerHeartbeatTest(
                jobManagerResourceId,
                testingHeartbeatServices,
                unusedBuilder -> {},
                triggerTimeout);
    }

    /**
     * Runs the JobManager heartbeat scenario with a JobMaster gateway whose heartbeat function
     * always fails with a {@link RecipientUnreachableException}, and sends one heartbeat from
     * the JobManager to the TaskExecutor.
     */
    @Test
    public void testJobManagerBecomesUnreachableTriggersDisconnect() throws Exception {
        final ResourceID jobManagerResourceId = ResourceID.generate();

        // Every heartbeat sent to the JobManager completes exceptionally.
        final Consumer<TestingJobMasterGatewayBuilder> failHeartbeatsToJobManager =
                gatewayBuilder ->
                        gatewayBuilder.setTaskManagerHeartbeatFunction(
                                (unusedResourceId, unusedPayload) ->
                                        FutureUtils.completedExceptionally(
                                                new RecipientUnreachableException(
                                                        "sender",
                                                        "recipient",
                                                        "job manager is unreachable.")));

        final TriConsumer<ResourceID, TaskExecutorGateway, AllocationID> sendJobManagerHeartbeat =
                (unusedTaskExecutorResourceId, gateway, allocationId) -> {
                    final AllocatedSlotReport allocatedSlotReport =
                            new AllocatedSlotReport(
                                    jobId,
                                    Collections.singleton(new AllocatedSlotInfo(0, allocationId)));
                    gateway.heartbeatFromJobManager(jobManagerResourceId, allocatedSlotReport);
                };

        runJobManagerHeartbeatTest(
                jobManagerResourceId,
                failedRpcEnabledHeartbeatServices,
                failHeartbeatsToJobManager,
                sendJobManagerHeartbeat);
    }

    /**
     * Shared driver for the JobManager heartbeat tests.
     *
     * <p>Starts a TaskExecutor, registers it at a testing ResourceManager, requests a slot for
     * {@code jobId}, announces the JobManager leader and waits until the slot has been offered.
     * It then runs {@code heartbeatAction} and expects the TaskExecutor to disconnect from the
     * JobManager and to attempt a second registration.
     *
     * @param jmResourceId resource id reported by the testing JobMaster gateway on registration
     * @param heartbeatServices heartbeat services handed to the TaskExecutor under test
     * @param jobMasterGatewayBuilderConsumer hook to customize the testing JobMaster gateway
     * @param heartbeatAction action expected to provoke the disconnect; receives the
     *     TaskExecutor's resource id, its gateway and the id of the allocated slot
     */
    private void runJobManagerHeartbeatTest(
            ResourceID jmResourceId,
            HeartbeatServices heartbeatServices,
            Consumer<TestingJobMasterGatewayBuilder> jobMasterGatewayBuilderConsumer,
            TriConsumer<ResourceID, TaskExecutorGateway, AllocationID> heartbeatAction)
            throws IOException, InterruptedException, ExecutionException, TimeoutException {
        final JobLeaderService jobLeaderService =
                new DefaultJobLeaderService(
                        unresolvedTaskManagerLocation,
                        RetryingRegistrationConfiguration.defaultConfiguration());

        final String jobMasterAddress = "jm";
        final UUID jmLeaderId = UUID.randomUUID();

        // Two attempts: the initial registration plus the reconnect after the disconnect.
        final CountDownLatch registrationAttempts = new CountDownLatch(2);
        final OneShotLatch slotOfferedLatch = new OneShotLatch();
        final CompletableFuture<ResourceID> disconnectTaskManagerFuture = new CompletableFuture<>();

        final TestingJobMasterGatewayBuilder testingJobMasterGatewayBuilder =
                new TestingJobMasterGatewayBuilder()
                        .setRegisterTaskManagerFunction(
                                (ignoredJobId, ignoredTaskManagerRegistrationInformation) -> {
                                    registrationAttempts.countDown();
                                    return CompletableFuture.completedFuture(
                                            new JMTMRegistrationSuccess(jmResourceId));
                                })
                        .setDisconnectTaskManagerFunction(
                                resourceID -> {
                                    disconnectTaskManagerFuture.complete(resourceID);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .setOfferSlotsFunction(
                                (resourceID, slotOffers) -> {
                                    slotOfferedLatch.trigger();
                                    return CompletableFuture.completedFuture(slotOffers);
                                });
        jobMasterGatewayBuilderConsumer.accept(testingJobMasterGatewayBuilder);
        final TestingJobMasterGateway jobMasterGateway = testingJobMasterGatewayBuilder.build();

        final TaskExecutorLocalStateStoresManager localStateStoresManager =
                createTaskExecutorLocalStateStoresManager();

        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(
                                TaskSlotUtils.createTaskSlotTable(
                                        1, EXECUTOR_RESOURCE.getExecutor()))
                        .setJobLeaderService(jobLeaderService)
                        .setTaskStateManager(localStateStoresManager)
                        .build();

        final TestingTaskExecutor taskManager =
                createTestingTaskExecutor(taskManagerServices, heartbeatServices);

        final OneShotLatch slotReportReceived = new OneShotLatch();
        final TestingResourceManagerGateway testingResourceManagerGateway =
                new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(
                ignored -> {
                    slotReportReceived.trigger();
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        // First registration succeeds; any later attempt gets a future that never completes.
        final Queue<CompletableFuture<RegistrationResponse>> registrationResponses =
                new ArrayDeque<>();
        registrationResponses.add(
                CompletableFuture.completedFuture(
                        new TaskExecutorRegistrationSuccess(
                                new InstanceID(),
                                testingResourceManagerGateway.getOwnResourceId(),
                                new ClusterInformation("foobar", 1234),
                                null)));
        registrationResponses.add(new CompletableFuture<>());
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(
                taskExecutorRegistration -> registrationResponses.poll());

        rpc.registerGateway(jobMasterAddress, jobMasterGateway);
        rpc.registerGateway(
                testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);

        try {
            taskManager.start();
            taskManager.waitUntilStarted();

            final TaskExecutorGateway taskExecutorGateway =
                    taskManager.getSelfGateway(TaskExecutorGateway.class);

            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(),
                    testingResourceManagerGateway.getFencingToken().toUUID());
            slotReportReceived.await();

            final AllocationID allocationId = new AllocationID();
            requestSlot(
                    taskExecutorGateway,
                    jobId,
                    allocationId,
                    buildSlotID(0),
                    ResourceProfile.UNKNOWN,
                    jobMasterAddress,
                    testingResourceManagerGateway.getFencingToken());

            jobManagerLeaderRetriever.notifyListener(jobMasterAddress, jmLeaderId);
            slotOfferedLatch.await();

            heartbeatAction.accept(
                    unresolvedTaskManagerLocation.getResourceID(),
                    taskExecutorGateway,
                    allocationId);

            final ResourceID disconnectedResourceId = disconnectTaskManagerFuture.get();
            Assert.assertThat(
                    disconnectedResourceId,
                    Matchers.equalTo(unresolvedTaskManagerLocation.getResourceID()));

            // The timeout value is in milliseconds, so the unit must be MILLISECONDS;
            // SECONDS turned the 10 s bound into 10000 s and hid a missing reconnect.
            Assert.assertTrue(
                    "The TaskExecutor should try to reconnect to the JM",
                    registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskManager);
        }
    }

    /**
     * Runs the ResourceManager heartbeat scenario with heartbeat services built from (1L, 3L)
     * and without any gateway customization or explicit heartbeat action.
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        final HeartbeatServices shortTimeoutHeartbeatServices = new HeartbeatServicesImpl(1L, 3L);

        final Consumer<TestingResourceManagerGateway> noGatewaySetup = unusedGateway -> {};

        final TriConsumerWithException<
                        TaskExecutorGateway, ResourceID, CompletableFuture<ResourceID>, Exception>
                noHeartbeatAction = (unusedGateway, unusedResourceId, unusedFuture) -> {};

        runResourceManagerHeartbeatTest(
                shortTimeoutHeartbeatServices, noGatewaySetup, noHeartbeatAction);
    }

    /**
     * Runs the ResourceManager heartbeat scenario with a gateway whose heartbeat function
     * always fails with a {@link RecipientUnreachableException}. Heartbeats are sent from the
     * ResourceManager repeatedly until the disconnect future is done.
     */
    @Test
    public void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
        // Every heartbeat sent to the ResourceManager completes exceptionally.
        final Consumer<TestingResourceManagerGateway> failHeartbeatsToResourceManager =
                gateway ->
                        gateway.setTaskExecutorHeartbeatFunction(
                                (unusedResourceId, unusedPayload) ->
                                        FutureUtils.completedExceptionally(
                                                new RecipientUnreachableException(
                                                        "sender",
                                                        "recipient",
                                                        "resource manager is unreachable.")));

        final TriConsumerWithException<
                        TaskExecutorGateway, ResourceID, CompletableFuture<ResourceID>, Exception>
                heartbeatUntilDisconnected =
                        (gateway, resourceManagerResourceId, disconnectFuture) ->
                                CommonTestUtils.waitUntilCondition(
                                        () -> {
                                            gateway.heartbeatFromResourceManager(
                                                    resourceManagerResourceId);
                                            return disconnectFuture.isDone();
                                        },
                                        50L);

        runResourceManagerHeartbeatTest(
                failedRpcEnabledHeartbeatServices,
                failHeartbeatsToResourceManager,
                heartbeatUntilDisconnected);
    }

    /**
     * Shared driver for the ResourceManager heartbeat tests.
     *
     * <p>Registers a testing ResourceManager gateway, starts a TaskExecutor, announces the
     * ResourceManager leader and waits for the first registration. It then runs {@code
     * heartbeatAction} and expects the TaskExecutor to be disconnected from the
     * ResourceManager and to attempt a second registration.
     *
     * @param heartbeatServices heartbeat services handed to the TaskExecutor under test
     * @param setupResourceManagerGateway hook to customize the testing ResourceManager gateway
     * @param heartbeatAction action run after the initial registration; receives the
     *     TaskExecutor gateway, the ResourceManager's resource id and the disconnect future
     * @throws Exception if the action, a wait or an assertion fails
     */
    private void runResourceManagerHeartbeatTest(
            HeartbeatServices heartbeatServices,
            Consumer<TestingResourceManagerGateway> setupResourceManagerGateway,
            TriConsumerWithException<
                            TaskExecutorGateway,
                            ResourceID,
                            CompletableFuture<ResourceID>,
                            Exception>
                    heartbeatAction)
            throws Exception {
        final String rmAddress = "rm";
        final ResourceID rmResourceId = new ResourceID(rmAddress);
        final ResourceManagerId rmLeaderId = ResourceManagerId.generate();

        final TestingResourceManagerGateway rmGateway =
                new TestingResourceManagerGateway(rmLeaderId, rmResourceId, rmAddress, rmAddress);

        final TaskExecutorRegistrationSuccess registrationResponse =
                new TaskExecutorRegistrationSuccess(
                        new InstanceID(),
                        rmResourceId,
                        new ClusterInformation("localhost", 1234),
                        null);

        // First registration succeeds; the second one gets a future that never completes.
        final Queue<CompletableFuture<RegistrationResponse>> registrationResponses =
                new ArrayDeque<>(2);
        registrationResponses.add(CompletableFuture.completedFuture(registrationResponse));
        registrationResponses.add(new CompletableFuture<>());

        final CompletableFuture<ResourceID> taskExecutorRegistrationFuture =
                new CompletableFuture<>();
        // Two attempts: the initial registration plus the reconnect after the disconnect.
        final CountDownLatch registrationAttempts = new CountDownLatch(2);
        rmGateway.setRegisterTaskExecutorFunction(
                registration -> {
                    taskExecutorRegistrationFuture.complete(registration.getResourceId());
                    registrationAttempts.countDown();
                    return registrationResponses.poll();
                });

        setupResourceManagerGateway.accept(rmGateway);

        final CompletableFuture<ResourceID> taskExecutorDisconnectFuture =
                new CompletableFuture<>();
        rmGateway.setDisconnectTaskExecutorConsumer(
                disconnectInfo -> taskExecutorDisconnectFuture.complete(disconnectInfo.f0));

        rpc.registerGateway(rmAddress, rmGateway);

        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .build();

        final TaskExecutor taskManager = createTaskExecutor(taskManagerServices, heartbeatServices);

        try {
            taskManager.start();
            final TaskExecutorGateway taskExecutorGateway =
                    taskManager.getSelfGateway(TaskExecutorGateway.class);

            resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId.toUUID());

            Assert.assertThat(
                    taskExecutorRegistrationFuture.get(),
                    Matchers.equalTo(unresolvedTaskManagerLocation.getResourceID()));

            heartbeatAction.accept(
                    taskExecutorGateway,
                    rmGateway.getOwnResourceId(),
                    taskExecutorDisconnectFuture);

            Assert.assertThat(
                    taskExecutorDisconnectFuture.get(
                            timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
                    Matchers.equalTo(unresolvedTaskManagerLocation.getResourceID()));

            // The timeout value is in milliseconds, so the unit must be MILLISECONDS;
            // SECONDS turned the 10 s bound into 10000 s and hid a missing reconnect.
            Assert.assertTrue(
                    "The TaskExecutor should try to reconnect to the RM",
                    registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskManager);
        }
    }

    /**
     * Checks what the TaskExecutor reports to the ResourceManager: the first slot report
     * produced by the slot table is sent after registration, and the payload answering a
     * ResourceManager heartbeat carries the second slot report together with the cluster
     * partition report of the partition tracker.
     */
    @Test
    public void testHeartbeatReporting() throws Exception {
        final String rmAddress = "rm";
        final UUID rmLeaderId = UUID.randomUUID();

        // ResourceManager gateway that records registration, slot report and heartbeat payload.
        final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
        final CompletableFuture<ResourceID> registeredTaskExecutorId = new CompletableFuture<>();
        final ResourceID rmResourceId = rmGateway.getOwnResourceId();
        final CompletableFuture<RegistrationResponse> registrationSuccess =
                CompletableFuture.completedFuture(
                        new TaskExecutorRegistrationSuccess(
                                new InstanceID(),
                                rmResourceId,
                                new ClusterInformation("localhost", 1234),
                                null));
        rmGateway.setRegisterTaskExecutorFunction(
                registration -> {
                    registeredTaskExecutorId.complete(registration.getResourceId());
                    return registrationSuccess;
                });

        final CompletableFuture<SlotReport> firstSlotReport = new CompletableFuture<>();
        rmGateway.setSendSlotReportFunction(
                slotReportInfo -> {
                    firstSlotReport.complete(slotReportInfo.f2);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        final CompletableFuture<TaskExecutorHeartbeatPayload> receivedPayload =
                new CompletableFuture<>();
        rmGateway.setTaskExecutorHeartbeatFunction(
                (unusedResourceId, payload) -> {
                    receivedPayload.complete(payload);
                    return FutureUtils.completedVoidFuture();
                });

        rpc.registerGateway(rmAddress, rmGateway);

        // Slot table that hands out two different reports on consecutive requests.
        final SlotID slotId = buildSlotID(0);
        final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
        final SlotReport slotReport1 = new SlotReport(new SlotStatus(slotId, resourceProfile));
        final SlotReport slotReport2 =
                new SlotReport(
                        new SlotStatus(slotId, resourceProfile, new JobID(), new AllocationID()));
        final Queue<SlotReport> pendingReports =
                new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2));

        final TaskSlotTable<Task> slotTable =
                TestingTaskSlotTable.<Task>newBuilder()
                        .createSlotReportSupplier(pendingReports::poll)
                        .closeAsyncReturns(CompletableFuture.completedFuture(null))
                        .build();

        final TaskExecutorLocalStateStoresManager stateStoresManager =
                createTaskExecutorLocalStateStoresManager();

        final TaskManagerServices services =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(slotTable)
                        .setTaskStateManager(stateStoresManager)
                        .build();

        final TaskExecutorPartitionTracker partitionTracker =
                createPartitionTrackerWithFixedPartitionReport(services.getShuffleEnvironment());

        final TaskExecutor taskExecutor =
                createTaskExecutor(services, HEARTBEAT_SERVICES, partitionTracker);

        try {
            taskExecutor.start();

            // Announce the leader and wait for the registration plus the initial slot report.
            resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId);

            Assert.assertThat(
                    registeredTaskExecutorId.get(),
                    Matchers.equalTo(unresolvedTaskManagerLocation.getResourceID()));
            Assert.assertThat(firstSlotReport.get(), Matchers.equalTo(slotReport1));

            final TaskExecutorGateway gateway =
                    taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            // Trigger a heartbeat request and inspect the payload sent in response.
            gateway.heartbeatFromResourceManager(rmResourceId);
            final TaskExecutorHeartbeatPayload payload = receivedPayload.get();

            Assert.assertEquals(slotReport2, payload.getSlotReport());
            Assert.assertEquals(
                    partitionTracker.createClusterPartitionReport(),
                    payload.getClusterPartitionReport());
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Creates a partition tracker whose {@code createClusterPartitionReport()} always returns
     * the same report. The report holds a single entry for a new data set id, the value 4 and
     * one result partition mapped to a stub shuffle descriptor.
     *
     * @param shuffleEnvironment shuffle environment passed to the tracker implementation
     * @return tracker returning the fixed cluster partition report
     */
    private static TaskExecutorPartitionTracker createPartitionTrackerWithFixedPartitionReport(
            ShuffleEnvironment<?, ?> shuffleEnvironment) {
        final ResultPartitionID partitionId = new ResultPartitionID();

        // Stub descriptor: reports the partition id above and no local resources.
        final ShuffleDescriptor stubDescriptor =
                new ShuffleDescriptor() {
                    @Override
                    public ResultPartitionID getResultPartitionID() {
                        return partitionId;
                    }

                    @Override
                    public Optional<ResourceID> storesLocalResourcesOn() {
                        return Optional.empty();
                    }
                };

        final ClusterPartitionReport.ClusterPartitionReportEntry reportEntry =
                new ClusterPartitionReport.ClusterPartitionReportEntry(
                        new IntermediateDataSetID(),
                        4,
                        Collections.singletonMap(partitionId, stubDescriptor));

        final ClusterPartitionReport fixedReport =
                new ClusterPartitionReport(Collections.singletonList(reportEntry));

        return new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
            @Override
            public ClusterPartitionReport createClusterPartitionReport() {
                return fixedReport;
            }
        };
    }

    /**
     * Checks that the TaskExecutor registers at the ResourceManager once the leader address is
     * announced: the registration function of the testing gateway must be invoked within the
     * test timeout.
     */
    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        final String resourceManagerAddress = "/resource/manager/address/one";

        final TestingResourceManagerGateway resourceManagerGateway =
                new TestingResourceManagerGateway();
        final CountDownLatch registeredLatch = new CountDownLatch(1);
        resourceManagerGateway.setRegisterTaskExecutorFunction(
                FunctionUtils.uncheckedFunction(
                        unusedRegistration -> {
                            registeredLatch.countDown();
                            return CompletableFuture.completedFuture(
                                    new TaskExecutorRegistrationSuccess(
                                            new InstanceID(),
                                            new ResourceID(resourceManagerAddress),
                                            new ClusterInformation("localhost", 1234),
                                            null));
                        }));

        rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);

        final TaskManagerServices services =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(
                                TaskSlotUtils.createTaskSlotTable(
                                        1, EXECUTOR_RESOURCE.getExecutor()))
                        .setTaskStateManager(createTaskExecutorLocalStateStoresManager())
                        .build();

        final TaskExecutor taskExecutor = createTaskExecutor(services);

        try {
            taskExecutor.start();
            resourceManagerLeaderRetriever.notifyListener(
                    resourceManagerAddress, UUID.randomUUID());

            Assert.assertTrue(
                    registeredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Announces two resource manager leaders in turn (with a leadership loss in between) and
     * asserts that the task executor sends a registration carrying its own address and resource ID
     * to each of them, and that a resource manager connection exists afterwards.
     */
    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        final UUID leaderId1 = UUID.randomUUID();
        final UUID leaderId2 = UUID.randomUUID();

        // Each gateway records the registration it receives so the test can inspect it.
        final CompletableFuture<TaskExecutorRegistration> rmGateway1TaskExecutorRegistration = new CompletableFuture<>();
        final TestingResourceManagerGateway rmGateway1 = new TestingResourceManagerGateway();
        rmGateway1.setRegisterTaskExecutorFunction(
                taskExecutorRegistration -> {
                    rmGateway1TaskExecutorRegistration.complete(taskExecutorRegistration);
                    return createRegistrationResponse(rmGateway1);
                });

        final CompletableFuture<TaskExecutorRegistration> rmGateway2TaskExecutorRegistration = new CompletableFuture<>();
        final TestingResourceManagerGateway rmGateway2 = new TestingResourceManagerGateway();
        rmGateway2.setRegisterTaskExecutorFunction(
                taskExecutorRegistration -> {
                    rmGateway2TaskExecutorRegistration.complete(taskExecutorRegistration);
                    return createRegistrationResponse(rmGateway2);
                });

        rpc.registerGateway(rmGateway1.getAddress(), rmGateway1);
        rpc.registerGateway(rmGateway2.getAddress(), rmGateway2);

        final TaskSlotTable<Task> taskSlotTable =
                TestingTaskSlotTable.<Task>newBuilder()
                        .createSlotReportSupplier(SlotReport::new)
                        .closeAsyncReturns(CompletableFuture.completedFuture(null))
                        .build();
        final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setTaskStateManager(localStateStoresManager)
                        .build();
        final TaskExecutor taskManager = createTaskExecutor(taskManagerServices);

        try {
            taskManager.start();
            final String taskManagerAddress = taskManager.getAddress();

            // No leader has been announced yet, so there must be no connection.
            Assert.assertNull(taskManager.getResourceManagerConnection());

            // First leader.
            resourceManagerLeaderRetriever.notifyListener(rmGateway1.getAddress(), leaderId1);
            final TaskExecutorRegistration taskExecutorRegistration1 = rmGateway1TaskExecutorRegistration.join();
            Assert.assertThat(taskExecutorRegistration1.getTaskExecutorAddress(), Matchers.is(taskManagerAddress));
            Assert.assertThat(taskExecutorRegistration1.getResourceId(), Matchers.is(unresolvedTaskManagerLocation.getResourceID()));
            Assert.assertNotNull(taskManager.getResourceManagerConnection());

            // Leadership is lost, then a second leader is announced.
            resourceManagerLeaderRetriever.notifyListener(null, null);
            resourceManagerLeaderRetriever.notifyListener(rmGateway2.getAddress(), leaderId2);
            final TaskExecutorRegistration taskExecutorRegistration2 = rmGateway2TaskExecutorRegistration.join();
            Assert.assertThat(taskExecutorRegistration2.getTaskExecutorAddress(), Matchers.is(taskManagerAddress));
            Assert.assertThat(taskExecutorRegistration2.getResourceId(), Matchers.is(unresolvedTaskManagerLocation.getResourceID()));
            Assert.assertNotNull(taskManager.getResourceManagerConnection());
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskManager);
        }
    }

    /**
     * Builds an already-completed, successful registration response for the given gateway.
     *
     * @param rmGateway1 gateway whose own resource ID is put into the response
     * @return completed future holding a {@code TaskExecutorRegistrationSuccess}
     */
    private CompletableFuture<RegistrationResponse> createRegistrationResponse(TestingResourceManagerGateway rmGateway1) {
        final InstanceID instanceId = new InstanceID();
        final ResourceID resourceManagerResourceId = rmGateway1.getOwnResourceId();
        final ClusterInformation clusterInformation = new ClusterInformation("localhost", 1234);
        final RegistrationResponse success =
                new TaskExecutorRegistrationSuccess(instanceId, resourceManagerResourceId, clusterInformation, null);
        return CompletableFuture.completedFuture(success);
    }

    /**
     * Asserts that closing the task executor does not complete before the task slot table's
     * {@code closeAsync} future completes.
     */
    @Test
    public void testTaskSlotTableTerminationOnShutdown() throws Exception {
        final CompletableFuture<Void> taskSlotTableClosingFuture = new CompletableFuture<>();
        final TaskExecutorTestingContext submissionContext =
                createTaskExecutorTestingContext(
                        TestingTaskSlotTable.newBuilder().closeAsyncReturns(taskSlotTableClosingFuture).build(),
                        HEARTBEAT_SERVICES);

        final CompletableFuture<Void> taskExecutorTerminationFuture;
        try {
            submissionContext.start();
        } finally {
            // Trigger the shutdown even if start() failed.
            taskExecutorTerminationFuture = submissionContext.taskExecutor.closeAsync();
        }

        // The slot table has not finished closing yet, so the executor must still be terminating.
        Assert.assertThat(taskExecutorTerminationFuture.isDone(), Matchers.is(false));

        // Once the slot table finishes closing, termination must complete.
        taskSlotTableClosingFuture.complete(null);
        taskExecutorTerminationFuture.get();
    }

    /**
     * Creates a testing resource manager gateway, registers it with the RPC service and announces
     * it through the resource manager leader retriever.
     *
     * @param initialSlotReportFuture completed with the tuple handed to the gateway's
     *     send-slot-report function
     * @return fencing token of the created gateway
     */
    private ResourceManagerId createAndRegisterResourceManager(CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> initialSlotReportFuture) {
        final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();

        rmGateway.setSendSlotReportFunction(
                slotReportTuple -> {
                    initialSlotReportFuture.complete((Tuple3<ResourceID, InstanceID, SlotReport>) slotReportTuple);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        rpc.registerGateway(rmGateway.getAddress(), (RpcGateway) rmGateway);
        resourceManagerLeaderRetriever.notifyListener(
                rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());

        return rmGateway.getFencingToken();
    }

    /**
     * Requests a slot for a job, then announces the job master leader and asserts that the slot
     * with the requested allocation ID is offered to that job master.
     */
    @Test
    public void testJobLeaderDetection() throws Exception {
        final TaskSlotTable<Task> taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final JobLeaderService jobLeaderService =
                new DefaultJobLeaderService(unresolvedTaskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());

        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        final CompletableFuture<Void> initialSlotReportFuture = new CompletableFuture<>();
        resourceManagerGateway.setSendSlotReportFunction(
                resourceIDInstanceIDSlotReportTuple3 -> {
                    initialSlotReportFuture.complete(null);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        // The job master accepts every offered slot and records what was offered.
        final CompletableFuture<Collection<SlotOffer>> offeredSlotsFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setOfferSlotsFunction(
                                (resourceID, slotOffers) -> {
                                    offeredSlotsFuture.complete(new ArrayList<>(slotOffers));
                                    return CompletableFuture.completedFuture(slotOffers);
                                })
                        .build();

        rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final AllocationID allocationId = new AllocationID();
        final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(jobLeaderService)
                        .setTaskStateManager(localStateStoresManager)
                        .build();
        final TaskExecutor taskManager = createTaskExecutor(taskManagerServices);

        try {
            taskManager.start();
            final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

            // Announce the resource manager and wait for the initial slot report to arrive.
            resourceManagerLeaderRetriever.notifyListener(
                    resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
            initialSlotReportFuture.get();

            requestSlot(
                    tmGateway,
                    jobId,
                    allocationId,
                    buildSlotID(0),
                    ResourceProfile.ZERO,
                    jobMasterGateway.getAddress(),
                    resourceManagerGateway.getFencingToken());

            // Announce the job master leader; the slot is expected to be offered afterwards.
            jobManagerLeaderRetriever.notifyListener(
                    jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

            final Collection<SlotOffer> offeredSlots = offeredSlotsFuture.get();
            final Collection<AllocationID> allocationIds =
                    offeredSlots.stream().map(SlotOffer::getAllocationId).collect(Collectors.toList());
            Assert.assertThat(allocationIds, Matchers.containsInAnyOrder(allocationId));
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskManager);
        }
    }

    /**
     * Requests two slots, lets the job master accept only the first one and asserts that the
     * second is reported to the resource manager as available, that a task can be submitted to the
     * accepted slot, that submitting to the other slot fails with a
     * {@code TaskSubmissionException}, and that the second slot can be requested again.
     */
    @Test
    public void testSlotAcceptance() throws Exception {
        final InstanceID registrationId = new InstanceID();
        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
        final CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
        final TestingResourceManagerGateway resourceManagerGateway =
                createRmWithTmRegisterAndNotifySlotHooks(registrationId, taskExecutorIsRegistered, availableSlotFuture);

        final AllocationID allocationId1 = new AllocationID();
        final AllocationID allocationId2 = new AllocationID();
        final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.ANY);

        final OneShotLatch offerSlotsLatch = new OneShotLatch();
        final OneShotLatch taskInTerminalState = new OneShotLatch();
        final CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway =
                createJobMasterWithSlotOfferAndTaskTerminationHooks(offerSlotsLatch, taskInTerminalState, offerResultFuture);

        rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final TaskSlotTable<Task> taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(2, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final TaskManagerServices taskManagerServices = createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
        final TestingTaskExecutor taskManager = createTestingTaskExecutor(taskManagerServices);

        try {
            taskManager.start();
            taskManager.waitUntilStarted();
            final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

            taskExecutorIsRegistered.await();

            // Request one slot per allocation ID; the slot index equals the array index.
            final AllocationID[] allocationIds = new AllocationID[] {allocationId1, allocationId2};
            for (int i = 0; i < allocationIds.length; ++i) {
                requestSlot(
                        tmGateway,
                        jobId,
                        allocationIds[i],
                        buildSlotID(i),
                        ResourceProfile.UNKNOWN,
                        jobMasterGateway.getAddress(),
                        resourceManagerGateway.getFencingToken());
            }

            jobManagerLeaderRetriever.notifyListener(
                    jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

            // Accept only the first offer; the second slot must come back as available.
            offerSlotsLatch.await();
            offerResultFuture.complete(Collections.singletonList(offer1));

            final Tuple3<InstanceID, SlotID, AllocationID> instanceIDSlotIDAllocationIDTuple3 = availableSlotFuture.get();
            final Tuple3<InstanceID, SlotID, AllocationID> expectedResult =
                    Tuple3.of(registrationId, buildSlotID(1), allocationId2);
            Assert.assertThat(instanceIDSlotIDAllocationIDTuple3, Matchers.equalTo(expectedResult));

            // Submitting to the accepted slot works.
            submit(allocationId1, jobMasterGateway, tmGateway, NoOpInvokable.class);
            taskInTerminalState.await();

            // Submitting to the slot that was not accepted must fail.
            try {
                submit(allocationId2, jobMasterGateway, tmGateway, NoOpInvokable.class);
                Assert.fail("It should not be possible to submit task to acquired by JM slot with index 1 (allocationId2)");
            } catch (CompletionException e) {
                Assert.assertThat(e.getCause(), Matchers.instanceOf(TaskSubmissionException.class));
            }

            // Slot 1 is requested once more after the failed submission.
            requestSlot(
                    tmGateway,
                    jobId,
                    allocationId2,
                    buildSlotID(1),
                    ResourceProfile.UNKNOWN,
                    jobMasterGateway.getAddress(),
                    resourceManagerGateway.getFencingToken());
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskManager);
        }
    }

    /** Runs the pending-slot-offer scenario with {@code ResponseOrder.REJECT_THEN_ACCEPT}. */
    @Test
    public void testRejectedSlotNotFreedIfAnotherOfferIsPending() throws Exception {
        final ResponseOrder order = ResponseOrder.REJECT_THEN_ACCEPT;
        testSlotOfferResponseWithPendingSlotOffer(order);
    }

    /** Runs the pending-slot-offer scenario with {@code ResponseOrder.ACCEPT_THEN_REJECT}. */
    @Test
    public void testAcceptedSlotNotActivatedIfAnotherOfferIsPending() throws Exception {
        final ResponseOrder order = ResponseOrder.ACCEPT_THEN_REJECT;
        testSlotOfferResponseWithPendingSlotOffer(order);
    }

    /*
     * Exception decompiling
     *
     * Shared driver invoked by testRejectedSlotNotFreedIfAnotherOfferIsPending and
     * testAcceptedSlotNotActivatedIfAnotherOfferIsPending with the respective ResponseOrder.
     *
     * NOTE(review): the decompiler (CFR) could not recover this method's body. As it stands the
     * method ignores responseOrder and unconditionally throws IllegalStateException, so both
     * calling tests fail. The real logic has to be restored from the original sources; it cannot
     * be derived from this file.
     */
    private void testSlotOfferResponseWithPendingSlotOffer(ResponseOrder responseOrder) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        // Placeholder emitted by the decompiler; not the original test logic.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Requests one slot for each of two jobs, each with its own job master, lets both pending
     * slot offers be answered afterwards and asserts that each job ends up with exactly its own
     * allocation as the active slot.
     */
    @Test
    public void testSlotOfferCounterIsSeparatedByJob() throws Exception {
        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
        final TestingResourceManagerGateway resourceManagerGateway =
                createRmWithTmRegisterAndNotifySlotHooks(
                        new InstanceID(),
                        taskExecutorIsRegistered,
                        new CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>>());

        // Offer responses are handed out in arrival order, regardless of which job master is asked.
        final CompletableFuture<Collection<SlotOffer>> firstOfferResponseFuture = new CompletableFuture<>();
        final CompletableFuture<Collection<SlotOffer>> secondOfferResponseFuture = new CompletableFuture<>();
        final Queue<CompletableFuture<Collection<SlotOffer>>> slotOfferResponses =
                new ArrayDeque<>(Arrays.asList(firstOfferResponseFuture, secondOfferResponseFuture));

        final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
        final TestingJobMasterGateway jobMasterGateway1 =
                new TestingJobMasterGatewayBuilder()
                        .setAddress("jm1")
                        .setOfferSlotsFunction(
                                (resourceID, slotOffers) -> {
                                    offerSlotsLatch.trigger();
                                    return slotOfferResponses.remove();
                                })
                        .build();
        final TestingJobMasterGateway jobMasterGateway2 =
                new TestingJobMasterGatewayBuilder()
                        .setAddress("jm2")
                        .setOfferSlotsFunction(
                                (resourceID, slotOffers) -> {
                                    offerSlotsLatch.trigger();
                                    return slotOfferResponses.remove();
                                })
                        .build();

        rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        rpc.registerGateway(jobMasterGateway1.getAddress(), jobMasterGateway1);
        rpc.registerGateway(jobMasterGateway2.getAddress(), jobMasterGateway2);

        final TaskSlotTable<Task> taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(2, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final TaskManagerServices taskManagerServices = createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
        final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices);
        final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable =
                new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting());

        final SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
        final SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1, ResourceProfile.ANY);

        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();
            final TaskExecutorGateway tmGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            taskExecutorIsRegistered.await();

            jobManagerLeaderRetriever.notifyListener(
                    jobMasterGateway1.getAddress(), jobMasterGateway1.getFencingToken().toUUID());
            jobManagerLeaderRetriever2.notifyListener(
                    jobMasterGateway2.getAddress(), jobMasterGateway2.getFencingToken().toUUID());

            // Slot for the first job; wait until its offer has reached the job master.
            requestSlot(
                    tmGateway,
                    jobId,
                    slotOffer1.getAllocationId(),
                    buildSlotID(slotOffer1.getSlotIndex()),
                    ResourceProfile.UNKNOWN,
                    jobMasterGateway1.getAddress(),
                    resourceManagerGateway.getFencingToken());
            offerSlotsLatch.await();

            // Slot for the second job; wait until its offer has reached the job master.
            requestSlot(
                    tmGateway,
                    jobId2,
                    slotOffer2.getAllocationId(),
                    buildSlotID(slotOffer2.getSlotIndex()),
                    ResourceProfile.UNKNOWN,
                    jobMasterGateway2.getAddress(),
                    resourceManagerGateway.getFencingToken());
            offerSlotsLatch.await();

            // Answer both pending offers with acceptance of the respective slot.
            firstOfferResponseFuture.complete(Collections.singletonList(slotOffer1));
            secondOfferResponseFuture.complete(Collections.singletonList(slotOffer2));

            Assert.assertThat(
                    threadSafeTaskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId),
                    Matchers.contains(slotOffer1.getAllocationId()));
            Assert.assertThat(
                    threadSafeTaskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId2),
                    Matchers.contains(slotOffer2.getAllocationId()));
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Frees a slot whose offer to the job master is still unanswered and asserts that the call
     * succeeds, that the slot is reported to the resource manager as available and that the job
     * no longer owns any allocation.
     */
    @Test
    public void testFreeingInactiveSlotDoesNotFail() throws Exception {
        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
        final CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
        final TestingResourceManagerGateway resourceManagerGateway =
                createRmWithTmRegisterAndNotifySlotHooks(new InstanceID(), taskExecutorIsRegistered, availableSlotFuture);
        rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);

        final MultiShotLatch offerSlotsLatch = new MultiShotLatch();
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setOfferSlotsFunction(
                                (resourceID, slotOffers) -> {
                                    offerSlotsLatch.trigger();
                                    // Never completed: the offer stays pending for the whole test.
                                    return new CompletableFuture<>();
                                })
                        .build();
        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final TaskSlotTable<Task> taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setTaskStateManager(localStateStoresManager)
                        .build();
        final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices);
        final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable =
                new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting());

        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();
            final TaskExecutorGateway tmGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            taskExecutorIsRegistered.await();

            jobManagerLeaderRetriever.notifyListener(
                    jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

            final AllocationID allocationId = new AllocationID();
            requestSlot(
                    tmGateway,
                    jobId,
                    allocationId,
                    buildSlotID(0),
                    ResourceProfile.UNKNOWN,
                    jobMasterGateway.getAddress(),
                    resourceManagerGateway.getFencingToken());

            // Wait until the slot has been offered, then free it while the offer is unanswered.
            offerSlotsLatch.await();
            tmGateway.freeSlot(allocationId, new RuntimeException("test exception"), timeout).get();

            Assert.assertThat(availableSlotFuture.get().f2, Matchers.is(allocationId));
            Assert.assertThat(threadSafeTaskSlotTable.getAllocationIdsPerJob(jobId), Matchers.empty());
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Submits a task to a slot while the slot offer is still unanswered, then lets the job master
     * accept only that slot and asserts that the other slot is reported as available and that the
     * submitted task reaches a terminal state.
     */
    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        final InstanceID registrationId = new InstanceID();
        final OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
        final CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
        final TestingResourceManagerGateway resourceManagerGateway =
                createRmWithTmRegisterAndNotifySlotHooks(registrationId, taskExecutorIsRegistered, availableSlotFuture);

        final AllocationID allocationId1 = new AllocationID();
        final AllocationID allocationId2 = new AllocationID();
        final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.ANY);

        final OneShotLatch offerSlotsLatch = new OneShotLatch();
        final OneShotLatch taskInTerminalState = new OneShotLatch();
        final CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway =
                createJobMasterWithSlotOfferAndTaskTerminationHooks(offerSlotsLatch, taskInTerminalState, offerResultFuture);

        rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final TaskSlotTable<Task> taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(2, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        final TaskManagerServices taskManagerServices = createTaskManagerServicesWithTaskSlotTable(taskSlotTable);
        final TestingTaskExecutor taskManager = createTestingTaskExecutor(taskManagerServices);

        try {
            taskManager.start();
            taskManager.waitUntilStarted();
            final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

            taskExecutorIsRegistered.await();

            // Request one slot per allocation ID; the slot index equals the array index.
            final AllocationID[] allocationIds = new AllocationID[] {allocationId1, allocationId2};
            for (int i = 0; i < allocationIds.length; ++i) {
                requestSlot(
                        tmGateway,
                        jobId,
                        allocationIds[i],
                        buildSlotID(i),
                        ResourceProfile.UNKNOWN,
                        jobMasterGateway.getAddress(),
                        resourceManagerGateway.getFencingToken());
            }

            jobManagerLeaderRetriever.notifyListener(
                    jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

            offerSlotsLatch.await();

            // Submit while the offer is still unanswered, and only then accept the first slot.
            submit(allocationId1, jobMasterGateway, tmGateway, NoOpInvokable.class);
            offerResultFuture.complete(Collections.singleton(offer1));

            final Tuple3<InstanceID, SlotID, AllocationID> instanceIDSlotIDAllocationIDTuple3 = availableSlotFuture.get();
            Assert.assertThat(instanceIDSlotIDAllocationIDTuple3.f2, Matchers.equalTo(allocationId2));

            taskInTerminalState.await();
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskManager);
        }
    }

    /**
     * Creates a testing resource manager gateway, announces it through the leader retriever and
     * installs three hooks on it. The gateway is not registered with the RPC service here.
     *
     * @param registrationId instance ID returned in the registration success response
     * @param taskExecutorIsRegistered latch triggered when the send-slot-report function is called
     * @param availableSlotFuture future completed with the argument of the notify-slot-available
     *     consumer
     * @return the configured gateway
     */
    private TestingResourceManagerGateway createRmWithTmRegisterAndNotifySlotHooks(InstanceID registrationId, OneShotLatch taskExecutorIsRegistered, CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture) {
        final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();

        // The leader is announced before the hooks are installed; this order is kept as is.
        resourceManagerLeaderRetriever.notifyListener(
                rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());

        rmGateway.setRegisterTaskExecutorFunction(
                registration -> {
                    return CompletableFuture.completedFuture(
                            new TaskExecutorRegistrationSuccess(
                                    registrationId,
                                    rmGateway.getOwnResourceId(),
                                    new ClusterInformation("localhost", 1234),
                                    null));
                });

        rmGateway.setNotifySlotAvailableConsumer(
                availableSlot -> availableSlotFuture.complete(availableSlot));

        rmGateway.setSendSlotReportFunction(
                slotReport -> {
                    taskExecutorIsRegistered.trigger();
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        return rmGateway;
    }

    /**
     * Builds task manager services around the given slot table, using this test's task manager
     * location and netty shuffle environment, a new {@code DefaultJobLeaderService} and a new
     * local state stores manager.
     *
     * @param taskSlotTable slot table to install
     * @return the assembled services
     * @throws IOException declared by this helper; presumably raised while creating the local
     *     state stores manager
     */
    private TaskManagerServices createTaskManagerServicesWithTaskSlotTable(TaskSlotTable<Task> taskSlotTable) throws IOException {
        final TaskManagerServicesBuilder servicesBuilder =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setShuffleEnvironment((ShuffleEnvironment<?, ?>) nettyShuffleEnvironment)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(
                                (JobLeaderService)
                                        new DefaultJobLeaderService(
                                                unresolvedTaskManagerLocation,
                                                RetryingRegistrationConfiguration.defaultConfiguration()))
                        .setTaskStateManager(createTaskExecutorLocalStateStoresManager());
        return servicesBuilder.build();
    }

    /**
     * Creates a testing job master gateway with two hooks: slot offers trigger
     * {@code offerSlotsLatch} and are answered with {@code offerResultFuture}, and task state
     * updates trigger {@code taskInTerminalState} when the reported state is terminal.
     *
     * @param offerSlotsLatch latch triggered on every slot offer
     * @param taskInTerminalState latch triggered when a terminal execution state is reported
     * @param offerResultFuture future returned as the response to every slot offer
     * @return the configured gateway
     */
    private static TestingJobMasterGateway createJobMasterWithSlotOfferAndTaskTerminationHooks(OneShotLatch offerSlotsLatch, OneShotLatch taskInTerminalState, CompletableFuture<Collection<SlotOffer>> offerResultFuture) {
        return new TestingJobMasterGatewayBuilder()
                .setOfferSlotsFunction(
                        (taskManagerId, offers) -> {
                            offerSlotsLatch.trigger();
                            return offerResultFuture;
                        })
                .setUpdateTaskExecutionStateFunction(
                        stateUpdate -> {
                            final boolean terminal = stateUpdate.getExecutionState().isTerminal();
                            if (terminal) {
                                taskInTerminalState.trigger();
                            }
                            return CompletableFuture.completedFuture(Acknowledge.get());
                        })
                .build();
    }

    /**
     * Builds a deployment descriptor for this test's job and the given invokable, submits it to
     * the task executor and blocks until the submission future completes.
     *
     * @param allocationId allocation the task is deployed into
     * @param jobMasterGateway job master whose fencing token accompanies the submission
     * @param tmGateway task executor gateway receiving the task
     * @param invokableClass invokable class to run
     * @return execution attempt ID taken from the deployment descriptor
     */
    private <T extends TaskInvokable> ExecutionAttemptID submit(AllocationID allocationId, TestingJobMasterGateway jobMasterGateway, TaskExecutorGateway tmGateway, Class<T> invokableClass) throws IOException {
        final TaskDeploymentDescriptor deploymentDescriptor =
                TaskDeploymentDescriptorBuilder.newBuilder(jobId, invokableClass)
                        .setAllocationId(allocationId)
                        .build();

        // join() rethrows a failed submission wrapped in a CompletionException.
        final CompletableFuture<?> submission =
                tmGateway.submitTask(deploymentDescriptor, jobMasterGateway.getFencingToken(), timeout);
        submission.join();

        return deploymentDescriptor.getExecutionAttemptId();
    }

    /**
     * Asserts that the task executor starts monitoring the resource manager's heartbeat target
     * once the leader is announced and stops monitoring it once leadership is revoked.
     */
    @Test
    public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
        final long heartbeatInterval = 1L;
        final long heartbeatTimeout = 10000L;
        final long pollTimeout = 1000L;
        final RecordingHeartbeatServices heartbeatServices =
                new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
        final ResourceID rmResourceID = ResourceID.generate();

        final TaskSlotTable<Task> taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        final String rmAddress = "rm";
        final TestingResourceManagerGateway rmGateway =
                new TestingResourceManagerGateway(ResourceManagerId.generate(), rmResourceID, rmAddress, rmAddress);
        rpc.registerGateway(rmAddress, rmGateway);

        final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
        final TaskManagerServices taskManagerServices =
                new TaskManagerServicesBuilder()
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setTaskStateManager(localStateStoresManager)
                        .build();
        final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices, heartbeatServices);

        try {
            taskExecutor.start();

            final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
            final BlockingQueue<ResourceID> monitoredTargets = heartbeatServices.getMonitoredTargets();

            // Announcing the leader must lead to the RM being monitored.
            resourceManagerLeaderRetriever.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
            Assert.assertThat(
                    monitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS), Matchers.equalTo(rmResourceID));

            // Revoking leadership must lead to the RM no longer being monitored.
            resourceManagerLeaderRetriever.notifyListener(null, null);
            Assert.assertThat(
                    unmonitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS), Matchers.equalTo(rmResourceID));
        } finally {
            // Shut the endpoint down on both the success and the failure path.
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks the job's membership in the {@link JobLeaderService} across a slot's life cycle.
     *
     * <p>The job must not be contained before a slot is requested, must be contained once the job
     * master leader retrieval has been started after the slot request, and must be gone again
     * after the slot has been freed and the retrieval service has been stopped.
     */
    @Test
    public void testRemoveJobFromJobLeaderService() throws Exception {
        final TaskSlotTableImpl taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        final TaskExecutorLocalStateStoresManager localStateStoresManager =
                createTaskExecutorLocalStateStoresManager();
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                .setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable)
                .setTaskStateManager(localStateStoresManager)
                .build();
        final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices);

        try {
            final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            final CompletableFuture<Void> initialSlotReport = new CompletableFuture<>();
            resourceManagerGateway.setSendSlotReportFunction(ignored -> {
                initialSlotReport.complete(null);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            final ResourceManagerId resourceManagerId = resourceManagerGateway.getFencingToken();
            rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    resourceManagerGateway.getAddress(), resourceManagerId.toUUID());

            // Futures completed by the retrieval service when it is started / stopped.
            final CompletableFuture<LeaderRetrievalListener> startFuture = new CompletableFuture<>();
            final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
            final StartStopNotifyingLeaderRetrievalService jobMasterLeaderRetriever =
                    new StartStopNotifyingLeaderRetrievalService(startFuture, stopFuture);
            haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);

            taskExecutor.start();
            taskExecutor.waitUntilStarted();

            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            final SlotID slotId = buildSlotID(0);
            final AllocationID allocationId = new AllocationID();

            // Nothing has been requested yet, so the job must be unknown.
            Assert.assertThat(startFuture.isDone(), Matchers.is(false));
            final JobLeaderService jobLeaderService = taskManagerServices.getJobLeaderService();
            Assert.assertThat(jobLeaderService.containsJob(jobId), Matchers.is(false));

            // Wait for the initial slot report before requesting a slot.
            initialSlotReport.get();
            requestSlot(taskExecutorGateway, jobId, allocationId, slotId, ResourceProfile.ZERO, "foobar", resourceManagerId);

            startFuture.get();
            Assert.assertThat(jobLeaderService.containsJob(jobId), Matchers.is(true));

            taskExecutorGateway.freeSlot(allocationId, new FlinkException("Test exception"), timeout).get();

            stopFuture.get();
            Assert.assertThat(jobLeaderService.containsJob(jobId), Matchers.is(false));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Starts a TaskExecutor with a 10 ms registration timeout and no ResourceManager leader, and
     * expects a {@link RegistrationTimeoutException} to arrive at the fatal error handler.
     */
    @Test
    public void testMaximumRegistrationDuration() throws Exception {
        configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));

        final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().build());
        taskExecutor.start();

        try {
            final Throwable reportedError = testingFatalErrorHandler.getErrorFuture().get();

            Assert.assertThat(reportedError, Matchers.is(Matchers.notNullValue()));
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(reportedError),
                    Matchers.instanceOf(RegistrationTimeoutException.class));

            // The error was expected, so reset the handler.
            testingFatalErrorHandler.clearError();
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that the registration timeout also fires after a first successful registration.
     *
     * <p>The ResourceManager gateway accepts only the first registration attempt and answers every
     * later one with a {@code RegistrationResponse.Failure}. The test waits for a second attempt
     * and then expects a {@link RegistrationTimeoutException} at the fatal error handler.
     */
    @Test
    public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
        configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("100 ms"));

        final TaskSlotTableImpl taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable)
                .build();
        final TaskExecutor taskExecutor =
                createTaskExecutor(taskManagerServices, new HeartbeatServicesImpl(10L, 10L));
        taskExecutor.start();

        final CompletableFuture<ResourceID> registrationFuture = new CompletableFuture<>();
        final OneShotLatch secondRegistration = new OneShotLatch();

        try {
            final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(taskExecutorRegistration -> {
                // complete() returns true only for the very first registration attempt.
                if (registrationFuture.complete(taskExecutorRegistration.getResourceId())) {
                    return createRegistrationResponse(testingResourceManagerGateway);
                }
                secondRegistration.trigger();
                return CompletableFuture.completedFuture(
                        new RegistrationResponse.Failure(
                                new FlinkException("Only the first registration should succeed.")));
            });
            rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(), UUID.randomUUID());

            final ResourceID registrationResourceId = registrationFuture.get();
            Assert.assertThat(
                    registrationResourceId,
                    Matchers.equalTo(taskManagerServices.getUnresolvedTaskManagerLocation().getResourceID()));

            secondRegistration.await();

            final Throwable error = testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat(error, Matchers.is(Matchers.notNullValue()));
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(error),
                    Matchers.instanceOf(RegistrationTimeoutException.class));

            testingFatalErrorHandler.clearError();
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that a slot request is rejected while the registration at the ResourceManager is
     * still pending.
     *
     * <p>The registration future handed out by the gateway is never completed, so the slot request
     * is expected to fail with a {@link TaskManagerException}.
     */
    @Test
    public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
        final TaskExecutor taskExecutor = createTaskExecutor(1);
        taskExecutor.start();

        try {
            final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();

            // Deliberately left incomplete so that the registration never finishes.
            final CompletableFuture<RegistrationResponse> registrationFuture = new CompletableFuture<>();
            final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>();

            testingResourceManagerGateway.setRegisterTaskExecutorFunction(taskExecutorRegistration -> {
                taskExecutorResourceIdFuture.complete(taskExecutorRegistration.getResourceId());
                return registrationFuture;
            });

            rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(),
                    testingResourceManagerGateway.getFencingToken().toUUID());

            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            final ResourceID resourceId = taskExecutorResourceIdFuture.get();

            final CompletableFuture<Acknowledge> slotRequestResponse = taskExecutorGateway.requestSlot(
                    new SlotID(resourceId, 0),
                    jobId,
                    new AllocationID(),
                    ResourceProfile.ZERO,
                    "foobar",
                    testingResourceManagerGateway.getFencingToken(),
                    timeout);

            try {
                slotRequestResponse.get();
                Assert.fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
            } catch (ExecutionException ee) {
                Assert.assertThat(
                        ExceptionUtils.stripExecutionException(ee),
                        Matchers.instanceOf(TaskManagerException.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that the TaskExecutor registers again after it has been explicitly disconnected from
     * the ResourceManager.
     *
     * <p>Every registration attempt is recorded in a queue; the test expects one attempt after the
     * leader announcement and a second one after {@code disconnectResourceManager} was called,
     * both carrying the TaskExecutor's own resource ID.
     */
    @Test
    public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
        final TaskSlotTableImpl taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        final LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
                new LocalUnresolvedTaskManagerLocation();
        final TaskExecutor taskExecutor = createTaskExecutor(
                new TaskManagerServicesBuilder()
                        .setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable)
                        .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                        .build());
        taskExecutor.start();

        try {
            final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            final ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234);
            final CompletableFuture<RegistrationResponse> registrationResponseFuture =
                    CompletableFuture.completedFuture(
                            new TaskExecutorRegistrationSuccess(
                                    new InstanceID(), ResourceID.generate(), clusterInformation, null));

            // Capacity 1: at most one unconsumed registration attempt is kept at a time.
            final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(taskExecutorRegistration -> {
                registrationQueue.offer(taskExecutorRegistration.getResourceId());
                return registrationResponseFuture;
            });

            rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(),
                    testingResourceManagerGateway.getFencingToken().toUUID());

            final ResourceID firstRegistrationAttempt = registrationQueue.take();
            Assert.assertThat(
                    firstRegistrationAttempt,
                    Matchers.equalTo(unresolvedTaskManagerLocation.getResourceID()));

            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            // No further attempt may have been recorded before the explicit disconnect.
            Assert.assertThat(registrationQueue, Matchers.is(Matchers.empty()));

            taskExecutorGateway.disconnectResourceManager(new FlinkException("Test exception"));

            final ResourceID secondRegistrationAttempt = registrationQueue.take();
            Assert.assertThat(
                    secondRegistrationAttempt,
                    Matchers.equalTo(unresolvedTaskManagerLocation.getResourceID()));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that the TaskExecutor sends a slot report to the ResourceManager after the leader has
     * been announced, and that the report carries the TaskExecutor's own resource ID.
     */
    @Test
    public void testInitialSlotReport() throws Exception {
        final TaskExecutor taskExecutor = createTaskExecutor(1);
        taskExecutor.start();

        try {
            final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            final CompletableFuture<ResourceID> initialSlotReportFuture = new CompletableFuture<>();

            testingResourceManagerGateway.setSendSlotReportFunction(slotReportTuple -> {
                // f0 is the resource ID of the reporting TaskExecutor.
                initialSlotReportFuture.complete(slotReportTuple.f0);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(),
                    testingResourceManagerGateway.getFencingToken().toUUID());

            Assert.assertThat(
                    initialSlotReportFuture.get(),
                    Matchers.equalTo(taskExecutor.getResourceID()));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that the default slot resource profile sent with the registration equals the profile
     * derived from {@code TM_RESOURCE_SPEC} and the number of slots.
     */
    @Test
    public void testRegisterWithDefaultSlotResourceProfile() throws Exception {
        final int numberOfSlots = 2;
        final TaskExecutor taskExecutor = createTaskExecutor(numberOfSlots);
        taskExecutor.start();

        try {
            final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            final CompletableFuture<ResourceProfile> registeredDefaultSlotResourceProfileFuture =
                    new CompletableFuture<>();
            final ResourceID ownResourceId = testingResourceManagerGateway.getOwnResourceId();

            testingResourceManagerGateway.setRegisterTaskExecutorFunction(taskExecutorRegistration -> {
                registeredDefaultSlotResourceProfileFuture.complete(
                        taskExecutorRegistration.getDefaultSlotResourceProfile());
                return CompletableFuture.completedFuture(
                        new TaskExecutorRegistrationSuccess(
                                new InstanceID(),
                                ownResourceId,
                                new ClusterInformation("localhost", 1234),
                                null));
            });

            rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(),
                    testingResourceManagerGateway.getFencingToken().toUUID());

            Assert.assertThat(
                    registeredDefaultSlotResourceProfileFuture.get(),
                    Matchers.equalTo(
                            TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
                                    TM_RESOURCE_SPEC, numberOfSlots)));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that the TaskExecutor registers a second time when its first slot report fails.
     *
     * <p>The gateway answers the first slot report with an exceptionally completed future and the
     * second with an acknowledgement. The test finishes once the registration function has been
     * invoked twice.
     */
    @Test
    public void testInitialSlotReportFailure() throws Exception {
        final TaskSlotTableImpl taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        final LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
                new LocalUnresolvedTaskManagerLocation();
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable)
                .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
                .build();
        final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);
        taskExecutor.start();

        try {
            final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();

            // Prepared answers for the slot reports, consumed in order.
            final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(2);
            testingResourceManagerGateway.setSendSlotReportFunction(ignored -> {
                try {
                    return responseQueue.take();
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            });

            final CompletableFuture<RegistrationResponse> registrationResponse =
                    CompletableFuture.completedFuture(
                            new TaskExecutorRegistrationSuccess(
                                    new InstanceID(),
                                    testingResourceManagerGateway.getOwnResourceId(),
                                    new ClusterInformation("foobar", 1234),
                                    null));

            final CountDownLatch numberRegistrations = new CountDownLatch(2);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(taskExecutorRegistration -> {
                numberRegistrations.countDown();
                return registrationResponse;
            });

            // First report fails, second one is acknowledged.
            responseQueue.offer(FutureUtils.completedExceptionally(new FlinkException("Test exception")));
            responseQueue.offer(CompletableFuture.completedFuture(Acknowledge.get()));

            rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            resourceManagerLeaderRetriever.notifyListener(
                    testingResourceManagerGateway.getAddress(),
                    testingResourceManagerGateway.getFencingToken().toUUID());

            // Blocks until the registration function has been called twice.
            numberRegistrations.await();
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that a slot offer is repeated after it has timed out at the JobMaster.
     *
     * <p>The JobMaster gateway answers the first two offers with a {@link TimeoutException} and
     * accepts the third. The test asserts that the accepted offer carries the requested
     * allocation ID and that the slot with index 1 is free.
     */
    @Test
    public void testOfferSlotToJobMasterAfterTimeout() throws Exception {
        final TaskSlotTableImpl taskSlotTable =
                TaskSlotUtils.createTaskSlotTable(2, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable)
                .build();
        final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);
        final AllocationID allocationId = new AllocationID();

        final CompletableFuture<Void> initialSlotReportFuture = new CompletableFuture<>();
        final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(ignored -> {
            initialSlotReportFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        resourceManagerLeaderRetriever.notifyListener(
                testingResourceManagerGateway.getAddress(),
                testingResourceManagerGateway.getFencingToken().toUUID());

        // Counts the offers; only the one that brings the latch to zero is accepted.
        final CountDownLatch slotOfferings = new CountDownLatch(3);
        final CompletableFuture<AllocationID> offeredSlotFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setOfferSlotsFunction((resourceID, slotOffers) -> {
                    Assert.assertThat(slotOffers.size(), Matchers.is(1));
                    slotOfferings.countDown();
                    if (slotOfferings.getCount() == 0L) {
                        offeredSlotFuture.complete(slotOffers.iterator().next().getAllocationId());
                        return CompletableFuture.completedFuture(slotOffers);
                    }
                    return FutureUtils.completedExceptionally(new TimeoutException());
                })
                .build();
        final String jobManagerAddress = jobMasterGateway.getAddress();
        rpc.registerGateway(jobManagerAddress, jobMasterGateway);
        jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobMasterGateway.getFencingToken().toUUID());

        try {
            taskExecutor.start();
            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            initialSlotReportFuture.get();
            requestSlot(
                    taskExecutorGateway,
                    jobId,
                    allocationId,
                    new SlotID(taskExecutor.getResourceID(), 0),
                    ResourceProfile.ZERO,
                    jobManagerAddress,
                    testingResourceManagerGateway.getFencingToken());

            slotOfferings.await();

            Assert.assertThat(offeredSlotFuture.get(), Matchers.is(allocationId));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that the TaskExecutor disconnects from the JobMaster when the job manager leader
     * changes.
     *
     * <p>After one slot has been offered to the JobMaster, the job manager leader is reset to
     * {@code null}. The test expects the JobMaster gateway's disconnect function to be invoked
     * with the TaskExecutor's resource ID.
     */
    @Test
    public void testDisconnectFromJobMasterWhenNewLeader() throws Exception {
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable(
                        (TaskSlotTable<Task>)TaskSlotUtils.createTaskSlotTable(
                                1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()))
                .build();
        final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);

        final CompletableFuture<Integer> offeredSlotsFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceID> disconnectFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setOfferSlotsFunction((resourceID, slotOffers) -> {
                    offeredSlotsFuture.complete(slotOffers.size());
                    return CompletableFuture.completedFuture(slotOffers);
                })
                .setDisconnectTaskManagerFunction(resourceID -> {
                    disconnectFuture.complete(resourceID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .build();

        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        final CompletableFuture<Void> initialSlotReportFuture = new CompletableFuture<>();
        resourceManagerGateway.setSendSlotReportFunction(ignored -> {
            initialSlotReportFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });

        rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        try {
            taskExecutor.start();
            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            resourceManagerLeaderRetriever.notifyListener(
                    resourceManagerGateway.getAddress(),
                    resourceManagerGateway.getFencingToken().toUUID());
            initialSlotReportFuture.get();

            final ResourceID ownResourceId = taskManagerServices.getUnresolvedTaskManagerLocation().getResourceID();
            requestSlot(
                    taskExecutorGateway,
                    jobId,
                    new AllocationID(),
                    new SlotID(ownResourceId, 0),
                    ResourceProfile.ZERO,
                    "foobar",
                    resourceManagerGateway.getFencingToken());

            // Announce a job manager leader and wait for the slot to be offered.
            jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), UUID.randomUUID());
            Assert.assertThat(offeredSlotsFuture.get(), Matchers.is(1));

            // Resetting the leader must trigger the disconnect from the old JobMaster.
            jobManagerLeaderRetriever.notifyListener(null, null);
            Assert.assertThat(disconnectFuture.get(), Matchers.is(ownResourceId));
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Checks that requesting the log file fails with a descriptive message when the configured
     * log path ({@code /i/dont/exist}) does not exist.
     *
     * <p>The test fails explicitly if the request unexpectedly succeeds; without that check a
     * successful upload would let the test pass without any assertion being evaluated.
     */
    @Test(timeout=10000L)
    public void testLogNotFoundHandling() throws Throwable {
        configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, 0);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
        configuration.setString("taskmanager.log.path", "/i/dont/exist");

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId)
                .setConfiguration(configuration)
                .setLocalCommunication(false)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor())) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            try {
                tmGateway.requestFileUploadByType(FileType.LOG, timeout).get();
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail("Requesting the log file should fail because the configured log path does not exist.");
            } catch (Exception e) {
                Assert.assertThat(
                        e.getMessage(),
                        Matchers.containsString("The file LOG does not exist on the TaskExecutor."));
            }
        }
    }

    /**
     * Reports a fatal error to the TaskExecutor and expects the same error to arrive at the
     * environment's fatal error handler, with a message starting with the reported text.
     */
    @Test(timeout=10000L)
    public void testTerminationOnFatalError() throws Throwable {
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId)
                .setConfiguration(configuration)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor())) {
            final String expectedMessagePrefix = "Test exception of fatal error.";

            env.getTaskExecutor().onFatalError(new Exception(expectedMessagePrefix));

            final Throwable reportedError = env.getTestingFatalErrorHandler().getErrorFuture().get();
            // The error is expected, so clear it before asserting on its message.
            env.getTestingFatalErrorHandler().clearError();

            Assert.assertThat(reportedError.getMessage(), Matchers.startsWith(expectedMessagePrefix));
        }
    }

    /**
     * Checks that a JobMaster heartbeat reconciles the slot allocations known to both sides.
     *
     * <p>The TaskExecutor holds the allocations {@code allocationIdInBoth} and
     * {@code allocationIdOnlyInTM}, while the heartbeat's slot report lists
     * {@code allocationIdInBoth} and {@code allocationIdOnlyInJM}. The test expects the JobMaster
     * to be told that {@code allocationIdOnlyInJM} failed, the ResourceManager to be told that
     * {@code allocationIdOnlyInTM} is available again, and no further notifications on either
     * channel.
     */
    @Test
    public void testSyncSlotsWithJobMasterByHeartbeat() throws Exception {
        final CountDownLatch activeSlots = new CountDownLatch(2);
        final ActivateSlotNotifyingTaskSlotTable taskSlotTable =
                new ActivateSlotNotifyingTaskSlotTable(2, activeSlots);
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable)
                .build();
        final TaskExecutor taskExecutor = createTaskExecutor(taskManagerServices);

        final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        final BlockingQueue<AllocationID> allocationsNotifiedFree = new ArrayBlockingQueue<>(2);
        final OneShotLatch initialSlotReporting = new OneShotLatch();

        testingResourceManagerGateway.setSendSlotReportFunction(ignored -> {
            initialSlotReporting.trigger();
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        // f2 is the allocation ID of the slot reported as available.
        testingResourceManagerGateway.setNotifySlotAvailableConsumer(
                slotAvailableTuple -> allocationsNotifiedFree.offer(slotAvailableTuple.f2));

        rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        resourceManagerLeaderRetriever.notifyListener(
                testingResourceManagerGateway.getAddress(),
                testingResourceManagerGateway.getFencingToken().toUUID());

        final BlockingQueue<AllocationID> failedSlotFutures = new ArrayBlockingQueue<>(2);
        final ResourceID jobManagerResourceId = ResourceID.generate();
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setFailSlotConsumer(
                        (resourceID, allocationID, throwable) -> failedSlotFutures.offer(allocationID))
                .setOfferSlotsFunction(
                        (resourceID, slotOffers) -> CompletableFuture.completedFuture(new ArrayList<>(slotOffers)))
                .setRegisterTaskManagerFunction(
                        (ignoredJobId, ignoredTaskManagerRegistrationInformation) ->
                                CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jobManagerResourceId)))
                .build();

        final String jobManagerAddress = jobMasterGateway.getAddress();
        rpc.registerGateway(jobManagerAddress, jobMasterGateway);
        jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobMasterGateway.getFencingToken().toUUID());

        taskExecutor.start();

        try {
            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            initialSlotReporting.await();

            final AllocationID allocationIdInBoth = new AllocationID();
            final AllocationID allocationIdOnlyInJM = new AllocationID();
            final AllocationID allocationIdOnlyInTM = new AllocationID();

            taskExecutorGateway.requestSlot(
                    new SlotID(taskExecutor.getResourceID(), 0),
                    jobId,
                    allocationIdInBoth,
                    ResourceProfile.ZERO,
                    "foobar",
                    testingResourceManagerGateway.getFencingToken(),
                    timeout);
            taskExecutorGateway.requestSlot(
                    new SlotID(taskExecutor.getResourceID(), 1),
                    jobId,
                    allocationIdOnlyInTM,
                    ResourceProfile.ZERO,
                    "foobar",
                    testingResourceManagerGateway.getFencingToken(),
                    timeout);

            // Wait until both slots have been activated.
            activeSlots.await();

            final List<AllocatedSlotInfo> allocatedSlotInfos = Arrays.asList(
                    new AllocatedSlotInfo(0, allocationIdInBoth),
                    new AllocatedSlotInfo(1, allocationIdOnlyInJM));
            final AllocatedSlotReport allocatedSlotReport = new AllocatedSlotReport(jobId, allocatedSlotInfos);
            taskExecutorGateway.heartbeatFromJobManager(jobManagerResourceId, allocatedSlotReport);

            Assert.assertThat(failedSlotFutures.take(), Matchers.is(allocationIdOnlyInJM));
            Assert.assertThat(allocationsNotifiedFree.take(), Matchers.is(allocationIdOnlyInTM));

            // Neither channel may receive any additional notification.
            Assert.assertThat(failedSlotFutures.poll(5L, TimeUnit.MILLISECONDS), Matchers.nullValue());
            Assert.assertThat(allocationsNotifiedFree.poll(5L, TimeUnit.MILLISECONDS), Matchers.nullValue());
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor);
        }
    }

    /**
     * Verifies that the slot report attached to resource manager heartbeats is never stale.
     *
     * <p>Heartbeats are requested from the task executor every few milliseconds while a slot is
     * being requested. Every report received by the resource manager gateway must list exactly
     * one slot; once the slot table has seen the allocation ({@code receivedSlotRequest} is
     * triggered) that slot must carry an allocation id, before that it must not.
     */
    @Test
    public void testSlotReportDoesNotContainStaleInformation() throws Exception {
        OneShotLatch receivedSlotRequest = new OneShotLatch();
        CompletableFuture<Void> verifySlotReportFuture = new CompletableFuture<>();
        OneShotLatch terminateSlotReportVerification = new OneShotLatch();
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        // The assertions run inside the heartbeat callback; a failure is handed over to the test
        // thread through verifySlotReportFuture.
        testingResourceManagerGateway.setTaskExecutorHeartbeatFunction((ignored, heartbeatPayload) -> {
            try {
                ArrayList<SlotStatus> slots = Lists.newArrayList(heartbeatPayload.getSlotReport());
                Assert.assertThat(slots, Matchers.hasSize(1));
                SlotStatus slotStatus = slots.get(0);
                this.log.info("Received SlotStatus: {}", slotStatus);
                if (receivedSlotRequest.isTriggered()) {
                    Assert.assertThat(slotStatus.getAllocationID(), Matchers.is(Matchers.notNullValue()));
                } else {
                    Assert.assertThat(slotStatus.getAllocationID(), Matchers.is(Matchers.nullValue()));
                }
            } catch (AssertionError e) {
                verifySlotReportFuture.completeExceptionally(e);
            }
            // A no-op if an assertion failure has already completed the future exceptionally.
            if (terminateSlotReportVerification.isTriggered()) {
                verifySlotReportFuture.complete(null);
            }
            return FutureUtils.completedVoidFuture();
        });
        CompletableFuture<Void> taskExecutorRegistrationFuture = new CompletableFuture<>();
        testingResourceManagerGateway.setSendSlotReportFunction(ignored -> {
            taskExecutorRegistrationFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(
                testingResourceManagerGateway.getAddress(),
                testingResourceManagerGateway.getFencingToken().toUUID());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable(new AllocateSlotNotifyingTaskSlotTable(receivedSlotRequest))
                .build();
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices);
        ResourceID taskExecutorResourceId = taskManagerServices.getUnresolvedTaskManagerLocation().getResourceID();
        taskExecutor.start();
        TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
        ScheduledExecutorService heartbeatExecutor = java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
        try {
            taskExecutorRegistrationFuture.get();
            OneShotLatch scheduleFirstHeartbeat = new OneShotLatch();
            ResourceID resourceManagerResourceId = testingResourceManagerGateway.getOwnResourceId();
            long heartbeatIntervalMillis = 5L;
            heartbeatExecutor.scheduleWithFixedDelay(() -> {
                scheduleFirstHeartbeat.trigger();
                taskExecutorGateway.heartbeatFromResourceManager(resourceManagerResourceId);
            }, 0L, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
            scheduleFirstHeartbeat.await();
            taskExecutorGateway.requestSlot(
                    new SlotID(taskExecutorResourceId, 0),
                    this.jobId,
                    new AllocationID(),
                    ResourceProfile.ZERO,
                    "foobar",
                    testingResourceManagerGateway.getFencingToken(),
                    timeout).get();
            terminateSlotReportVerification.trigger();
            verifySlotReportFuture.get();
        } finally {
            // Single cleanup path for both the success and the failure case.
            ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, new ExecutorService[]{heartbeatExecutor});
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{taskExecutor});
        }
    }

    /**
     * Requests a slot through a dynamic slot id on a task executor created with two slots and
     * checks that the slot table afterwards reports the two default slots plus a third slot that
     * carries the requested resource profile, the job id and the allocation id.
     */
    @Test
    public void testDynamicSlotAllocation() throws Exception {
        AllocationID allocationId = new AllocationID();
        try (TaskExecutorTestingContext testingContext = this.createTaskExecutorTestingContext(2)) {
            testingContext.start();

            // Wait for the future handed to the resource manager setup (presumably completed
            // with the initial slot report) before requesting anything.
            CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> firstSlotReport = new CompletableFuture<>();
            ResourceManagerId rmId = this.createAndRegisterResourceManager(firstSlotReport);
            firstSlotReport.get();

            ResourceProfile requestedProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE.merge(
                    ResourceProfile.newBuilder().setCpuCores(0.1).build());
            TaskExecutorGateway tmGateway = testingContext.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            SlotID dynamicSlotId = SlotID.getDynamicSlotID(ResourceID.generate());
            this.requestSlot(
                    tmGateway,
                    this.jobId,
                    allocationId,
                    dynamicSlotId,
                    requestedProfile,
                    testingContext.jobMasterGateway.getAddress(),
                    rmId);

            ResourceID reportOwner = ResourceID.generate();
            SlotReport report = testingContext.taskSlotTable.createSlotReport(reportOwner);
            SlotStatus[] expectedStatuses = {
                new SlotStatus(new SlotID(reportOwner, 0), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE),
                new SlotStatus(new SlotID(reportOwner, 1), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE),
                new SlotStatus(new SlotID(reportOwner, 2), requestedProfile, this.jobId, allocationId)
            };
            Assert.assertThat(report, Matchers.containsInAnyOrder(expectedStatuses));
        }
    }

    /**
     * Verifies that per-job resources are held until the last slot of the job is freed.
     *
     * <p>Five slots are allocated for the job and a blocking task is submitted into the first
     * one, after which the job's changelog storages and job metric group must exist. The task is
     * cancelled and the slots are freed one by one; the metric group and the changelog storages
     * must be gone only after the final slot has been freed.
     */
    @Test
    public void testReleasingJobResources() throws Exception {
        AllocationID[] slots = IntStream.range(0, 5).mapToObj(i -> new AllocationID()).toArray(AllocationID[]::new);
        try (TaskExecutorTestingContext ctx = this.createTaskExecutorTestingContext(slots.length)) {
            ctx.start();
            ResourceManagerId rmId = this.getResourceManagerId();
            TaskExecutorGateway tm = ctx.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            for (int slotIndex = 0; slotIndex < slots.length; ++slotIndex) {
                this.requestSlot(tm, this.jobId, slots[slotIndex], this.buildSlotID(slotIndex), ResourceProfile.UNKNOWN, ctx.jobMasterGateway.getAddress(), rmId);
            }
            ctx.offerSlotsLatch.await();
            ExecutionAttemptID exec = this.submit(slots[0], ctx.jobMasterGateway, tm, BlockingNoOpInvokable.class);
            Assert.assertNotNull(ctx.changelogStoragesManager.getChangelogStoragesByJobId(this.jobId));
            Assert.assertNotNull(ctx.metricGroup.getJobMetricsGroup(this.jobId));
            tm.cancelTask(exec, timeout).get();
            // Keep waiting while the job still has tasks in the slot table.
            this.waitForTasks(ctx, numTasks -> numTasks > 0);
            for (int slotIndex = 0; slotIndex < slots.length; ++slotIndex) {
                tm.freeSlot(slots[slotIndex], new RuntimeException("test exception"), timeout).get();
                boolean isLastSlot = slotIndex == slots.length - 1;
                // Compare boolean with boolean: comparing against an int-valued conditional
                // (1 / 0) boxes to Integer and can never equal a Boolean.
                boolean metricGroupReleased =
                        null == TaskExecutorTest.callInMain(ctx, () -> ctx.metricGroup.getJobMetricsGroup(this.jobId));
                Assert.assertEquals(isLastSlot, metricGroupReleased);
                boolean changelogStoragesReleased =
                        null == TaskExecutorTest.callInMain(ctx, () -> ctx.changelogStoragesManager.getChangelogStoragesByJobId(this.jobId));
                Assert.assertEquals(isLastSlot, changelogStoragesReleased);
            }
        }
    }

    /**
     * Closing {@code TaskExecutorJobServices} must run both the class loader lease's close
     * runnable and the close hook; each one triggers a latch the test then waits on.
     */
    @Test
    public void taskExecutorJobServicesCloseClassLoaderLeaseUponClosing() throws InterruptedException {
        OneShotLatch leaseReleaseLatch = new OneShotLatch();
        OneShotLatch closeHookLatch = new OneShotLatch();

        TestingClassLoaderLease classLoaderLease = TestingClassLoaderLease.newBuilder()
                .setCloseRunnable(leaseReleaseLatch::trigger)
                .build();
        TaskExecutor.TaskExecutorJobServices taskExecutorJobServices =
                TaskExecutor.TaskExecutorJobServices.create(classLoaderLease, closeHookLatch::trigger);

        taskExecutorJobServices.close();

        leaseReleaseLatch.await();
        closeHookLatch.await();
    }

    /**
     * Verifies that job resources are released when the job master answers the task executor's
     * registration with a {@code JMTMRegistrationRejection}.
     *
     * <p>After a slot has been requested for the job and the job master leader is announced, the
     * slot hook of the resource manager gateway must observe the requested slot id and
     * allocation id, and the partition tracker must be asked to stop tracking and release all
     * partitions of the job.
     */
    @Test
    public void testReleaseOfJobResourcesIfJobMasterIsNotCorrect() throws Exception {
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()))
                .build();
        TestingTaskExecutorPartitionTracker taskExecutorPartitionTracker = new TestingTaskExecutorPartitionTracker();
        CompletableFuture<JobID> jobPartitionsReleaseFuture = new CompletableFuture<>();
        taskExecutorPartitionTracker.setIsTrackingPartitionsForFunction(ignored -> true);
        taskExecutorPartitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(jobPartitionsReleaseFuture::complete);
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices, HEARTBEAT_SERVICES, taskExecutorPartitionTracker);
        // The job master rejects every registration attempt.
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setRegisterTaskManagerFunction((ignoredJobId, ignoredTaskManagerRegistrationInformation) ->
                        CompletableFuture.completedFuture(new JMTMRegistrationRejection("foobar")))
                .build();
        this.rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
        InstanceID registrationId = new InstanceID();
        OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
        CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
        TestingResourceManagerGateway resourceManagerGateway =
                this.createRmWithTmRegisterAndNotifySlotHooks(registrationId, taskExecutorIsRegistered, availableSlotFuture);
        this.rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(
                resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
        try {
            taskExecutor.start();
            TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            taskExecutorIsRegistered.await();
            AllocationID allocationId = new AllocationID();
            SlotID slotId = new SlotID(taskExecutor.getResourceID(), 0);
            this.requestSlot(taskExecutorGateway, this.jobId, allocationId, slotId, ResourceProfile.UNKNOWN, jobMasterGateway.getAddress(), resourceManagerGateway.getFencingToken());
            // Announcing the job master leader starts the registration that gets rejected.
            this.jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
            Assert.assertThat(availableSlotFuture.get().f1, Matchers.is(slotId));
            Assert.assertThat(availableSlotFuture.get().f2, Matchers.is(allocationId));
            Assert.assertThat(jobPartitionsReleaseFuture.get(), Matchers.is(this.jobId));
        } finally {
            // Single cleanup path for both the success and the failure case.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{taskExecutor});
        }
    }

    /**
     * Verifies that {@code freeInactiveSlots} gives back a slot that was requested for a job
     * whose job master registration never completes.
     *
     * <p>The job master gateway answers registrations with a future that is never completed.
     * After the slot has been requested and {@code freeInactiveSlots} is invoked for the job, the
     * slot hook of the resource manager gateway must observe the requested slot id and
     * allocation id.
     */
    @Test
    public void testReleaseInactiveSlots() throws Exception {
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
                .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()))
                .build();
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices, HEARTBEAT_SERVICES);
        // Registration at the job master stays pending forever.
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setRegisterTaskManagerFunction((ignoredJobId, ignoredTaskManagerRegistrationInformation) -> new CompletableFuture<>())
                .build();
        this.rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
        InstanceID registrationId = new InstanceID();
        OneShotLatch taskExecutorIsRegistered = new OneShotLatch();
        CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
        TestingResourceManagerGateway resourceManagerGateway =
                this.createRmWithTmRegisterAndNotifySlotHooks(registrationId, taskExecutorIsRegistered, availableSlotFuture);
        this.rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(
                resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
        try {
            taskExecutor.start();
            TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            taskExecutorIsRegistered.await();
            AllocationID allocationId = new AllocationID();
            SlotID slotId = new SlotID(taskExecutor.getResourceID(), 0);
            this.requestSlot(taskExecutorGateway, this.jobId, allocationId, slotId, ResourceProfile.UNKNOWN, jobMasterGateway.getAddress(), resourceManagerGateway.getFencingToken());
            taskExecutorGateway.freeInactiveSlots(this.jobId, timeout);
            Assert.assertThat(availableSlotFuture.get().f1, Matchers.is(slotId));
            Assert.assertThat(availableSlotFuture.get().f2, Matchers.is(allocationId));
        } finally {
            // Single cleanup path for both the success and the failure case.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{taskExecutor});
        }
    }

    /**
     * Builds a {@link TaskExecutorLocalStateStoresManager} whose only root directory is a fresh
     * folder from the {@code tmp} rule, passing {@code false} as the first constructor argument
     * and a direct executor as the last.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException {
        File[] stateRootDirectories = {this.tmp.newFolder()};
        return new TaskExecutorLocalStateStoresManager(
                false,
                Reference.owned(stateRootDirectories),
                Executors.directExecutor());
    }

    /**
     * Creates a {@link TaskExecutor} backed by a slot table with the given number of slots and a
     * local unresolved task manager location, and records the slot count in the test
     * configuration under {@code TaskManagerOptions.NUM_TASK_SLOTS}.
     *
     * @param numberOFSlots number of slots of the slot table
     */
    private TaskExecutor createTaskExecutor(int numberOFSlots) throws IOException {
        TaskManagerServices services = new TaskManagerServicesBuilder()
                .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(numberOFSlots, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()))
                .setUnresolvedTaskManagerLocation(new LocalUnresolvedTaskManagerLocation())
                .build();
        this.configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOFSlots);
        return this.createTaskExecutor(services);
    }

    /** Creates a {@link TaskExecutor} for the given services using {@code HEARTBEAT_SERVICES}. */
    @Nonnull
    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) throws IOException {
        HeartbeatServices defaultHeartbeatServices = HEARTBEAT_SERVICES;
        return this.createTaskExecutor(taskManagerServices, defaultHeartbeatServices);
    }

    /**
     * Creates a {@link TaskExecutor} with the given heartbeat services and a
     * {@code TaskExecutorPartitionTrackerImpl} built on the services' shuffle environment.
     */
    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices) throws IOException {
        TaskExecutorPartitionTracker partitionTracker =
                new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment());
        return this.createTaskExecutor(taskManagerServices, heartbeatServices, partitionTracker);
    }

    /**
     * Creates a {@link TaskExecutor} wired to the test fixture's RPC service, configuration, HA
     * services and fatal error handler. The task manager configuration uses the loopback address
     * and a fresh temp directory; metrics go to an unregistered task manager metric group.
     */
    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices, TaskExecutorPartitionTracker taskExecutorPartitionTracker) throws IOException {
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(
                this.configuration,
                TM_RESOURCE_SPEC,
                InetAddress.getLoopbackAddress().getHostAddress(),
                TestFileUtils.createTempDir());
        TaskManagerMetricGroup unregisteredMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
        DelegationTokenReceiverRepository tokenReceiverRepository = new DelegationTokenReceiverRepository(this.configuration, null);
        return new TaskExecutor(
                this.rpc,
                taskManagerConfiguration,
                this.haServices,
                taskManagerServices,
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                heartbeatServices,
                unregisteredMetricGroup,
                null,
                NoOpTaskExecutorBlobService.INSTANCE,
                this.testingFatalErrorHandler,
                taskExecutorPartitionTracker,
                tokenReceiverRepository);
    }

    /** Creates a {@link TestingTaskExecutor} for the given services using {@code HEARTBEAT_SERVICES}. */
    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) throws IOException {
        HeartbeatServices defaultHeartbeatServices = HEARTBEAT_SERVICES;
        return this.createTestingTaskExecutor(taskManagerServices, defaultHeartbeatServices);
    }

    /**
     * Creates a {@link TestingTaskExecutor} with the given heartbeat services and an unregistered
     * task manager metric group.
     */
    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices) throws IOException {
        TaskManagerMetricGroup unregisteredMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
        return this.createTestingTaskExecutor(taskManagerServices, heartbeatServices, unregisteredMetricGroup);
    }

    /**
     * Creates a {@link TestingTaskExecutor} wired to the test fixture's RPC service,
     * configuration, HA services and fatal error handler, reporting to the given metric group.
     * The partition tracker is a {@code TaskExecutorPartitionTrackerImpl} built on the services'
     * shuffle environment.
     */
    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup metricGroup) throws IOException {
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(
                this.configuration,
                TM_RESOURCE_SPEC,
                InetAddress.getLoopbackAddress().getHostAddress(),
                TestFileUtils.createTempDir());
        TaskExecutorPartitionTracker partitionTracker =
                new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment());
        DelegationTokenReceiverRepository tokenReceiverRepository = new DelegationTokenReceiverRepository(this.configuration, null);
        return new TestingTaskExecutor(
                this.rpc,
                taskManagerConfiguration,
                this.haServices,
                taskManagerServices,
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                heartbeatServices,
                metricGroup,
                null,
                NoOpTaskExecutorBlobService.INSTANCE,
                this.testingFatalErrorHandler,
                partitionTracker,
                tokenReceiverRepository);
    }

    /**
     * Creates a testing context whose slot table has the given number of slots and which uses
     * {@code HEARTBEAT_SERVICES}.
     */
    private TaskExecutorTestingContext createTaskExecutorTestingContext(int numberOfSlots) throws IOException {
        TaskSlotTable<Task> slotTable =
                TaskSlotUtils.createTaskSlotTable(numberOfSlots, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        return this.createTaskExecutorTestingContext(slotTable, HEARTBEAT_SERVICES);
    }

    /**
     * Assembles a {@link TaskExecutorTestingContext} around the given slot table.
     *
     * <p>A job master gateway is registered that accepts every slot offer it receives and
     * triggers the context's {@code offerSlotsLatch} when an offer arrives. The job master is
     * announced as leader before the (not yet started) task executor is returned inside the
     * context.
     */
    private TaskExecutorTestingContext createTaskExecutorTestingContext(TaskSlotTable<Task> taskSlotTable, HeartbeatServices heartbeatServices) throws IOException {
        OneShotLatch offerSlotsLatch = new OneShotLatch();
        TestingJobMasterGateway jobMaster = new TestingJobMasterGatewayBuilder()
                .setOfferSlotsFunction((resourceID, slotOffers) -> {
                    offerSlotsLatch.trigger();
                    // Echo the offers back, i.e. accept all of them.
                    return CompletableFuture.completedFuture(slotOffers);
                })
                .build();
        this.rpc.registerGateway(jobMaster.getAddress(), jobMaster);

        DefaultJobLeaderService jobLeaderService =
                new DefaultJobLeaderService(this.unresolvedTaskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
        TaskExecutorLocalStateStoresManager localStateStores = this.createTaskExecutorLocalStateStoresManager();
        TaskExecutorStateChangelogStoragesManager changelogStorages = new TaskExecutorStateChangelogStoragesManager();
        TaskManagerMetricGroup metricGroup =
                TaskManagerMetricGroup.createTaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, "", ResourceID.generate());

        TaskManagerServices services = new TaskManagerServicesBuilder()
                .setTaskSlotTable(taskSlotTable)
                .setJobLeaderService(jobLeaderService)
                .setTaskStateManager(localStateStores)
                .setTaskChangelogStoragesManager(changelogStorages)
                .build();
        TestingTaskExecutor taskExecutor = this.createTestingTaskExecutor(services, heartbeatServices, metricGroup);

        this.jobManagerLeaderRetriever.notifyListener(jobMaster.getAddress(), jobMaster.getFencingToken().toUUID());
        return new TaskExecutorTestingContext(jobMaster, taskSlotTable, taskExecutor, changelogStorages, metricGroup, offerSlotsLatch);
    }

    /**
     * Sends a slot request to the gateway using the shared test {@code timeout} and blocks until
     * the returned future completes.
     *
     * @throws ExecutionException if the request future completes exceptionally
     */
    private void requestSlot(TaskExecutorGateway gateway, JobID jobId, AllocationID allocationId, SlotID slotId, ResourceProfile profile, String address, ResourceManagerId token) throws InterruptedException, ExecutionException {
        CompletableFuture<?> pendingRequest =
                gateway.requestSlot(slotId, jobId, allocationId, profile, address, token, timeout);
        pendingRequest.get();
    }

    /** Builds the {@link SlotID} with the given index on the fixture's unresolved task manager location. */
    private SlotID buildSlotID(int slotIndex) {
        ResourceID ownerId = this.unresolvedTaskManagerLocation.getResourceID();
        return new SlotID(ownerId, slotIndex);
    }

    /**
     * Submits the callable through the task executor's main thread executable and blocks for
     * its result. A five second {@link Duration} is passed to {@code callAsync} (presumably the
     * call timeout).
     *
     * @throws ExecutionException if the asynchronous call completes exceptionally
     */
    private static <T> T callInMain(TaskExecutorTestingContext ctx, Callable<T> booleanCallable) throws InterruptedException, ExecutionException {
        CompletableFuture<T> resultFuture = ctx.taskExecutor
                .getMainThreadExecutableForTesting()
                .callAsync(booleanCallable, Duration.ofSeconds(5L));
        return resultFuture.get();
    }

    /**
     * Verifies the lifecycle of a shared resource used by ten tasks of the same job.
     *
     * <p>One {@link SharedResourceCollectingInvokable} is submitted per slot. The tasks are then
     * cancelled one at a time; while at least one task remains, the resource must not have been
     * deallocated. After the context is closed the resource must have been allocated exactly
     * once and deallocated exactly once.
     */
    @Test
    public void testSharedResourcesLifecycle() throws Exception {
        SharedResourceCollectingInvokable.reset();
        AllocationID[] slots = IntStream.range(0, 10).mapToObj(i -> new AllocationID()).toArray(AllocationID[]::new);
        TaskSlotTable<Task> slotTable =
                TaskSlotUtils.createTaskSlotTable(slots.length, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        try (TaskExecutorTestingContext ctx = this.createTaskExecutorTestingContext(slotTable, HeartbeatServices.noOp())) {
            ctx.start();
            ResourceManagerId rmId = this.getResourceManagerId();
            TaskExecutorGateway taskGateway = ctx.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            for (int slotIndex = 0; slotIndex < slots.length; ++slotIndex) {
                SlotID slotId = new SlotID(ctx.taskExecutor.getResourceID(), slotIndex);
                this.requestSlot(taskGateway, this.jobId, slots[slotIndex], slotId, ResourceProfile.UNKNOWN, ctx.jobMasterGateway.getAddress(), rmId);
            }
            ctx.offerSlotsLatch.await();

            List<ExecutionAttemptID> attempts = new ArrayList<>(slots.length);
            for (AllocationID slot : slots) {
                attempts.add(this.submit(slot, ctx.jobMasterGateway, taskGateway, SharedResourceCollectingInvokable.class));
            }
            // Keep waiting while fewer tasks than slots are present in the slot table.
            this.waitForTasks(ctx, numTasks -> numTasks < slots.length);

            for (int cancelled = 0; cancelled < attempts.size(); ++cancelled) {
                int numRemaining = slots.length - (cancelled + 1);
                ctx.taskExecutor.cancelTask(attempts.get(cancelled), timeout).get();
                this.waitForTasks(ctx, numTasks -> numTasks > numRemaining);
                if (numRemaining > 0) {
                    Assert.assertEquals(0L, (long)SharedResourceCollectingInvokable.timesDeallocated.get());
                }
            }
        }
        Assert.assertEquals(1L, (long)SharedResourceCollectingInvokable.timesAllocated.get());
        Assert.assertEquals(1L, (long)SharedResourceCollectingInvokable.timesDeallocated.get());
    }

    /**
     * Polls, sleeping 50 ms between checks, for as long as {@code waitPredicate} returns
     * {@code true} for the number of tasks the slot table holds for the test job. The count is
     * evaluated via {@code callInMain}.
     *
     * @param waitPredicate returns {@code true} while waiting must continue
     */
    private void waitForTasks(TaskExecutorTestingContext ctx, Function<Integer, Boolean> waitPredicate) throws InterruptedException, ExecutionException {
        Callable<Boolean> mustKeepWaiting =
                () -> waitPredicate.apply(Iterators.size(ctx.taskSlotTable.getTasks(this.jobId)));
        while (TaskExecutorTest.callInMain(ctx, mustKeepWaiting)) {
            Thread.sleep(50L);
        }
    }

    /**
     * Creates and registers a resource manager and returns its id once the future handed to
     * {@code createAndRegisterResourceManager} has completed.
     */
    private ResourceManagerId getResourceManagerId() throws InterruptedException, ExecutionException {
        CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> slotReportReceived = new CompletableFuture<>();
        ResourceManagerId resourceManagerId = this.createAndRegisterResourceManager(slotReportReceived);
        slotReportReceived.get();
        return resourceManagerId;
    }

    /**
     * Decompiler-emitted body of a lambda from {@code testSlotOfferResponseWithPendingSlotOffer}:
     * triggers the latch and answers with the next queued response future.
     * {@link Queue#remove()} throws if the queue is empty.
     */
    private static /* synthetic */ CompletableFuture lambda$testSlotOfferResponseWithPendingSlotOffer$27(MultiShotLatch offerSlotsLatch, Queue slotOfferResponses, ResourceID resourceID, Collection slotOffers) {
        offerSlotsLatch.trigger();
        Object nextResponse = slotOfferResponses.remove();
        return (CompletableFuture)nextResponse;
    }

    /**
     * Test invokable that acquires the shared resource {@code "test"} from its environment and
     * then idles until cancelled. Static counters record how often the resource initializer ran
     * and how often the handle it returned was invoked.
     */
    public static class SharedResourceCollectingInvokable
    implements TaskInvokable {
        private static final String RESOURCE_ID = "test";
        private static final AtomicInteger timesAllocated = new AtomicInteger();
        private static final AtomicInteger timesDeallocated = new AtomicInteger();
        private final Environment env;
        // Identity object passed as lease holder on both acquire and release.
        private final Object leaseHolder = new Object();
        private volatile boolean cancelled;

        /** Resets both counters to zero. */
        public static void reset() {
            timesDeallocated.set(0);
            timesAllocated.set(0);
        }

        public SharedResourceCollectingInvokable(Environment env) {
            this.env = env;
        }

        /** Acquires the shared resource, then polls the cancel flag every 50 ms until it is set. */
        public void invoke() throws Exception {
            this.env.getSharedResources().getOrAllocateSharedResource(RESOURCE_ID, this.leaseHolder, unused -> {
                // Count the allocation and hand back a handle that counts the deallocation.
                timesAllocated.incrementAndGet();
                return timesDeallocated::incrementAndGet;
            }, 0L);
            for (;;) {
                if (this.cancelled) {
                    return;
                }
                Thread.sleep(50L);
            }
        }

        /** Nothing to restore. */
        public void restore() throws Exception {
        }

        /** Releases this invokable's lease on the shared resource with a no-op callback. */
        public void cleanUp(@Nullable Throwable throwable) throws Exception {
            this.env.getSharedResources().release(RESOURCE_ID, this.leaseHolder, unused -> {});
        }

        /** Sets the flag that makes {@link #invoke()} return. */
        public void cancel() throws Exception {
            this.cancelled = true;
        }

        public boolean isUsingNonBlockingInput() {
            return false;
        }

        /** Intentionally does nothing. */
        public void maybeInterruptOnCancel(Thread toInterrupt, @Nullable String taskName, @Nullable Long timeout) {
        }
    }

    /**
     * Slot table that counts down a latch each time a slot is successfully marked active.
     */
    private static final class ActivateSlotNotifyingTaskSlotTable
    extends TaskSlotTableImpl<Task> {
        private final CountDownLatch slotsToActivate;

        private ActivateSlotNotifyingTaskSlotTable(int numberOfDefaultSlots, CountDownLatch slotsToActivate) {
            super(
                    numberOfDefaultSlots,
                    TaskSlotUtils.createTotalResourceProfile(numberOfDefaultSlots),
                    TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
                    4096,
                    TaskSlotUtils.createDefaultTimerService(timeout.toMilliseconds()),
                    Executors.newDirectExecutorService());
            this.slotsToActivate = slotsToActivate;
        }

        /** Delegates to the parent and counts the latch down only when activation succeeded. */
        public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
            if (!super.markSlotActive(allocationId)) {
                return false;
            }
            this.slotsToActivate.countDown();
            return true;
        }
    }

    /**
     * Single-slot slot table that triggers a latch after every {@code allocateSlot} call that
     * returns, regardless of the returned result.
     */
    private static final class AllocateSlotNotifyingTaskSlotTable
    extends TaskSlotTableImpl<Task> {
        private final OneShotLatch allocateSlotLatch;

        private AllocateSlotNotifyingTaskSlotTable(OneShotLatch allocateSlotLatch) {
            super(
                    1,
                    TaskSlotUtils.createTotalResourceProfile(1),
                    TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
                    4096,
                    TaskSlotUtils.createDefaultTimerService(timeout.toMilliseconds()),
                    Executors.newDirectExecutorService());
            this.allocateSlotLatch = allocateSlotLatch;
        }

        public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
            return this.triggerLatchAndReturn(super.allocateSlot(index, jobId, allocationId, slotTimeout));
        }

        public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
            return this.triggerLatchAndReturn(super.allocateSlot(index, jobId, allocationId, resourceProfile, slotTimeout));
        }

        /** Triggers the latch after the parent call has returned and passes its result through. */
        private boolean triggerLatchAndReturn(boolean allocationResult) {
            this.allocateSlotLatch.trigger();
            return allocationResult;
        }
    }

    /**
     * Bundles a {@link TestingTaskExecutor} with the collaborators the tests inspect. Closing
     * the context terminates the task executor's RPC endpoint, so it can be used in
     * try-with-resources.
     */
    private class TaskExecutorTestingContext
    implements AutoCloseable {
        private final TestingJobMasterGateway jobMasterGateway;
        private final TaskSlotTable taskSlotTable;
        private final TestingTaskExecutor taskExecutor;
        private final TaskExecutorStateChangelogStoragesManager changelogStoragesManager;
        private final TaskManagerMetricGroup metricGroup;
        private final OneShotLatch offerSlotsLatch;

        private TaskExecutorTestingContext(TestingJobMasterGateway jobMasterGateway, TaskSlotTable taskSlotTable, TestingTaskExecutor taskExecutor, TaskExecutorStateChangelogStoragesManager changelogStoragesManager, TaskManagerMetricGroup metricGroup, OneShotLatch offerSlotsLatch) {
            this.taskExecutor = taskExecutor;
            this.taskSlotTable = taskSlotTable;
            this.jobMasterGateway = jobMasterGateway;
            this.offerSlotsLatch = offerSlotsLatch;
            this.metricGroup = metricGroup;
            this.changelogStoragesManager = changelogStoragesManager;
        }

        /** Starts the task executor and waits until it reports having started. */
        private void start() {
            TestingTaskExecutor executor = this.taskExecutor;
            executor.start();
            executor.waitUntilStarted();
        }

        /** Terminates the task executor's RPC endpoint. */
        @Override
        public void close() throws ExecutionException, InterruptedException, TimeoutException {
            RpcEndpoint[] endpoints = {this.taskExecutor};
            RpcUtils.terminateRpcEndpoint(endpoints);
        }
    }

    private static enum ResponseOrder {
        ACCEPT_THEN_REJECT,
        REJECT_THEN_ACCEPT;

    }
}

