/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
import org.apache.flink.runtime.jobmaster.event.JobEventManager;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultBatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider;
import org.apache.flink.runtime.scheduler.adaptivebatch.DummyBatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider;
import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.DefaultInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.PartialFinishedInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdaptiveBatchSchedulerFactory
implements SchedulerNGFactory {
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveBatchSchedulerFactory.class);

    @Override
    public SchedulerNG createInstance(Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, SlotPoolService slotPoolService, ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, Time slotRequestTimeout, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, BlocklistOperations blocklistOperations) throws Exception {
        BatchJobRecoveryHandler jobRecoveryHandler;
        boolean isJobRecoveryEnabled;
        SlotPool slotPool = slotPoolService.castInto(SlotPool.class).orElseThrow(() -> new IllegalStateException("The AdaptiveBatchScheduler requires a SlotPool."));
        ExecutionSlotAllocatorFactory allocatorFactory = AdaptiveBatchSchedulerFactory.createExecutionSlotAllocatorFactory(jobMasterConfiguration, slotPool);
        ExecutionConfig executionConfig = jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader);
        RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(executionConfig.getRestartStrategy(), jobGraph.getJobConfiguration(), jobMasterConfiguration, jobGraph.isCheckpointingEnabled()).create();
        log.info("Using restart back off time strategy {} for {} ({}).", new Object[]{restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID()});
        boolean bl = isJobRecoveryEnabled = jobMasterConfiguration.getBoolean(BatchExecutionOptions.JOB_RECOVERY_ENABLED) && shuffleMaster.supportsBatchSnapshot();
        if (isJobRecoveryEnabled) {
            FileSystemJobEventStore jobEventStore = new FileSystemJobEventStore(jobGraph.getJobID(), jobMasterConfiguration);
            JobEventManager jobEventManager = new JobEventManager(jobEventStore);
            jobRecoveryHandler = new DefaultBatchJobRecoveryHandler(jobEventManager, jobMasterConfiguration);
        } else {
            jobRecoveryHandler = new DummyBatchJobRecoveryHandler();
        }
        return AdaptiveBatchSchedulerFactory.createScheduler(log, jobGraph, executionConfig, ioExecutor, jobMasterConfiguration, futureExecutor, userCodeLoader, checkpointRecoveryFactory, new CheckpointsCleaner(), rpcTimeout, blobWriter, jobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp, mainThreadExecutor, jobStatusListener, failureEnrichers, blocklistOperations, new DefaultExecutionOperations(), allocatorFactory, restartBackoffTimeStrategy, new ScheduledExecutorServiceAdapter(futureExecutor), DefaultVertexParallelismAndInputInfosDecider.from(AdaptiveBatchSchedulerFactory.getDefaultMaxParallelism(jobMasterConfiguration, executionConfig), jobMasterConfiguration), jobRecoveryHandler);
    }

    @VisibleForTesting
    public static AdaptiveBatchScheduler createScheduler(Logger log, JobGraph jobGraph, ExecutionConfig executionConfig, Executor ioExecutor, Configuration jobMasterConfiguration, ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, CheckpointsCleaner checkpointsCleaner, Time rpcTimeout, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, BlocklistOperations blocklistOperations, ExecutionOperations executionOperations, ExecutionSlotAllocatorFactory allocatorFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ScheduledExecutor delayExecutor, VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, BatchJobRecoveryHandler jobRecoveryHandler) throws Exception {
        Preconditions.checkState(jobGraph.getJobType() == JobType.BATCH, "Adaptive batch scheduler only supports batch jobs");
        AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(jobGraph);
        boolean enableSpeculativeExecution = jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED);
        JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint = AdaptiveBatchSchedulerFactory.getOrDecideHybridPartitionDataConsumeConstraint(jobMasterConfiguration, enableSpeculativeExecution);
        DefaultExecutionGraphFactory executionGraphFactory = new DefaultExecutionGraphFactory(jobMasterConfiguration, userCodeLoader, executionDeploymentTracker, futureExecutor, ioExecutor, rpcTimeout, jobManagerJobMetricGroup, blobWriter, shuffleMaster, partitionTracker, true, AdaptiveBatchSchedulerFactory.createExecutionJobVertexFactory(enableSpeculativeExecution), hybridPartitionDataConsumeConstraint == JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS);
        VertexwiseSchedulingStrategy.Factory schedulingStrategyFactory = new VertexwiseSchedulingStrategy.Factory(AdaptiveBatchSchedulerFactory.loadInputConsumableDeciderFactory(hybridPartitionDataConsumeConstraint));
        int defaultMaxParallelism = AdaptiveBatchSchedulerFactory.getDefaultMaxParallelism(jobMasterConfiguration, executionConfig);
        Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId = ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism(jobGraph.getVerticesSortedTopologicallyFromSources());
        return new AdaptiveBatchScheduler(log, jobGraph, ioExecutor, jobMasterConfiguration, componentMainThreadExecutor -> {}, delayExecutor, userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration), restartBackoffTimeStrategy, executionOperations, new ExecutionVertexVersioner(), allocatorFactory, initializationTimestamp, mainThreadExecutor, jobStatusListener, failureEnrichers, executionGraphFactory, shuffleMaster, rpcTimeout, vertexParallelismAndInputInfosDecider, defaultMaxParallelism, blocklistOperations, hybridPartitionDataConsumeConstraint, forwardGroupsByJobVertexId, jobRecoveryHandler);
    }

    public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory(JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint) {
        switch (hybridPartitionDataConsumeConstraint) {
            case ALL_PRODUCERS_FINISHED: {
                return AllFinishedInputConsumableDecider.Factory.INSTANCE;
            }
            case ONLY_FINISHED_PRODUCERS: {
                return PartialFinishedInputConsumableDecider.Factory.INSTANCE;
            }
            case UNFINISHED_PRODUCERS: {
                return DefaultInputConsumableDecider.Factory.INSTANCE;
            }
        }
        throw new IllegalStateException((Object)((Object)hybridPartitionDataConsumeConstraint) + "is not supported.");
    }

    public static JobManagerOptions.HybridPartitionDataConsumeConstraint getOrDecideHybridPartitionDataConsumeConstraint(Configuration configuration, boolean enableSpeculativeExecution) {
        JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint = configuration.getOptional(JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT).orElseGet(() -> {
            JobManagerOptions.HybridPartitionDataConsumeConstraint defaultConstraint = enableSpeculativeExecution ? JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS : JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS;
            LOG.info("Set {} to {} as it is not configured", (Object)JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT.key(), (Object)defaultConstraint.name());
            return defaultConstraint;
        });
        if (enableSpeculativeExecution) {
            Preconditions.checkState(hybridPartitionDataConsumeConstraint != JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS, "For speculative execution, only supports consume finished partition now.");
        }
        return hybridPartitionDataConsumeConstraint;
    }

    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(Configuration configuration, SlotPool slotPool) {
        SlotSelectionStrategy slotSelectionStrategy = SlotSelectionStrategyUtils.selectSlotSelectionStrategy(JobType.BATCH, configuration);
        PhysicalSlotProviderImpl physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
        return new SimpleExecutionSlotAllocator.Factory(physicalSlotProvider, false);
    }

    private static ExecutionJobVertex.Factory createExecutionJobVertexFactory(boolean enableSpeculativeExecution) {
        if (enableSpeculativeExecution) {
            return new SpeculativeExecutionJobVertex.Factory();
        }
        return new ExecutionJobVertex.Factory();
    }

    private static void checkAllExchangesAreSupported(JobGraph jobGraph) {
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                Preconditions.checkState(dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition() || dataSet.getResultType().isHybridResultPartition(), String.format("At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure '%s' to '%s' or '%s/%s'. Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, the ExecutionMode needs to be %s to force BLOCKING shuffle", new Object[]{ExecutionOptions.BATCH_SHUFFLE_MODE.key(), BatchShuffleMode.ALL_EXCHANGES_BLOCKING, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, ExecutionMode.BATCH_FORCED}));
            }
        }
    }

    static int getDefaultMaxParallelism(Configuration configuration, ExecutionConfig executionConfig) {
        return configuration.getOptional(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM).orElse(executionConfig.getParallelism() == -1 ? BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue() : Integer.valueOf(executionConfig.getParallelism()));
    }

    @Override
    public JobManagerOptions.SchedulerType getSchedulerType() {
        return JobManagerOptions.SchedulerType.AdaptiveBatch;
    }
}

