/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.cli;

import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment;
import org.apache.flink.cdc.composer.flink.deployment.K8SApplicationDeploymentExecutor;
import org.apache.flink.cdc.composer.flink.deployment.YarnApplicationDeploymentExecutor;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CliExecutor {
    private final Path pipelineDefPath;
    private final org.apache.flink.configuration.Configuration flinkConfig;
    private final Configuration globalPipelineConfig;
    private final List<Path> additionalJars;
    private final Path flinkHome;
    private final CommandLine commandLine;
    private PipelineComposer composer = null;

    public CliExecutor(CommandLine commandLine, Path pipelineDefPath, org.apache.flink.configuration.Configuration flinkConfig, Configuration globalPipelineConfig, List<Path> additionalJars, Path flinkHome) {
        this.commandLine = commandLine;
        this.pipelineDefPath = pipelineDefPath;
        this.flinkConfig = flinkConfig;
        this.globalPipelineConfig = globalPipelineConfig;
        this.additionalJars = additionalJars;
        this.flinkHome = flinkHome;
    }

    public PipelineExecution.ExecutionInfo run() throws Exception {
        String deploymentTargetStr = this.getDeploymentTarget();
        ComposeDeployment deploymentTarget = ComposeDeployment.getDeploymentFromName((String)deploymentTargetStr);
        switch (deploymentTarget) {
            case KUBERNETES_APPLICATION: {
                return this.deployWithApplicationComposer((PipelineDeploymentExecutor)new K8SApplicationDeploymentExecutor());
            }
            case YARN_APPLICATION: {
                return this.deployWithApplicationComposer((PipelineDeploymentExecutor)new YarnApplicationDeploymentExecutor());
            }
            case LOCAL: {
                return this.deployWithComposer((PipelineComposer)FlinkPipelineComposer.ofMiniCluster());
            }
            case REMOTE: 
            case YARN_SESSION: {
                return this.deployWithComposer((PipelineComposer)FlinkPipelineComposer.ofRemoteCluster((org.apache.flink.configuration.Configuration)this.flinkConfig, this.additionalJars));
            }
        }
        throw new IllegalArgumentException(String.format("Deployment target %s is not supported", deploymentTargetStr));
    }

    private PipelineExecution.ExecutionInfo deployWithApplicationComposer(PipelineDeploymentExecutor composeExecutor) throws Exception {
        return composeExecutor.deploy(this.commandLine, this.flinkConfig, this.additionalJars, this.flinkHome);
    }

    private PipelineExecution.ExecutionInfo deployWithComposer(PipelineComposer composer) throws Exception {
        YamlPipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
        PipelineDef pipelineDef = pipelineDefinitionParser.parse(this.pipelineDefPath, this.globalPipelineConfig);
        PipelineExecution execution = composer.compose(pipelineDef);
        return execution.execute();
    }

    @VisibleForTesting
    public PipelineExecution.ExecutionInfo deployWithNoOpComposer() throws Exception {
        return this.deployWithComposer(this.composer);
    }

    public static void main(String[] args) throws Exception {
        YamlPipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
        PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkPipelineComposer flinkPipelineComposer = FlinkPipelineComposer.ofApplicationCluster((StreamExecutionEnvironment)env);
        PipelineExecution execution = flinkPipelineComposer.compose(pipelineDef);
        execution.execute();
    }

    @VisibleForTesting
    void setComposer(PipelineComposer composer) {
        this.composer = composer;
    }

    @VisibleForTesting
    public org.apache.flink.configuration.Configuration getFlinkConfig() {
        return this.flinkConfig;
    }

    @VisibleForTesting
    public Configuration getGlobalPipelineConfig() {
        return this.globalPipelineConfig;
    }

    @VisibleForTesting
    public List<Path> getAdditionalJars() {
        return this.additionalJars;
    }

    public String getDeploymentTarget() {
        return (String)this.flinkConfig.get(DeploymentOptions.TARGET);
    }
}

