/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentState;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class JobMasterExecutionDeploymentReconciliationTest
extends TestLogger {
    private static final Time testingTimeout = Time.seconds((long)10L);
    private final HeartbeatServices heartbeatServices = new HeartbeatServices(Integer.MAX_VALUE, Integer.MAX_VALUE);
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
    private final TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
    @ClassRule
    public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    @Before
    public void setup() {
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setResourceManagerLeaderElectionService(this.resourceManagerLeaderElectionService);
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
    }

    @Test
    public void testExecutionDeploymentReconciliation() throws Exception {
        JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper();
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        try (JobMaster jobMaster = this.createAndStartJobMaster(onCompletionActions, deploymentTrackerWrapper, jobGraph);){
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
            CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<ExecutionAttemptID>();
            TaskExecutorGateway taskExecutorGateway = this.createTaskExecutorGateway(taskCancellationFuture);
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            this.registerTaskExecutorAndOfferSlots(jobMasterGateway, jobGraph.getJobID(), taskExecutorGateway, localUnresolvedTaskManagerLocation);
            ExecutionAttemptID deployedExecution = deploymentTrackerWrapper.getTaskDeploymentFuture().get();
            Assert.assertFalse((boolean)taskCancellationFuture.isDone());
            ExecutionAttemptID unknownDeployment = ExecutionGraphTestUtils.createExecutionAttemptId();
            jobMasterGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload(new AccumulatorReport(Collections.emptyList()), new ExecutionDeploymentReport(Collections.singleton(unknownDeployment))));
            Assert.assertThat((Object)taskCancellationFuture.get(), (Matcher)Is.is((Object)unknownDeployment));
            Assert.assertThat((Object)deploymentTrackerWrapper.getStopFuture().get(), (Matcher)Is.is((Object)deployedExecution));
            Assert.assertThat((Object)onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getArchivedExecutionGraph().getState(), (Matcher)Is.is((Object)JobStatus.FAILED));
        }
    }

    @Test
    public void testExecutionDeploymentReconciliationForPendingExecution() throws Exception {
        TestingExecutionDeploymentTrackerWrapper deploymentTrackerWrapper = new TestingExecutionDeploymentTrackerWrapper();
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        try (JobMaster jobMaster = this.createAndStartJobMaster(deploymentTrackerWrapper, jobGraph);){
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
            CompletableFuture<ExecutionAttemptID> taskSubmissionFuture = new CompletableFuture<ExecutionAttemptID>();
            CompletableFuture<ExecutionAttemptID> taskCancellationFuture = new CompletableFuture<ExecutionAttemptID>();
            CompletableFuture<Acknowledge> taskSubmissionAcknowledgeFuture = new CompletableFuture<Acknowledge>();
            TaskExecutorGateway taskExecutorGateway = this.createTaskExecutorGateway(taskCancellationFuture, taskSubmissionFuture, taskSubmissionAcknowledgeFuture);
            LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            this.registerTaskExecutorAndOfferSlots(jobMasterGateway, jobGraph.getJobID(), taskExecutorGateway, localUnresolvedTaskManagerLocation);
            ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get();
            jobMasterGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload(new AccumulatorReport(Collections.emptyList()), new ExecutionDeploymentReport(Collections.singleton(pendingExecutionId))));
            taskSubmissionAcknowledgeFuture.complete(Acknowledge.get());
            deploymentTrackerWrapper.getTaskDeploymentFuture().get();
            Assert.assertFalse((boolean)taskCancellationFuture.isDone());
        }
    }

    private JobMaster createAndStartJobMaster(ExecutionDeploymentTracker executionDeploymentTracker, JobGraph jobGraph) throws Exception {
        return this.createAndStartJobMaster(new JobMasterBuilder.TestingOnCompletionActions(), executionDeploymentTracker, jobGraph);
    }

    private JobMaster createAndStartJobMaster(OnCompletionActions onCompletionActions, ExecutionDeploymentTracker executionDeploymentTracker, JobGraph jobGraph) throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(jobGraph, RPC_SERVICE_RESOURCE.getTestingRpcService()).withFatalErrorHandler(this.testingFatalErrorHandlerResource.getFatalErrorHandler()).withHighAvailabilityServices(this.haServices).withHeartbeatServices(this.heartbeatServices).withExecutionDeploymentTracker(executionDeploymentTracker).withOnCompletionActions(onCompletionActions).createJobMaster();
        jobMaster.start();
        return jobMaster;
    }

    private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> taskCancellationFuture) {
        return this.createTaskExecutorGateway(taskCancellationFuture, new CompletableFuture<ExecutionAttemptID>(), CompletableFuture.completedFuture(Acknowledge.get()));
    }

    private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture<ExecutionAttemptID> taskCancellationFuture, CompletableFuture<ExecutionAttemptID> taskSubmissionFuture, CompletableFuture<Acknowledge> taskSubmissionResponse) {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).setCancelTaskFunction(executionAttemptId -> {
            taskCancellationFuture.complete((ExecutionAttemptID)executionAttemptId);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setSubmitTaskConsumer((tdd, ignored) -> {
            taskSubmissionFuture.complete(tdd.getExecutionAttemptId());
            return taskSubmissionResponse;
        }).createTestingTaskExecutorGateway();
        RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        return taskExecutorGateway;
    }

    private void registerTaskExecutorAndOfferSlots(JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation taskManagerLocation) throws ExecutionException, InterruptedException {
        jobMasterGateway.registerTaskManager(jobId, TaskManagerRegistrationInformation.create((String)taskExecutorGateway.getAddress(), (UnresolvedTaskManagerLocation)taskManagerLocation, (UUID)TestingUtils.zeroUUID()), testingTimeout).get();
        Set<SlotOffer> slotOffers = Collections.singleton(new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY));
        jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout).get();
    }

    private static class TestingExecutionDeploymentTrackerWrapper
    implements ExecutionDeploymentTracker {
        private final ExecutionDeploymentTracker originalTracker;
        private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture;
        private final CompletableFuture<ExecutionAttemptID> stopFuture;

        private TestingExecutionDeploymentTrackerWrapper() {
            this((ExecutionDeploymentTracker)new DefaultExecutionDeploymentTracker());
        }

        private TestingExecutionDeploymentTrackerWrapper(ExecutionDeploymentTracker originalTracker) {
            this.originalTracker = originalTracker;
            this.taskDeploymentFuture = new CompletableFuture();
            this.stopFuture = new CompletableFuture();
        }

        public void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) {
            this.originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host);
        }

        public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
            this.originalTracker.completeDeploymentOf(executionAttemptId);
            this.taskDeploymentFuture.complete(executionAttemptId);
        }

        public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
            this.originalTracker.stopTrackingDeploymentOf(executionAttemptId);
            this.stopFuture.complete(executionAttemptId);
        }

        public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
            return this.originalTracker.getExecutionsOn(host);
        }

        public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
            return this.taskDeploymentFuture;
        }

        public CompletableFuture<ExecutionAttemptID> getStopFuture() {
            return this.stopFuture;
        }
    }
}

