/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.DeploymentHandle;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.SlotExecutionVertexAssignment;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public class DefaultScheduler
extends SchedulerBase
implements SchedulerOperations {
    private final Logger log;
    private final ClassLoader userCodeLoader;
    private final ExecutionSlotAllocator executionSlotAllocator;
    private final ExecutionFailureHandler executionFailureHandler;
    private final ScheduledExecutor delayExecutor;
    private final SchedulingStrategy schedulingStrategy;
    private final ExecutionVertexOperations executionVertexOperations;
    private final Set<ExecutionVertexID> verticesWaitingForRestart;

    DefaultScheduler(Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, Consumer<ComponentMainThreadExecutor> startUpAction, ScheduledExecutor delayExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionVertexOperations executionVertexOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory) throws Exception {
        super(log, jobGraph, ioExecutor, jobMasterConfiguration, userCodeLoader, checkpointRecoveryFactory, jobManagerJobMetricGroup, executionVertexVersioner, initializationTimestamp, mainThreadExecutor, jobStatusListener, executionGraphFactory);
        this.log = log;
        this.delayExecutor = (ScheduledExecutor)Preconditions.checkNotNull((Object)delayExecutor);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.executionVertexOperations = (ExecutionVertexOperations)Preconditions.checkNotNull((Object)executionVertexOperations);
        FailoverStrategy failoverStrategy = failoverStrategyFactory.create(this.getSchedulingTopology(), this.getResultPartitionAvailabilityChecker());
        log.info("Using failover strategy {} for {} ({}).", new Object[]{failoverStrategy, jobGraph.getName(), jobGraph.getJobID()});
        this.executionFailureHandler = new ExecutionFailureHandler(this.getSchedulingTopology(), failoverStrategy, restartBackoffTimeStrategy);
        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, this.getSchedulingTopology());
        this.executionSlotAllocator = ((ExecutionSlotAllocatorFactory)Preconditions.checkNotNull((Object)executionSlotAllocatorFactory)).createInstance(new DefaultExecutionSlotAllocationContext());
        this.verticesWaitingForRestart = new HashSet<ExecutionVertexID>();
        startUpAction.accept(mainThreadExecutor);
    }

    @Override
    protected long getNumberOfRestarts() {
        return this.executionFailureHandler.getNumberOfRestarts();
    }

    @Override
    protected void cancelAllPendingSlotRequestsInternal() {
        IterableUtils.toStream(this.getSchedulingTopology().getVertices()).map(Vertex::getId).forEach(this.executionSlotAllocator::cancel);
    }

    @Override
    protected void startSchedulingInternal() {
        this.log.info("Starting scheduling with scheduling strategy [{}]", (Object)this.schedulingStrategy.getClass().getName());
        this.transitionToRunning();
        this.schedulingStrategy.startScheduling();
    }

    @Override
    protected void updateTaskExecutionStateInternal(ExecutionVertexID executionVertexId, TaskExecutionStateTransition taskExecutionState) {
        this.schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
        this.maybeHandleTaskFailure(taskExecutionState, executionVertexId);
    }

    private void maybeHandleTaskFailure(TaskExecutionStateTransition taskExecutionState, ExecutionVertexID executionVertexId) {
        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
            Throwable error = taskExecutionState.getError(this.userCodeLoader);
            this.handleTaskFailure(executionVertexId, error);
        }
    }

    private void handleTaskFailure(ExecutionVertexID executionVertexId, @Nullable Throwable error) {
        long timestamp = System.currentTimeMillis();
        this.setGlobalFailureCause(error, timestamp);
        this.notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(executionVertexId, error, timestamp);
        this.maybeRestartTasks(failureHandlingResult);
    }

    private void notifyCoordinatorsAboutTaskFailure(ExecutionVertexID executionVertexId, @Nullable Throwable error) {
        ExecutionJobVertex jobVertex = this.getExecutionJobVertex(executionVertexId.getJobVertexId());
        int subtaskIndex = executionVertexId.getSubtaskIndex();
        jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex, error));
    }

    @Override
    public void handleGlobalFailure(Throwable error) {
        long timestamp = System.currentTimeMillis();
        this.setGlobalFailureCause(error, timestamp);
        this.log.info("Trying to recover from a global failure.", error);
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp);
        this.maybeRestartTasks(failureHandlingResult);
    }

    private void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        if (failureHandlingResult.canRestart()) {
            this.restartTasksWithDelay(failureHandlingResult);
        } else {
            this.failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp());
        }
    }

    private void restartTasksWithDelay(FailureHandlingResult failureHandlingResult) {
        Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
        HashSet<ExecutionVertexVersion> executionVertexVersions = new HashSet<ExecutionVertexVersion>(this.executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
        boolean globalRecovery = failureHandlingResult.isGlobalFailure();
        this.addVerticesToRestartPending(verticesToRestart);
        CompletableFuture<?> cancelFuture = this.cancelTasksAsync(verticesToRestart);
        FailureHandlingResultSnapshot failureHandlingResultSnapshot = FailureHandlingResultSnapshot.create(failureHandlingResult, id -> this.getExecutionVertex((ExecutionVertexID)id).getCurrentExecutionAttempt());
        this.delayExecutor.schedule(() -> FutureUtils.assertNoException(cancelFuture.thenRunAsync(() -> {
            this.archiveFromFailureHandlingResult(failureHandlingResultSnapshot);
            this.restartTasks(executionVertexVersions, globalRecovery);
        }, this.getMainThreadExecutor())), failureHandlingResult.getRestartDelayMS(), TimeUnit.MILLISECONDS);
    }

    private void addVerticesToRestartPending(Set<ExecutionVertexID> verticesToRestart) {
        this.verticesWaitingForRestart.addAll(verticesToRestart);
        this.transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
    }

    private void removeVerticesFromRestartPending(Set<ExecutionVertexID> verticesToRestart) {
        this.verticesWaitingForRestart.removeAll(verticesToRestart);
        if (this.verticesWaitingForRestart.isEmpty()) {
            this.transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
        }
    }

    private void restartTasks(Set<ExecutionVertexVersion> executionVertexVersions, boolean isGlobalRecovery) {
        Set<ExecutionVertexID> verticesToRestart = this.executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
        this.removeVerticesFromRestartPending(verticesToRestart);
        this.resetForNewExecutions(verticesToRestart);
        try {
            this.restoreState(verticesToRestart, isGlobalRecovery);
        }
        catch (Throwable t) {
            this.handleGlobalFailure(t);
            return;
        }
        this.schedulingStrategy.restartTasks(verticesToRestart);
    }

    private CompletableFuture<?> cancelTasksAsync(Set<ExecutionVertexID> verticesToRestart) {
        List cancelFutures = verticesToRestart.stream().map(this::cancelExecutionVertex).collect(Collectors.toList());
        return FutureUtils.combineAll(cancelFutures);
    }

    private CompletableFuture<?> cancelExecutionVertex(ExecutionVertexID executionVertexId) {
        ExecutionVertex vertex = this.getExecutionVertex(executionVertexId);
        this.notifyCoordinatorOfCancellation(vertex);
        this.executionSlotAllocator.cancel(executionVertexId);
        return this.executionVertexOperations.cancel(vertex);
    }

    @Override
    protected void notifyPartitionDataAvailableInternal(IntermediateResultPartitionID partitionId) {
        this.schedulingStrategy.onPartitionConsumable(partitionId);
    }

    @Override
    public void allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        this.validateDeploymentOptions(executionVertexDeploymentOptions);
        Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex = DefaultScheduler.groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
        List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).collect(Collectors.toList());
        Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex = this.executionVertexVersioner.recordVertexModifications(verticesToDeploy);
        this.transitionToScheduled(verticesToDeploy);
        List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = this.allocateSlots(executionVertexDeploymentOptions);
        List<DeploymentHandle> deploymentHandles = DefaultScheduler.createDeploymentHandles(requiredVersionByVertex, deploymentOptionsByVertex, slotExecutionVertexAssignments);
        this.waitForAllSlotsAndDeploy(deploymentHandles);
    }

    private void validateDeploymentOptions(Collection<ExecutionVertexDeploymentOption> deploymentOptions) {
        deploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).map(this::getExecutionVertex).forEach(v -> Preconditions.checkState((v.getExecutionState() == ExecutionState.CREATED ? 1 : 0) != 0, (String)"expected vertex %s to be in CREATED state, was: %s", (Object[])new Object[]{v.getID(), v.getExecutionState()}));
    }

    private static Map<ExecutionVertexID, ExecutionVertexDeploymentOption> groupDeploymentOptionsByVertexId(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        return executionVertexDeploymentOptions.stream().collect(Collectors.toMap(ExecutionVertexDeploymentOption::getExecutionVertexId, Function.identity()));
    }

    private List<SlotExecutionVertexAssignment> allocateSlots(List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        return this.executionSlotAllocator.allocateSlotsFor(executionVertexDeploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).collect(Collectors.toList()));
    }

    private static List<DeploymentHandle> createDeploymentHandles(Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex, Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex, List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
        return slotExecutionVertexAssignments.stream().map(slotExecutionVertexAssignment -> {
            ExecutionVertexID executionVertexId = slotExecutionVertexAssignment.getExecutionVertexId();
            return new DeploymentHandle((ExecutionVertexVersion)requiredVersionByVertex.get(executionVertexId), (ExecutionVertexDeploymentOption)deploymentOptionsByVertex.get(executionVertexId), (SlotExecutionVertexAssignment)slotExecutionVertexAssignment);
        }).collect(Collectors.toList());
    }

    private void waitForAllSlotsAndDeploy(List<DeploymentHandle> deploymentHandles) {
        FutureUtils.assertNoException(this.assignAllResources(deploymentHandles).handle(this.deployAll(deploymentHandles)));
    }

    private CompletableFuture<Void> assignAllResources(List<DeploymentHandle> deploymentHandles) {
        ArrayList<CompletionStage> slotAssignedFutures = new ArrayList<CompletionStage>();
        for (DeploymentHandle deploymentHandle : deploymentHandles) {
            CompletionStage slotAssigned = deploymentHandle.getSlotExecutionVertexAssignment().getLogicalSlotFuture().handle(this.assignResourceOrHandleError(deploymentHandle));
            slotAssignedFutures.add(slotAssigned);
        }
        return FutureUtils.waitForAll(slotAssignedFutures);
    }

    private BiFunction<Void, Throwable, Void> deployAll(List<DeploymentHandle> deploymentHandles) {
        return (ignored, throwable) -> {
            DefaultScheduler.propagateIfNonNull(throwable);
            for (DeploymentHandle deploymentHandle : deploymentHandles) {
                SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
                CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
                Preconditions.checkState((boolean)slotAssigned.isDone());
                FutureUtils.assertNoException(slotAssigned.handle(this.deployOrHandleError(deploymentHandle)));
            }
            return null;
        };
    }

    private static void propagateIfNonNull(Throwable throwable) {
        if (throwable != null) {
            throw new CompletionException(throwable);
        }
    }

    private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(DeploymentHandle deploymentHandle) {
        ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
        ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
        return (logicalSlot, throwable) -> {
            if (this.executionVertexVersioner.isModified(requiredVertexVersion)) {
                this.log.debug("Refusing to assign slot to execution vertex {} because this deployment was superseded by another deployment", (Object)executionVertexId);
                this.releaseSlotIfPresent((LogicalSlot)logicalSlot);
                return null;
            }
            if (throwable == null) {
                ExecutionVertex executionVertex = this.getExecutionVertex(executionVertexId);
                boolean notifyPartitionDataAvailable = deploymentHandle.getDeploymentOption().notifyPartitionDataAvailable();
                executionVertex.getCurrentExecutionAttempt().registerProducedPartitions(logicalSlot.getTaskManagerLocation(), notifyPartitionDataAvailable);
                executionVertex.tryAssignResource((LogicalSlot)logicalSlot);
            } else {
                this.handleTaskDeploymentFailure(executionVertexId, DefaultScheduler.maybeWrapWithNoResourceAvailableException(throwable));
            }
            return null;
        };
    }

    private void releaseSlotIfPresent(@Nullable LogicalSlot logicalSlot) {
        if (logicalSlot != null) {
            logicalSlot.releaseSlot(null);
        }
    }

    private void handleTaskDeploymentFailure(ExecutionVertexID executionVertexId, Throwable error) {
        this.executionVertexOperations.markFailed(this.getExecutionVertex(executionVertexId), error);
    }

    private static Throwable maybeWrapWithNoResourceAvailableException(Throwable failure) {
        Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)failure);
        if (strippedThrowable instanceof TimeoutException) {
            return new NoResourceAvailableException("Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.", failure);
        }
        return failure;
    }

    private BiFunction<Object, Throwable, Void> deployOrHandleError(DeploymentHandle deploymentHandle) {
        ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
        ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
        return (ignored, throwable) -> {
            if (this.executionVertexVersioner.isModified(requiredVertexVersion)) {
                this.log.debug("Refusing to deploy execution vertex {} because this deployment was superseded by another deployment", (Object)executionVertexId);
                return null;
            }
            if (throwable == null) {
                this.deployTaskSafe(executionVertexId);
            } else {
                this.handleTaskDeploymentFailure(executionVertexId, (Throwable)throwable);
            }
            return null;
        };
    }

    private void deployTaskSafe(ExecutionVertexID executionVertexId) {
        try {
            ExecutionVertex executionVertex = this.getExecutionVertex(executionVertexId);
            this.executionVertexOperations.deploy(executionVertex);
        }
        catch (Throwable e) {
            this.handleTaskDeploymentFailure(executionVertexId, e);
        }
    }

    private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
        ExecutionState currentState = vertex.getExecutionState();
        if (currentState == ExecutionState.FAILED || currentState == ExecutionState.CANCELING || currentState == ExecutionState.CANCELED) {
            return;
        }
        for (OperatorCoordinatorHolder coordinator : vertex.getJobVertex().getOperatorCoordinators()) {
            coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
        }
    }

    private class DefaultExecutionSlotAllocationContext
    implements ExecutionSlotAllocationContext {
        private DefaultExecutionSlotAllocationContext() {
        }

        @Override
        public ResourceProfile getResourceProfile(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.getExecutionVertex(executionVertexId).getResourceProfile();
        }

        @Override
        public AllocationID getPriorAllocationId(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.getExecutionVertex(executionVertexId).getLatestPriorAllocation();
        }

        @Override
        public SchedulingTopology getSchedulingTopology() {
            return DefaultScheduler.this.getSchedulingTopology();
        }

        @Override
        public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
            return DefaultScheduler.this.getJobGraph().getSlotSharingGroups();
        }

        @Override
        public Set<CoLocationGroup> getCoLocationGroups() {
            return DefaultScheduler.this.getJobGraph().getCoLocationGroups();
        }

        @Override
        public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
        }

        @Override
        public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.inputsLocationsRetriever.getTaskManagerLocation(executionVertexId);
        }

        @Override
        public Optional<TaskManagerLocation> getStateLocation(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.stateLocationRetriever.getStateLocation(executionVertexId);
        }
    }
}

