/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink;

import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.flink.FlinkPipelineExecution;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * {@link PipelineComposer} that builds a Flink DataStream job from a {@link PipelineDef} on a
 * given {@link StreamExecutionEnvironment}.
 *
 * <p>Instances are obtained through the static factories {@link #ofRemoteCluster}, {@link
 * #ofApplicationCluster} and {@link #ofMiniCluster}.
 */
@Internal
public class FlinkPipelineComposer implements PipelineComposer {
    /** Environment the pipeline topology is built on. */
    private final StreamExecutionEnvironment env;

    /** Flag forwarded unchanged to the {@link FlinkPipelineExecution} created by {@link #compose}. */
    private final boolean isBlocking;

    /**
     * Creates a non-blocking composer on a fresh environment built from {@code flinkConfig},
     * registering every path in {@code additionalJars} with that environment.
     *
     * @param flinkConfig Flink configuration used to construct the execution environment
     * @param additionalJars JAR paths to register; each one is qualified against its own file system
     * @return a composer bound to the newly created environment
     * @throws RuntimeException if any JAR path cannot be qualified, converted to a URL, or added
     */
    public static FlinkPipelineComposer ofRemoteCluster(Configuration flinkConfig, List<Path> additionalJars) {
        StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig);
        for (Path jarPath : additionalJars) {
            try {
                URL jarUrl = jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL();
                FlinkEnvironmentUtils.addJar(env, jarUrl);
            } catch (Exception e) {
                throw new RuntimeException(String.format("Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment", jarPath), e);
            }
        }
        return new FlinkPipelineComposer(env, false);
    }

    /**
     * Creates a non-blocking composer that reuses the supplied environment.
     *
     * @param env environment to build the pipeline on
     * @return a composer bound to {@code env}
     */
    public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironment env) {
        return new FlinkPipelineComposer(env, false);
    }

    /**
     * Creates a blocking composer on the environment returned by {@link
     * StreamExecutionEnvironment#getExecutionEnvironment()}.
     *
     * @return a composer with the blocking flag set
     */
    public static FlinkPipelineComposer ofMiniCluster() {
        return new FlinkPipelineComposer(StreamExecutionEnvironment.getExecutionEnvironment(), true);
    }

    private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
        this.env = env;
        this.isBlocking = isBlocking;
    }

    /**
     * Applies the configured parallelism to the environment, translates the pipeline definition
     * into operators, registers the framework JARs and wraps the result in an execution handle.
     *
     * @param pipelineDef definition of the pipeline to compose
     * @return a {@link FlinkPipelineExecution} named after {@link PipelineOptions#PIPELINE_NAME}
     */
    @Override
    public PipelineExecution compose(PipelineDef pipelineDef) {
        org.apache.flink.cdc.common.configuration.Configuration pipelineDefConfig = pipelineDef.getConfig();
        int parallelism = (Integer) pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
        this.env.getConfig().setParallelism(parallelism);

        this.translate(this.env, pipelineDef);
        this.addFrameworkJars();

        String pipelineName = (String) pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME);
        return new FlinkPipelineExecution(this.env, pipelineName, this.isBlocking);
    }

    /**
     * Builds the topology: source, pre/post transform, schema operator and partitioning (whose
     * relative order depends on {@link DataSource#isParallelMetadataSource()}), then sink.
     *
     * @param env environment to attach the operators to
     * @param pipelineDef definition supplying source, sink, transforms, routes, UDFs and models
     */
    private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) {
        org.apache.flink.cdc.common.configuration.Configuration pipelineDefConfig = pipelineDef.getConfig();
        int parallelism = (Integer) pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
        SchemaChangeBehavior schemaChangeBehavior = (SchemaChangeBehavior) pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

        boolean isBatchMode = org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode.BATCH.equals(pipelineDefConfig.get(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE));
        env.setRuntimeMode(isBatchMode ? RuntimeExecutionMode.BATCH : RuntimeExecutionMode.STREAMING);

        DataSourceTranslator sourceTranslator = new DataSourceTranslator();
        TransformTranslator transformTranslator = new TransformTranslator();
        PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
        SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator(
                schemaChangeBehavior,
                (String) pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
                (Duration) pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
                (String) pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
        DataSinkTranslator sinkTranslator = new DataSinkTranslator();
        OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

        DataSource dataSource = sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
        DataSink dataSink = sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
        boolean isParallelMetadataSource = dataSource.isParallelMetadataSource();

        // Held as the general DataStream type: the variable is reassigned with the output of each
        // translation stage, which need not be a DataStreamSource.
        DataStream<Event> stream = sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);

        stream = transformTranslator.translatePreTransform(
                stream,
                pipelineDef.getTransforms(),
                pipelineDef.getUdfs(),
                pipelineDef.getModels(),
                dataSource.supportedMetadataColumns(),
                !isParallelMetadataSource && !isBatchMode);
        stream = transformTranslator.translatePostTransform(
                stream,
                pipelineDef.getTransforms(),
                (String) pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                pipelineDef.getUdfs(),
                pipelineDef.getModels(),
                dataSource.supportedMetadataColumns());

        if (isParallelMetadataSource) {
            // Distributed topology: partition first, then run the schema operator.
            DataStream<PartitioningEvent> partitionedStream = partitioningTranslator.translateDistributed(
                    stream,
                    parallelism,
                    parallelism,
                    (HashFunctionProvider<DataChangeEvent>) dataSink.getDataChangeEventHashFunctionProvider(parallelism));
            stream = schemaOperatorTranslator.translateDistributed(
                    partitionedStream,
                    parallelism,
                    dataSink.getMetadataApplier().setAcceptedSchemaEvolutionTypes(pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
                    pipelineDef.getRoute());
        } else {
            // Regular topology: run the schema operator first, then partition.
            stream = schemaOperatorTranslator.translateRegular(
                    stream,
                    parallelism,
                    isBatchMode,
                    dataSink.getMetadataApplier().setAcceptedSchemaEvolutionTypes(pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
                    pipelineDef.getRoute());
            stream = partitioningTranslator.translateRegular(
                    stream,
                    parallelism,
                    parallelism,
                    isBatchMode,
                    schemaOperatorIDGenerator.generate(),
                    (HashFunctionProvider<DataChangeEvent>) dataSink.getDataChangeEventHashFunctionProvider(parallelism));
        }

        sinkTranslator.translate(pipelineDef.getSink(), stream, dataSink, isBatchMode, schemaOperatorIDGenerator.generate());
    }

    /**
     * Registers the JARs containing {@link Event} and {@link EventSerializer} with the
     * environment. Locations are gathered in a set of URIs so that a JAR holding both classes is
     * added only once.
     *
     * @throws RuntimeException if locating or adding any JAR fails
     */
    private void addFrameworkJars() {
        try {
            HashSet<URI> frameworkJars = new HashSet<>();

            Optional<URL> commonJar = this.getContainingJar(Event.class);
            if (commonJar.isPresent()) {
                frameworkJars.add(commonJar.get().toURI());
            }

            Optional<URL> runtimeJar = this.getContainingJar(EventSerializer.class);
            if (runtimeJar.isPresent()) {
                frameworkJars.add(runtimeJar.get().toURI());
            }

            for (URI jar : frameworkJars) {
                FlinkEnvironmentUtils.addJar(this.env, jar.toURL());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to search and add Flink CDC framework JARs", e);
        }
    }

    /**
     * Finds the code-source location that {@code clazz} was loaded from.
     *
     * @param clazz class whose containing JAR is wanted
     * @return the location URL, or empty if the class has no code source or location, or if the
     *     location is a directory rather than an archive
     * @throws Exception if the location cannot be converted to a URI or file-system path
     */
    private Optional<URL> getContainingJar(Class<?> clazz) throws Exception {
        // Both getCodeSource() and getLocation() are documented as possibly returning null;
        // Optional.map collapses either null into an empty result instead of an NPE.
        Optional<URL> location = Optional.ofNullable(clazz.getProtectionDomain().getCodeSource())
                .map(codeSource -> codeSource.getLocation());
        if (!location.isPresent()) {
            return Optional.empty();
        }

        URL container = location.get();
        if (Files.isDirectory(Paths.get(container.toURI()))) {
            return Optional.empty();
        }
        return Optional.of(container);
    }
}

