/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.SimpleInitializeOnMasterContext;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

public class DefaultExecutionGraphBuilder {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static DefaultExecutionGraph buildGraph(JobGraph jobGraph, Configuration jobManagerConfig, ScheduledExecutorService futureExecutor, Executor ioExecutor, ClassLoader classLoader, CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, Time rpcTimeout, BlobWriter blobWriter, Logger log, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker partitionTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, ExecutionDeploymentListener executionDeploymentListener, ExecutionStateUpdateListener executionStateUpdateListener, long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, VertexParallelismStore vertexParallelismStore, Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory, boolean isDynamicGraph, ExecutionJobVertex.Factory executionJobVertexFactory, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, boolean nonFinishedHybridPartitionShouldBeUnknown) throws JobExecutionException, JobException {
        DefaultExecutionGraph executionGraph;
        Preconditions.checkNotNull((Object)jobGraph, (String)"job graph cannot be null");
        String jobName = jobGraph.getName();
        JobID jobId = jobGraph.getJobID();
        JobInformation jobInformation = new JobInformation(jobId, jobName, jobGraph.getSerializedExecutionConfig(), jobGraph.getJobConfiguration(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        int executionHistorySizeLimit = jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
        PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory = PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(jobManagerConfig);
        try {
            executionGraph = new DefaultExecutionGraph(jobInformation, futureExecutor, ioExecutor, rpcTimeout, executionHistorySizeLimit, classLoader, blobWriter, partitionGroupReleaseStrategyFactory, shuffleMaster, partitionTracker, partitionLocationConstraint, executionDeploymentListener, executionStateUpdateListener, initializationTimestamp, vertexAttemptNumberStore, vertexParallelismStore, isDynamicGraph, executionJobVertexFactory, jobGraph.getJobStatusHooks(), markPartitionFinishedStrategy, nonFinishedHybridPartitionShouldBeUnknown);
        }
        catch (IOException e) {
            throw new JobException("Could not create the ExecutionGraph.", e);
        }
        try {
            executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        }
        catch (Throwable t) {
            log.warn("Cannot create JSON plan for job", t);
            executionGraph.setJsonPlan("{}");
        }
        long initMasterStart = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", (Object)jobName, (Object)jobId);
        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(jobId, "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
            }
            try {
                vertex.initializeOnMaster(new SimpleInitializeOnMasterContext(classLoader, vertexParallelismStore.getParallelismInfo(vertex.getID()).getParallelism()));
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobId, "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }
        log.info("Successfully ran initialization on master in {} ms.", (Object)((System.nanoTime() - initMasterStart) / 1000000L));
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug("Adding {} vertices from job graph {} ({}).", new Object[]{sortedTopology.size(), jobName, jobId});
        }
        executionGraph.attachJobGraph(sortedTopology);
        if (log.isDebugEnabled()) {
            log.debug("Successfully created execution graph from job graph {} ({}).", (Object)jobName, (Object)jobId);
        }
        if (isDynamicGraph) {
            log.warn("Skip setting up checkpointing for a job with dynamic graph.");
        } else if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
            List<MasterTriggerRestoreHook<?>> hooks;
            CheckpointStorage rootStorage;
            CheckpointStorage applicationConfiguredStorage;
            StateBackend rootBackend;
            StateBackend applicationConfiguredBackend;
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
            SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    applicationConfiguredBackend = (StateBackend)serializedAppConfigured.deserializeValue(classLoader);
                }
                catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);
                }
            }
            try {
                rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend, snapshotSettings.isChangelogStateBackendEnabled(), jobManagerConfig, classLoader, log);
            }
            catch (IOException | IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
            }
            SerializedValue<CheckpointStorage> serializedAppConfiguredStorage = snapshotSettings.getDefaultCheckpointStorage();
            if (serializedAppConfiguredStorage == null) {
                applicationConfiguredStorage = null;
            } else {
                try {
                    applicationConfiguredStorage = (CheckpointStorage)serializedAppConfiguredStorage.deserializeValue(classLoader);
                }
                catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId, "Could not deserialize application-defined checkpoint storage.", e);
                }
            }
            try {
                rootStorage = CheckpointStorageLoader.load(applicationConfiguredStorage, null, rootBackend, jobManagerConfig, classLoader, log);
            }
            catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);
            }
            SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = (MasterTriggerRestoreHook.Factory[])serializedHooks.deserializeValue(classLoader);
                }
                catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }
                Thread thread = Thread.currentThread();
                ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);
                try {
                    hooks = new ArrayList(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                }
                finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }
            CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
            String changelogStorage = jobManagerConfig.getString(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE);
            executionGraph.enableCheckpointing(chkConfig, hooks, checkpointIdCounter, completedCheckpointStore, rootBackend, rootStorage, checkpointStatsTrackerFactory.get(), checkpointsCleaner, jobManagerConfig.getString(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE));
        }
        return executionGraph;
    }

    public static boolean isCheckpointingEnabled(JobGraph jobGraph) {
        return jobGraph.getCheckpointingSettings() != null;
    }

    private DefaultExecutionGraphBuilder() {
    }
}

