/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import java.io.OutputStream;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkTranslationContext;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkPipelineRunner
implements PortablePipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
    private final SparkPipelineOptions pipelineOptions;

    public SparkPipelineRunner(SparkPipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
        RunnerApi.Pipeline fusedPipeline;
        SparkBatchPortablePipelineTranslator translator = new SparkBatchPortablePipelineTranslator();
        RunnerApi.Pipeline pipelineWithSdfExpanded = ProtoOverrides.updateTransform((String)"beam:transform:pardo:v1", (RunnerApi.Pipeline)pipeline, (ProtoOverrides.TransformReplacement)SplittableParDoExpander.createSizedReplacement());
        RunnerApi.Pipeline trimmedPipeline = PipelineTrimmer.trim((RunnerApi.Pipeline)pipelineWithSdfExpanded, translator.knownUrns());
        RunnerApi.Pipeline pipeline2 = fusedPipeline = trimmedPipeline.getComponents().getTransformsMap().values().stream().anyMatch(proto -> "beam:runner:executable_stage:v1".equals(proto.getSpec().getUrn())) ? trimmedPipeline : GreedyPipelineFuser.fuse((RunnerApi.Pipeline)trimmedPipeline).toPipeline();
        if (this.pipelineOptions.getFilesToStage() == null) {
            this.pipelineOptions.setFilesToStage(PipelineResources.detectClassPathResourcesToStage((ClassLoader)SparkPipelineRunner.class.getClassLoader(), (PipelineOptions)this.pipelineOptions));
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath");
        }
        SparkPipelineOptions.prepareFilesToStage(this.pipelineOptions);
        LOG.info("Will stage {} files. (Enable logging at DEBUG level to see which files will be staged.)", (Object)this.pipelineOptions.getFilesToStage().size());
        LOG.debug("Staging files: {}", this.pipelineOptions.getFilesToStage());
        JavaSparkContext jsc = SparkContextFactory.getSparkContext(this.pipelineOptions);
        LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
        AggregatorsAccumulator.init(this.pipelineOptions, jsc);
        MetricsEnvironment.setMetricsSupported((boolean)false);
        MetricsAccumulator.init(this.pipelineOptions, jsc);
        SparkTranslationContext context = new SparkTranslationContext(jsc, this.pipelineOptions, jobInfo);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> submissionFuture = executorService.submit(() -> {
            translator.translate(fusedPipeline, context);
            LOG.info(String.format("Job %s: Pipeline translated successfully. Computing outputs", jobInfo.jobId()));
            context.computeOutputs();
            LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
        });
        SparkPipelineResult.PortableBatchMode result = new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
        MetricsPusher metricsPusher = new MetricsPusher(MetricsAccumulator.getInstance().value(), (MetricsOptions)this.pipelineOptions.as(MetricsOptions.class), (PipelineResult)result);
        metricsPusher.start();
        result.waitUntilFinish();
        executorService.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        FileSystems.setDefaultPipelineOptions((PipelineOptions)PipelineOptionsFactory.create());
        SparkPipelineRunnerConfiguration configuration = SparkPipelineRunner.parseArgs(args);
        String baseJobName = configuration.baseJobName == null ? PortablePipelineJarUtils.getDefaultJobName() : configuration.baseJobName;
        Preconditions.checkArgument((baseJobName != null ? 1 : 0) != 0, (Object)"No default job name found. Job name must be set using --base-job-name.");
        RunnerApi.Pipeline pipeline = PortablePipelineJarUtils.getPipelineFromClasspath((String)baseJobName);
        Struct originalOptions = PortablePipelineJarUtils.getPipelineOptionsFromClasspath((String)baseJobName);
        PortablePipelineOptions portablePipelineOptions = (PortablePipelineOptions)PipelineOptionsTranslation.fromProto((Struct)originalOptions).as(PortablePipelineOptions.class);
        portablePipelineOptions.setRetrievalServiceType(PortablePipelineOptions.RetrievalServiceType.CLASSLOADER);
        String retrievalToken = PortablePipelineJarUtils.getArtifactManifestUri((String)baseJobName);
        SparkPipelineOptions sparkOptions = (SparkPipelineOptions)portablePipelineOptions.as(SparkPipelineOptions.class);
        String invocationId = String.format("%s_%s", sparkOptions.getJobName(), UUID.randomUUID().toString());
        if (sparkOptions.getAppName() == null) {
            LOG.debug("App name was null. Using invocationId {}", (Object)invocationId);
            sparkOptions.setAppName(invocationId);
        }
        SparkPipelineRunner runner = new SparkPipelineRunner(sparkOptions);
        JobInfo jobInfo = JobInfo.create((String)invocationId, (String)sparkOptions.getJobName(), (String)retrievalToken, (Struct)PipelineOptionsTranslation.toProto((PipelineOptions)sparkOptions));
        try {
            runner.run(pipeline, jobInfo);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Job %s failed.", invocationId), e);
        }
        LOG.info("Job {} finished successfully.", (Object)invocationId);
    }

    private static SparkPipelineRunnerConfiguration parseArgs(String[] args) {
        SparkPipelineRunnerConfiguration configuration = new SparkPipelineRunnerConfiguration();
        CmdLineParser parser = new CmdLineParser((Object)configuration);
        try {
            parser.parseArgument(args);
        }
        catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", (Throwable)e);
            parser.printUsage((OutputStream)System.err);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
        return configuration;
    }

    private static class SparkPipelineRunnerConfiguration {
        @Option(name="--base-job-name", usage="The job to run. This must correspond to a subdirectory of the jar's BEAM-PIPELINE directory. *Only needs to be specified if the jar contains multiple pipelines.*")
        private String baseJobName = null;

        private SparkPipelineRunnerConfiguration() {
        }
    }
}

