/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.StateWithoutExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/**
 * Scheduler state that waits for a future delivering an {@link ExecutionGraph} together with its
 * scheduling plan and then decides where to go next.
 *
 * <p>Once the future completes, the outcome is processed through {@link Context#runIfState}:
 *
 * <ul>
 *   <li>if the future failed, the job goes to the finished state with {@link JobStatus#FAILED};
 *   <li>if slots could be assigned, the job goes to the executing state;
 *   <li>otherwise the job goes back to waiting for resources.
 * </ul>
 *
 * <p>While in this state the reported job status is {@link JobStatus#CREATED}.
 */
public class CreatingExecutionGraph
extends StateWithoutExecutionGraph {
    private final Context context;
    private final OperatorCoordinatorHandlerFactory operatorCoordinatorHandlerFactory;
    /** Graph handed over at construction; passed on when falling back to waiting for resources. */
    @Nullable
    private final ExecutionGraph previousExecutionGraph;

    /**
     * Creates the state and registers the completion callback on the given future.
     *
     * @param context context used for state transitions, slot assignment and executors
     * @param executionGraphWithParallelismFuture future delivering the execution graph and its
     *     scheduling plan, or a failure
     * @param logger logger used by this state
     * @param operatorCoordinatorFactory factory for the {@link OperatorCoordinatorHandler} that is
     *     created once slots have been assigned
     * @param previousExecutionGraph graph forwarded to {@code goToWaitingForResources} when slot
     *     assignment is not possible; may be {@code null}
     */
    public CreatingExecutionGraph(Context context, CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture, Logger logger, OperatorCoordinatorHandlerFactory operatorCoordinatorFactory, @Nullable ExecutionGraph previousExecutionGraph) {
        super(context, logger);
        this.context = context;
        this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory;
        // Every field must be assigned before `this` is handed to the callback below: if the
        // future is already complete, the callback fires right here inside the constructor, and
        // the handler must not be able to observe a half-initialized state.
        this.previousExecutionGraph = previousExecutionGraph;

        FutureUtils.assertNoException(
                executionGraphWithParallelismFuture.handle(
                        (executionGraphWithVertexParallelism, throwable) -> {
                            context.runIfState(
                                    this,
                                    () -> this.handleExecutionGraphCreation(executionGraphWithVertexParallelism, throwable),
                                    Duration.ZERO);
                            return null;
                        }));
    }

    /**
     * Processes the outcome of the execution graph creation future.
     *
     * @param executionGraphWithVertexParallelism the created graph and plan; expected to be
     *     non-null whenever {@code throwable} is {@code null}
     * @param throwable the failure of the creation future, or {@code null} on success
     */
    private void handleExecutionGraphCreation(@Nullable ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism, @Nullable Throwable throwable) {
        if (throwable != null) {
            // The throwable is the trailing argument so that the logger records it as exception.
            this.getLogger().info(
                    "Failed to go from {} to {} because the ExecutionGraph creation failed.",
                    CreatingExecutionGraph.class.getSimpleName(),
                    Executing.class.getSimpleName(),
                    throwable);
            this.context.goToFinished(this.context.getArchivedExecutionGraph(JobStatus.FAILED, throwable));
            return;
        }

        Preconditions.checkNotNull(
                executionGraphWithVertexParallelism,
                "The ExecutionGraph creation completed without a failure but also without a result.");

        // Mark every current execution attempt as SCHEDULED before trying to assign slots.
        for (ExecutionVertex vertex : executionGraphWithVertexParallelism.getExecutionGraph().getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        }

        final AssignmentResult result = this.context.tryToAssignSlots(executionGraphWithVertexParallelism);
        if (!result.isSuccess()) {
            this.getLogger().debug("Failed to reserve and assign the required slots. Waiting for new resources.");
            this.context.goToWaitingForResources(this.previousExecutionGraph);
            return;
        }

        this.getLogger().debug("Successfully reserved and assigned the required slots for the ExecutionGraph.");
        this.startExecuting(result.getExecutionGraph(), executionGraphWithVertexParallelism.getVertexParallelism());
    }

    /**
     * Prepares the given graph (operator coordinators, plan, running transition) and switches to
     * the executing state. The statement order below is kept exactly as it was.
     */
    private void startExecuting(final ExecutionGraph executionGraph, VertexParallelism vertexParallelism) {
        final ExecutionGraphHandler executionGraphHandler = new ExecutionGraphHandler(executionGraph, this.getLogger(), this.context.getIOExecutor(), this.context.getMainThreadExecutor());
        final OperatorCoordinatorHandler operatorCoordinatorHandler = this.operatorCoordinatorHandlerFactory.create(executionGraph, this.context);
        operatorCoordinatorHandler.initializeOperatorCoordinators(this.context.getMainThreadExecutor());

        // Regenerate the plan from the graph's topologically ordered job vertices and the
        // vertex parallelism of the scheduling plan.
        final JobPlanInfo.Plan updatedPlan = JsonPlanGenerator.generatePlan(
                executionGraph.getJobID(),
                executionGraph.getJobName(),
                JobType.STREAMING,
                () -> IterableUtils.toStream(executionGraph.getVerticesTopologically()).map(ExecutionJobVertex::getJobVertex).iterator(),
                vertexParallelism);
        executionGraph.setPlan(updatedPlan);
        executionGraph.transitionToRunning();
        operatorCoordinatorHandler.startAllOperatorCoordinators();

        this.context.goToExecuting(executionGraph, executionGraphHandler, operatorCoordinatorHandler, Collections.emptyList());
    }

    /** Returns {@link JobStatus#CREATED}, the status reported while in this state. */
    @Override
    public JobStatus getJobStatus() {
        return JobStatus.CREATED;
    }

    /** Immutable pair of an {@link ExecutionGraph} and the {@link JobSchedulingPlan} it belongs to. */
    static class ExecutionGraphWithVertexParallelism {
        private final ExecutionGraph executionGraph;
        private final JobSchedulingPlan jobSchedulingPlan;

        private ExecutionGraphWithVertexParallelism(ExecutionGraph executionGraph, JobSchedulingPlan jobSchedulingPlan) {
            this.executionGraph = executionGraph;
            this.jobSchedulingPlan = jobSchedulingPlan;
        }

        /**
         * Creates a new pair.
         *
         * @param executionGraph the execution graph
         * @param vertexParallelism the scheduling plan (named {@code vertexParallelism} for
         *     historical reasons; it is a {@link JobSchedulingPlan})
         * @return the new pair
         */
        public static ExecutionGraphWithVertexParallelism create(ExecutionGraph executionGraph, JobSchedulingPlan vertexParallelism) {
            return new ExecutionGraphWithVertexParallelism(executionGraph, vertexParallelism);
        }

        /** Returns the execution graph of this pair. */
        public ExecutionGraph getExecutionGraph() {
            return this.executionGraph;
        }

        /** Returns the vertex parallelism obtained from the scheduling plan. */
        public VertexParallelism getVertexParallelism() {
            return this.jobSchedulingPlan.getVertexParallelism();
        }

        /** Returns the scheduling plan of this pair. */
        public JobSchedulingPlan getJobSchedulingPlan() {
            return this.jobSchedulingPlan;
        }
    }

    /** {@link StateFactory} that builds {@link CreatingExecutionGraph} instances. */
    static class Factory
    implements StateFactory<CreatingExecutionGraph> {
        private final Context context;
        private final CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture;
        @Nullable
        private final ExecutionGraph previousExecutionGraph;
        private final Logger log;

        /**
         * @param context context passed to the created state
         * @param executionGraphWithParallelismFuture future passed to the created state
         * @param log logger passed to the created state
         * @param previousExecutionGraph previous graph passed to the created state; may be
         *     {@code null}
         */
        Factory(Context context, CompletableFuture<ExecutionGraphWithVertexParallelism> executionGraphWithParallelismFuture, Logger log, @Nullable ExecutionGraph previousExecutionGraph) {
            this.context = context;
            this.executionGraphWithParallelismFuture = executionGraphWithParallelismFuture;
            this.log = log;
            this.previousExecutionGraph = previousExecutionGraph;
        }

        @Override
        public Class<CreatingExecutionGraph> getStateClass() {
            return CreatingExecutionGraph.class;
        }

        /** Creates the state, using {@link DefaultOperatorCoordinatorHandler} as handler factory. */
        @Override
        public CreatingExecutionGraph getState() {
            return new CreatingExecutionGraph(this.context, this.executionGraphWithParallelismFuture, this.log, DefaultOperatorCoordinatorHandler::new, this.previousExecutionGraph);
        }
    }

    /**
     * Outcome of a slot assignment attempt: either a success carrying an {@link ExecutionGraph},
     * or the shared "not possible" instance that carries none.
     */
    static final class AssignmentResult {
        private static final AssignmentResult NOT_POSSIBLE = new AssignmentResult(null);
        @Nullable
        private final ExecutionGraph executionGraph;

        private AssignmentResult(@Nullable ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
        }

        /** Returns {@code true} if this result carries an execution graph. */
        boolean isSuccess() {
            return this.executionGraph != null;
        }

        /**
         * Returns the execution graph of a successful result.
         *
         * @throws IllegalStateException if this result is not a success
         */
        ExecutionGraph getExecutionGraph() {
            Preconditions.checkState(this.isSuccess(), "Can only return the ExecutionGraph if it is a success.");
            return this.executionGraph;
        }

        /**
         * Creates a successful result.
         *
         * @param executionGraph the graph with assigned slots; must not be {@code null}
         * @throws NullPointerException if {@code executionGraph} is {@code null}
         */
        static AssignmentResult success(ExecutionGraph executionGraph) {
            return new AssignmentResult(Preconditions.checkNotNull(executionGraph, "AssignmentResult.success expects a non-null ExecutionGraph."));
        }

        /** Returns the shared result signalling that the assignment was not possible. */
        static AssignmentResult notPossible() {
            return NOT_POSSIBLE;
        }
    }

    /** Factory for the {@link OperatorCoordinatorHandler} used once slots have been assigned. */
    @FunctionalInterface
    static interface OperatorCoordinatorHandlerFactory {
        /**
         * Creates the handler for the given graph.
         *
         * @param executionGraph graph the handler is created for
         * @param globalFailureHandler failure handler handed to the created handler
         * @return the new handler
         */
        public OperatorCoordinatorHandler create(ExecutionGraph executionGraph, GlobalFailureHandler globalFailureHandler);
    }

    /** Services and state transitions required by {@link CreatingExecutionGraph}. */
    static interface Context
    extends StateWithoutExecutionGraph.Context,
    GlobalFailureHandler,
    StateTransitions.ToExecuting,
    StateTransitions.ToWaitingForResources {
        /**
         * Runs the given action guarded by the given state.
         *
         * <p>NOTE(review): presumably the action only runs if {@code state} is still the current
         * state once {@code delay} has elapsed; confirm against the implementing scheduler.
         */
        public ScheduledFuture<?> runIfState(State state, Runnable action, Duration delay);

        /**
         * Tries to assign slots to the given execution graph.
         *
         * @return a successful {@link AssignmentResult} carrying the graph to execute, or
         *     {@link AssignmentResult#notPossible()}
         */
        public AssignmentResult tryToAssignSlots(ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism);

        /** Returns the executor handed to the {@link ExecutionGraphHandler} as IO executor. */
        public Executor getIOExecutor();

        /** Returns the main thread executor of the scheduler. */
        public ComponentMainThreadExecutor getMainThreadExecutor();

        /** Returns the job's metric group. */
        public JobManagerJobMetricGroup getMetricGroup();
    }
}

