/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.cli;

import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.flink.cdc.cli.CliExecutor;
import org.apache.flink.cdc.cli.CliFrontendOptions;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point of the Flink CDC CLI.
 *
 * <p>Parses the command line, resolves the pipeline definition file, the global pipeline
 * configuration, the Flink home directory and the Flink configuration (including dynamic
 * overrides and savepoint restore settings), then builds a {@link CliExecutor}, runs it and
 * prints the resulting execution info.
 */
public class CliFrontend {
    private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
    private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME";
    private static final String FLINK_CDC_HOME_ENV_VAR = "FLINK_CDC_HOME";

    /**
     * Parses {@code args}, prints the help text when no arguments or the help option is given,
     * and otherwise creates and runs the executor.
     *
     * @param args raw command-line arguments
     * @throws Exception if parsing the arguments, building the executor or running it fails
     */
    public static void main(String[] args) throws Exception {
        Options cliOptions = CliFrontendOptions.initializeOptions();
        CommandLine commandLine = new DefaultParser().parse(cliOptions, args);

        // Help wins over everything else: print usage and exit without running a pipeline.
        if (args.length == 0 || commandLine.hasOption(CliFrontendOptions.HELP)) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.setLeftPadding(4);
            formatter.setWidth(80);
            formatter.printHelp(" ", cliOptions);
            return;
        }

        PipelineExecution.ExecutionInfo result = createExecutor(commandLine).run();
        printExecutionInfo(result);
    }

    /**
     * Builds a {@link CliExecutor} from an already parsed command line.
     *
     * @param commandLine parsed command line; its first positional argument is used as the
     *     pipeline definition file path
     * @return the executor configured with the pipeline path, Flink configuration, global
     *     pipeline configuration, additional jars and Flink home
     * @throws IllegalArgumentException if no positional argument is present or Flink home cannot
     *     be determined
     * @throws Exception if loading any of the configuration files fails
     */
    @VisibleForTesting
    static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
        List<String> unparsedArgs = commandLine.getArgList();
        if (unparsedArgs.isEmpty()) {
            throw new IllegalArgumentException("Missing pipeline definition file path in arguments. ");
        }

        // The first positional (non-option) argument is the pipeline definition file.
        Path pipelineDefPath = new Path(unparsedArgs.get(0));
        LOG.info("Real Path pipelineDefPath {}", pipelineDefPath);

        org.apache.flink.cdc.common.configuration.Configuration globalPipelineConfig = getGlobalConfig(commandLine);

        // Load the Flink configuration from Flink home, then layer command-line overrides on top.
        Path flinkHome = getFlinkHome(commandLine);
        org.apache.flink.cdc.common.configuration.Configuration configuration = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
        overrideFlinkConfiguration(configuration, commandLine);
        Configuration flinkConfig = Configuration.fromMap(configuration.toMap());

        // Savepoint restore settings are written into the Flink configuration itself.
        SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine);
        SavepointRestoreSettings.toConfiguration(savepointSettings, flinkConfig);

        // The jar option may be absent, in which case getOptionValues yields null.
        String[] jarValues = Optional.ofNullable(commandLine.getOptionValues(CliFrontendOptions.JAR)).orElse(new String[0]);
        List<Path> additionalJars = Arrays.stream(jarValues).map(Path::new).collect(Collectors.toList());

        return new CliExecutor(commandLine, pipelineDefPath, flinkConfig, globalPipelineConfig, additionalJars, flinkHome);
    }

    /**
     * Applies command-line driven overrides to the loaded Flink configuration: the deployment
     * target and every dynamic property passed through the Flink config option.
     *
     * @param flinkConfig configuration to mutate in place
     * @param commandLine parsed command line
     * @throws IllegalArgumentException if a dynamic property has a blank key or value
     */
    private static void overrideFlinkConfiguration(org.apache.flink.cdc.common.configuration.Configuration flinkConfig, CommandLine commandLine) {
        // The mini-cluster flag forces the local target; otherwise use the explicit target
        // option and fall back to the remote deployment.
        String target;
        if (commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER)) {
            target = ComposeDeployment.LOCAL.getName();
        } else {
            target = commandLine.getOptionValue(CliFrontendOptions.TARGET, ComposeDeployment.REMOTE.getName());
        }
        flinkConfig.set(ConfigOptions.key(DeploymentOptions.TARGET.key()).stringType().defaultValue(target), target);

        Properties properties = commandLine.getOptionProperties(CliFrontendOptions.FLINK_CONFIG.getOpt());
        LOG.info("Dynamic flink config items found: {}", properties);
        for (String key : properties.stringPropertyNames()) {
            String value = properties.getProperty(key);
            if (StringUtils.isNullOrWhitespaceOnly(key) || StringUtils.isNullOrWhitespaceOnly(value)) {
                throw new IllegalArgumentException(String.format("null or white space argument for key or value: %s=%s", key, value));
            }
            String trimmedValue = value.trim();
            ConfigOption<String> configOption = ConfigOptions.key(key.trim()).stringType().defaultValue(trimmedValue);
            flinkConfig.set(configOption, trimmedValue);
        }
    }

    /**
     * Derives the savepoint restore settings from the command line.
     *
     * @param commandLine parsed command line
     * @return settings for the given savepoint path, or {@link SavepointRestoreSettings#none()}
     *     when no savepoint path option is present
     * @throws RuntimeException if the three-argument {@code forPath} method cannot be found or
     *     invoked
     */
    private static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
        if (!commandLine.hasOption(CliFrontendOptions.SAVEPOINT_PATH_OPTION.getOpt())) {
            return SavepointRestoreSettings.none();
        }

        String savepointPath = commandLine.getOptionValue(CliFrontendOptions.SAVEPOINT_PATH_OPTION.getOpt());
        boolean allowNonRestoredState = commandLine.hasOption(CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());

        // Kept as Object: the concrete claim-mode class is only known through
        // ConfigurationUtils.getClaimModeClass().
        final Object restoreMode;
        if (commandLine.hasOption(CliFrontendOptions.SAVEPOINT_CLAIM_MODE)) {
            restoreMode = org.apache.flink.configuration.ConfigurationUtils.convertValue(
                    commandLine.getOptionValue(CliFrontendOptions.SAVEPOINT_CLAIM_MODE),
                    ConfigurationUtils.getClaimModeClass());
        } else {
            restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
        }

        // forPath is resolved reflectively by name and arity rather than called directly.
        // NOTE(review): presumably this keeps the code compatible with Flink versions whose
        // claim-mode parameter type differs - confirm.
        return Arrays.stream(SavepointRestoreSettings.class.getMethods())
                .filter(method -> method.getName().equals("forPath") && method.getParameterCount() == 3)
                .findFirst()
                .map(method -> {
                    try {
                        return (SavepointRestoreSettings) method.invoke(null, savepointPath, allowNonRestoredState, restoreMode);
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        throw new RuntimeException("Failed to invoke SavepointRestoreSettings#forPath method.", e);
                    }
                })
                .orElseThrow(() -> new RuntimeException("Failed to resolve SavepointRestoreSettings#forPath method."));
    }

    /**
     * Resolves the Flink home directory, preferring the command-line option over the
     * {@code FLINK_HOME} environment variable.
     *
     * @param commandLine parsed command line
     * @return path to Flink home
     * @throws IllegalArgumentException if neither source provides a value
     */
    private static Path getFlinkHome(CommandLine commandLine) {
        String flinkHomeFromArgs = commandLine.getOptionValue(CliFrontendOptions.FLINK_HOME);
        if (flinkHomeFromArgs != null) {
            LOG.debug("Flink home is loaded by command-line argument: {}", flinkHomeFromArgs);
            return new Path(flinkHomeFromArgs);
        }

        String flinkHomeFromEnvVar = System.getenv(FLINK_HOME_ENV_VAR);
        if (flinkHomeFromEnvVar != null) {
            LOG.debug("Flink home is loaded by environment variable: {}", flinkHomeFromEnvVar);
            return new Path(flinkHomeFromEnvVar);
        }

        throw new IllegalArgumentException("Cannot find Flink home from either command line arguments \"--flink-home\" or the environment variable \"FLINK_HOME\". Please make sure Flink home is properly set. ");
    }

    /**
     * Loads the global pipeline configuration.
     *
     * <p>Lookup order: the global config command-line option, then {@code conf/flink-cdc.yaml}
     * under the {@code FLINK_CDC_HOME} environment variable, and finally an empty configuration.
     *
     * @param commandLine parsed command line
     * @return the loaded configuration, or an empty one when no source is available
     * @throws Exception if loading the configuration file fails
     */
    private static org.apache.flink.cdc.common.configuration.Configuration getGlobalConfig(CommandLine commandLine) throws Exception {
        String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG);
        if (globalConfig != null) {
            Path globalConfigPath = new Path(globalConfig);
            LOG.info("Using global config in command line: {}", globalConfigPath);
            return ConfigurationUtils.loadConfigFile(globalConfigPath);
        }

        String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR);
        if (flinkCdcHome != null) {
            Path globalConfigPath = new Path(flinkCdcHome, Joiner.on(File.separator).join("conf", "flink-cdc.yaml"));
            LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath);
            return ConfigurationUtils.loadConfigFile(globalConfigPath);
        }

        LOG.warn("Cannot find global configuration in command-line or FLINK_CDC_HOME. Will use empty global configuration.");
        return new org.apache.flink.cdc.common.configuration.Configuration();
    }

    /**
     * Prints a submission notice followed by the job id and description to standard output.
     *
     * @param info execution info returned by the executor
     */
    private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {
        System.out.println("Pipeline has been submitted to cluster.");
        System.out.printf("Job ID: %s\n", info.getId());
        System.out.printf("Job Description: %s\n", info.getDescription());
    }
}

