/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.FromDataSource;
import org.apache.flink.api.connector.dsv2.WrappedSource;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.impl.ExecutionEnvironmentFactory;
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.datastream.impl.utils.StreamUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class ExecutionEnvironmentImpl
implements ExecutionEnvironment {
    private final List<Transformation<?>> transformations = new ArrayList();
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointCfg;
    private final Configuration configuration;
    private final ClassLoader userClassloader;
    private final PipelineExecutorServiceLoader executorServiceLoader;
    private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

    public static ExecutionEnvironment newInstance() {
        if (contextEnvironmentFactory != null) {
            return contextEnvironmentFactory.createExecutionEnvironment(new Configuration());
        }
        Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, (Object)"local");
        configuration.set(DeploymentOptions.ATTACHED, (Object)true);
        return new ExecutionEnvironmentImpl((PipelineExecutorServiceLoader)new DefaultExecutorServiceLoader(), configuration, null);
    }

    ExecutionEnvironmentImpl(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader classLoader) {
        this.executorServiceLoader = (PipelineExecutorServiceLoader)Preconditions.checkNotNull((Object)executorServiceLoader);
        this.configuration = configuration;
        this.executionConfig = new ExecutionConfig(this.configuration);
        this.checkpointCfg = new CheckpointConfig(this.configuration);
        this.userClassloader = classLoader == null ? this.getClass().getClassLoader() : classLoader;
        this.configure((ReadableConfig)configuration, this.userClassloader);
    }

    public void execute(String jobName) throws Exception {
        StreamGraph streamGraph = this.getStreamGraph();
        if (jobName != null) {
            streamGraph.setJobName(jobName);
        }
        this.execute(streamGraph);
    }

    public RuntimeExecutionMode getExecutionMode() {
        return (RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);
    }

    public ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode) {
        Preconditions.checkNotNull((Object)runtimeMode);
        this.configuration.set(ExecutionOptions.RUNTIME_MODE, (Object)runtimeMode);
        return this;
    }

    protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
    }

    protected static void resetContextEnvironment() {
        contextEnvironmentFactory = null;
    }

    public <OUT> NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<OUT> fromSource(org.apache.flink.api.connector.dsv2.Source<OUT> source, String sourceName) {
        if (source instanceof WrappedSource) {
            Source innerSource = ((WrappedSource)source).getWrappedSource();
            Object resolvedTypeInfo = ExecutionEnvironmentImpl.getSourceTypeInfo(innerSource, sourceName);
            SourceTransformation sourceTransformation = new SourceTransformation(sourceName, innerSource, WatermarkStrategy.noWatermarks(), resolvedTypeInfo, this.getParallelism(), false);
            return StreamUtils.wrapWithConfigureHandle(new NonKeyedPartitionStreamImpl(this, sourceTransformation));
        }
        if (source instanceof FromDataSource) {
            Collection data = ((FromDataSource)source).getData();
            TypeInformation<OUT> outType = ExecutionEnvironmentImpl.extractTypeInfoFromCollection(data);
            FromElementsGeneratorFunction generatorFunction = new FromElementsGeneratorFunction(outType, this.executionConfig, (Iterable)data);
            DataGeneratorSource generatorSource = new DataGeneratorSource((GeneratorFunction)generatorFunction, (long)data.size(), outType);
            return this.fromSource((org.apache.flink.api.connector.dsv2.Source<OUT>)new WrappedSource((Source)generatorSource), "Collection Source");
        }
        throw new UnsupportedOperationException("Unsupported type of source, you could use DataStreamV2SourceUtils to wrap a FLIP-27 based source.");
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public int getParallelism() {
        return this.executionConfig.getParallelism();
    }

    public List<Transformation<?>> getTransformations() {
        return this.transformations;
    }

    public void setParallelism(int parallelism) {
        this.executionConfig.setParallelism(parallelism);
    }

    public CheckpointConfig getCheckpointCfg() {
        return this.checkpointCfg;
    }

    private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> data) {
        TypeInformation typeInfo;
        Preconditions.checkNotNull(data, (String)"Collection must not be null");
        if (data.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        OUT first = data.iterator().next();
        if (first == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        try {
            typeInfo = TypeExtractor.getForObject(first);
        }
        catch (Exception e) {
            throw new RuntimeException("Could not create TypeInformation for type " + String.valueOf(first.getClass()) + "; please specify the TypeInformation manually via the version of the method that explicitly accepts it as an argument.", e);
        }
        return typeInfo;
    }

    private static <OUT, T extends TypeInformation<OUT>> T getSourceTypeInfo(Source<OUT, ?, ?> source, String sourceName) {
        TypeInformation resolvedTypeInfo = null;
        if (source instanceof ResultTypeQueryable) {
            resolvedTypeInfo = ((ResultTypeQueryable)source).getProducedType();
        }
        if (resolvedTypeInfo == null) {
            try {
                resolvedTypeInfo = TypeExtractor.createTypeInfo(Source.class, source.getClass(), (int)0, null, null);
            }
            catch (InvalidTypesException e) {
                resolvedTypeInfo = new MissingTypeInfo(sourceName, e);
            }
        }
        return (T)resolvedTypeInfo;
    }

    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, (String)"transformation must not be null.");
        this.transformations.add(transformation);
    }

    private void execute(StreamGraph streamGraph) throws Exception {
        JobClient jobClient = this.executeAsync(streamGraph);
        try {
            if (((Boolean)this.configuration.get(DeploymentOptions.ATTACHED)).booleanValue()) {
                jobClient.getJobExecutionResult().get();
            }
        }
        catch (Throwable t) {
            Throwable strippedException = ExceptionUtils.stripExecutionException((Throwable)t);
            ExceptionUtils.rethrowException((Throwable)strippedException);
        }
    }

    private JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        Preconditions.checkNotNull((Object)streamGraph, (String)"StreamGraph cannot be null.");
        PipelineExecutor executor = this.getPipelineExecutor();
        CompletableFuture jobClientFuture = executor.execute((Pipeline)streamGraph, this.configuration, this.getClass().getClassLoader());
        try {
            return (JobClient)jobClientFuture.get();
        }
        catch (ExecutionException executionException) {
            Throwable strippedException = ExceptionUtils.stripExecutionException((Throwable)executionException);
            throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()), strippedException);
        }
    }

    public StreamGraph getStreamGraph() {
        StreamGraph streamGraph = this.getStreamGraphGenerator(this.transformations).generate();
        this.transformations.clear();
        return streamGraph;
    }

    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return new StreamGraphGenerator(new ArrayList(transformations), this.executionConfig, this.checkpointCfg, this.configuration);
    }

    private PipelineExecutor getPipelineExecutor() throws Exception {
        Preconditions.checkNotNull((Object)((String)this.configuration.get(DeploymentOptions.TARGET)), (String)"No execution.target specified in your configuration file.");
        PipelineExecutorFactory executorFactory = this.executorServiceLoader.getExecutorFactory(this.configuration);
        Preconditions.checkNotNull((Object)executorFactory, (String)"Cannot find compatible factory for specified execution.target (=%s)", (Object[])new Object[]{this.configuration.get(DeploymentOptions.TARGET)});
        return executorFactory.getExecutor(this.configuration);
    }

    private void configure(ReadableConfig configuration, ClassLoader classLoader) {
        this.configuration.addAll(Configuration.fromMap((Map)configuration.toMap()));
        this.executionConfig.configure(configuration, classLoader);
        this.checkpointCfg.configure(configuration);
    }

    static {
        try {
            DataStreamV2SinkTransformationTranslator.registerSinkTransformationTranslator();
        }
        catch (Exception e) {
            throw new RuntimeException("Can not register process function transformation translator.", e);
        }
    }
}

