/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineRunner} that translates a {@link Pipeline} into Spark operations and
 * evaluates it, returning the evaluation context as an {@link EvaluationResult}.
 *
 * <p>Instances are obtained through {@link #create()}, {@link #create(SparkPipelineOptions)}
 * or {@link #fromOptions(PipelineOptions)}. When the options report {@code isStreaming()},
 * the pipeline is translated with the streaming translator and a Spark streaming context is
 * started; otherwise the pipeline is translated and its outputs are computed directly.
 */
public final class SparkPipelineRunner
extends PipelineRunner<EvaluationResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);

    /** Options this runner was created with; consulted on every {@link #run(Pipeline)}. */
    private final SparkPipelineOptions options;

    /**
     * Creates a runner backed by a fresh {@link SparkPipelineOptions} instance whose runner
     * is set to this class.
     *
     * @return a new runner
     */
    public static SparkPipelineRunner create() {
        SparkPipelineOptions defaults = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        defaults.setRunner(SparkPipelineRunner.class);
        return new SparkPipelineRunner(defaults);
    }

    /**
     * Creates a runner that uses the given options as-is (no validation is performed here).
     *
     * @param options the Spark options to run with
     * @return a new runner
     */
    public static SparkPipelineRunner create(SparkPipelineOptions options) {
        return new SparkPipelineRunner(options);
    }

    /**
     * Creates a runner from generic pipeline options, validating them against
     * {@link SparkPipelineOptions} first.
     *
     * @param options the pipeline options to validate and use
     * @return a new runner backed by the validated options
     */
    public static SparkPipelineRunner fromOptions(PipelineOptions options) {
        SparkPipelineOptions sparkOptions = PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
        return new SparkPipelineRunner(sparkOptions);
    }

    /**
     * Applies a transform, substituting runner-specific implementations for two cases:
     * a {@link GroupByKey} is replaced by {@link GroupByKeyViaGroupByKeyOnly} applied to the
     * input collection, and a {@link Create.Values} is wrapped in
     * {@link SinglePrimitiveOutputPTransform}. Every other transform is delegated unchanged
     * to the superclass.
     *
     * @param transform the transform being applied
     * @param input the input to the transform
     * @return the output produced by the (possibly substituted) transform
     */
    // The substitutions are built from the raw transform types, which cannot be expressed
    // with the method's type parameters, so raw/unchecked casts are unavoidable here.
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
        if (transform instanceof GroupByKey) {
            return (OutputT) ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
        }
        if (transform instanceof Create.Values) {
            return (OutputT) super.apply(new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
        }
        return super.apply(transform, input);
    }

    private SparkPipelineRunner(SparkPipelineOptions options) {
        this.options = options;
    }

    /**
     * Translates and evaluates the pipeline on Spark.
     *
     * <p>In streaming mode the pipeline must be windowed; the batch interval reported by the
     * window detector is used for the streaming context, which is started before this method
     * returns. In batch mode the outputs are computed before returning.
     *
     * @param pipeline the pipeline to run
     * @return the evaluation context used for the run
     * @throws RuntimeException for any failure. Every exception raised inside this method,
     *     including the configuration checks below, is wrapped; for a {@link SparkException}
     *     the wrapped cause is the underlying cause rather than the Spark exception itself.
     */
    @Override
    public EvaluationResult run(Pipeline pipeline) {
        try {
            // A streaming run needs the streaming-specific options (timeout etc.).
            if (options.isStreaming() && !(options instanceof SparkStreamingPipelineOptions)) {
                throw new RuntimeException("A streaming job must be configured with "
                    + SparkStreamingPipelineOptions.class.getSimpleName()
                    + ", found "
                    + options.getClass().getSimpleName());
            }
            LOG.info("Executing pipeline using the SparkPipelineRunner.");
            JavaSparkContext jsc = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName());

            if (options.isStreaming()) {
                StreamingTransformTranslator.Translator translator =
                    new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());

                // First pass over the pipeline: find out whether it is windowed and which
                // batch duration the detector derives from it.
                StreamingWindowPipelineDetector windowDetector = new StreamingWindowPipelineDetector(translator);
                pipeline.traverseTopologically(windowDetector);
                if (!windowDetector.isWindowing()) {
                    throw new IllegalStateException("Spark streaming pipeline must be windowed!");
                }
                Duration batchInterval = windowDetector.getBatchDuration();
                LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());

                // Second pass: translate the pipeline, then start the streaming context.
                EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
                pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
                ctxt.computeOutputs();
                LOG.info("Streaming pipeline construction complete. Starting execution..");
                ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
                return ctxt;
            }

            EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
            TransformTranslator.Translator translator = new TransformTranslator.Translator();
            pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
            ctxt.computeOutputs();
            LOG.info("Pipeline execution complete.");
            return ctxt;
        }
        catch (Exception e) {
            throw unwrapFailure(e);
        }
    }

    /**
     * Converts a failure raised during {@link #run(Pipeline)} into the RuntimeException that
     * is reported to the caller.
     *
     * <p>A {@link SparkException} with a cause is replaced by that cause; if the cause is a
     * {@link SparkProcessContext.SparkProcessException} that itself has a cause, the inner
     * cause is used instead. Anything else is wrapped as-is.
     */
    // NOTE(review): SparkException is matched with instanceof instead of a dedicated catch
    // clause, presumably because the Spark API does not declare it as a checked exception
    // on the methods called above. Confirm before restructuring.
    private static RuntimeException unwrapFailure(Exception e) {
        Throwable cause = e.getCause();
        if (e instanceof SparkException && cause != null) {
            if (cause instanceof SparkProcessContext.SparkProcessException && cause.getCause() != null) {
                return new RuntimeException(cause.getCause());
            }
            return new RuntimeException(cause);
        }
        return new RuntimeException(e);
    }

    /**
     * Builds the evaluation context for a streaming run: a new {@link JavaStreamingContext}
     * with the given batch duration, plus the timeout taken from the streaming options.
     * Callers must have verified that the options are {@link SparkStreamingPipelineOptions}.
     */
    private EvaluationContext createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, Duration batchDuration) {
        SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) options;
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
        return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
    }

    /**
     * Base pipeline visitor for translator-driven evaluation.
     *
     * <p>A composite transform for which the translator has a translation is handed to
     * {@link #doVisitTransform(TransformTreeNode)} as a unit and is not entered; any other
     * composite is entered. Primitive transforms are always handed to
     * {@link #doVisitTransform(TransformTreeNode)}.
     */
    public abstract static class Evaluator
    extends Pipeline.PipelineVisitor.Defaults {
        protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);

        /** Translator consulted to decide whether a composite can be translated directly. */
        protected final SparkPipelineTranslator translator;

        protected Evaluator(SparkPipelineTranslator translator) {
            this.translator = translator;
        }

        /**
         * Decides whether to descend into a composite transform.
         *
         * @param node the composite node being entered
         * @return {@code DO_NOT_ENTER_TRANSFORM} if the node's transform has a direct
         *     translation (in which case it has already been visited), otherwise
         *     {@code ENTER_TRANSFORM}
         */
        @Override
        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
            // Nodes without a transform have nothing to translate, so they are just entered.
            if (node.getTransform() != null) {
                @SuppressWarnings("unchecked")
                Class<PTransform<?, ?>> transformClass = (Class<PTransform<?, ?>>) node.getTransform().getClass();
                if (translator.hasTranslation(transformClass)) {
                    LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
                    LOG.debug("Composite transform class: '{}'", transformClass);
                    doVisitTransform(node);
                    return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
                }
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        /** Hands every primitive transform to {@link #doVisitTransform(TransformTreeNode)}. */
        @Override
        public void visitPrimitiveTransform(TransformTreeNode node) {
            doVisitTransform(node);
        }

        /**
         * Processes a single transform node; implemented by concrete evaluators.
         *
         * @param node the transform node to process
         */
        protected abstract <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode node);
    }
}

