/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.delegation;

import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.util.StringUtils;

@Internal
public class DefaultExecutor
implements Executor {
    private static final String DEFAULT_JOB_NAME = "Flink Exec Table Job";
    private final StreamExecutionEnvironment executionEnvironment;

    public DefaultExecutor(StreamExecutionEnvironment executionEnvironment) {
        this.executionEnvironment = executionEnvironment;
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.executionEnvironment;
    }

    public ReadableConfig getConfiguration() {
        return this.executionEnvironment.getConfiguration();
    }

    public Pipeline createPipeline(List<Transformation<?>> transformations, ReadableConfig tableConfiguration, @Nullable String defaultJobName) {
        return this.createPipeline(transformations, tableConfiguration, defaultJobName, Collections.emptyList());
    }

    public Pipeline createPipeline(List<Transformation<?>> transformations, ReadableConfig tableConfiguration, @Nullable String defaultJobName, List<JobStatusHook> jobStatusHookList) {
        this.executionEnvironment.configure(tableConfiguration);
        RuntimeExecutionMode mode = (RuntimeExecutionMode)this.getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
        switch (mode) {
            case BATCH: {
                this.configureBatchSpecificProperties();
                break;
            }
            case STREAMING: {
                break;
            }
            default: {
                throw new TableException(String.format("Unsupported runtime mode: %s", mode));
            }
        }
        StreamGraph streamGraph = this.executionEnvironment.generateStreamGraph(transformations);
        this.setJobName(streamGraph, defaultJobName);
        for (JobStatusHook hook : jobStatusHookList) {
            streamGraph.registerJobStatusHook(hook);
        }
        return streamGraph;
    }

    public JobExecutionResult execute(Pipeline pipeline) throws Exception {
        return this.executionEnvironment.execute((StreamGraph)pipeline);
    }

    public JobClient executeAsync(Pipeline pipeline) throws Exception {
        return this.executionEnvironment.executeAsync((StreamGraph)pipeline);
    }

    public boolean isCheckpointingEnabled() {
        return this.executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
    }

    private void configureBatchSpecificProperties() {
        this.executionEnvironment.getConfig().enableObjectReuse();
    }

    private void setJobName(StreamGraph streamGraph, @Nullable String defaultJobName) {
        String adjustedDefaultJobName = StringUtils.isNullOrWhitespaceOnly((String)defaultJobName) ? DEFAULT_JOB_NAME : defaultJobName;
        String jobName = this.getConfiguration().getOptional(PipelineOptions.NAME).orElse(adjustedDefaultJobName);
        streamGraph.setJobName(jobName);
    }
}

