/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark;

import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.metrics.source.Source;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SparkRunner
extends PipelineRunner<SparkPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
    private final SparkPipelineOptions mOptions;

    /**
     * Creates a runner backed by a freshly built {@link SparkPipelineOptions} instance whose
     * runner is set to {@code SparkRunner}.
     *
     * @return a new {@code SparkRunner} using those options
     */
    public static SparkRunner create() {
        SparkPipelineOptions defaults = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        defaults.setRunner(SparkRunner.class);
        // Same construction path as the options-taking factory.
        return create(defaults);
    }

    /**
     * Creates a runner that uses the supplied options as-is (no validation is performed here).
     *
     * @param options the Spark pipeline options the runner will hold
     * @return a new {@code SparkRunner} wrapping {@code options}
     */
    public static SparkRunner create(SparkPipelineOptions options) {
        final SparkRunner runner = new SparkRunner(options);
        return runner;
    }

    /**
     * Creates a runner from generic pipeline options.
     *
     * <p>The options are passed through {@link PipelineOptionsValidator#validate} against
     * {@link SparkPipelineOptions}, and the validated view is handed to the runner.
     *
     * @param options the pipeline options to validate and use
     * @return a new {@code SparkRunner} holding the validated Spark options
     */
    public static SparkRunner fromOptions(PipelineOptions options) {
        return new SparkRunner(
            PipelineOptionsValidator.validate(SparkPipelineOptions.class, options));
    }

    /**
     * Private constructor; instances are obtained through the static factory methods.
     *
     * @param options the options this runner reads (and may update) when running a pipeline
     */
    private SparkRunner(SparkPipelineOptions options) {
        mOptions = options;
    }

    /**
     * Looks up the named-aggregators accumulator for the given context and, when metric sinks
     * are enabled in the options, (re-)registers an {@link AggregatorMetricSource} for it with
     * Spark's metrics system.
     *
     * @param opts options supplying the metric-sink switch and the application name
     * @param jsc the Spark context whose aggregators accumulator is read
     */
    private void registerMetrics(SparkPipelineOptions opts, JavaSparkContext jsc) {
        // The accumulator is fetched and read unconditionally, even if sinks turn out to be disabled.
        Accumulator<NamedAggregators> aggregators = SparkAggregators.getNamedAggregators(jsc);
        NamedAggregators currentValue = (NamedAggregators)aggregators.value();

        if (!opts.getEnableSparkMetricSinks()) {
            return;
        }

        AggregatorMetricSource source = new AggregatorMetricSource(opts.getAppName(), currentValue);
        MetricsSystem metrics = SparkEnv$.MODULE$.get().metricsSystem();
        // Remove before registering; presumably guards against a duplicate registration
        // when the same source was registered earlier - TODO confirm.
        metrics.removeSource((Source)source);
        metrics.registerSource((Source)source);
    }

    /**
     * Runs the given pipeline on Spark and returns a handle to the execution.
     *
     * <p>The pipeline is first inspected by {@code detectTranslationMode}, which may switch the
     * options to streaming. Depending on {@code mOptions.isStreaming()} the method then either
     * obtains a {@link JavaStreamingContext} (via {@code getOrCreate} with the configured
     * checkpoint directory) or a {@link JavaSparkContext}, and submits the start-up work to a
     * single-thread executor. The returned result wraps the {@link Future} of that task.
     *
     * <p>The executor is shut down as soon as the task has been submitted. An orderly shutdown
     * still runs the already-submitted task, but lets the worker thread terminate afterwards
     * instead of lingering for the lifetime of the JVM.
     *
     * @param pipeline the pipeline to execute
     * @return a streaming-mode or batch-mode {@link SparkPipelineResult}
     */
    public SparkPipelineResult run(final Pipeline pipeline) {
        LOG.info("Executing pipeline using the SparkRunner.");
        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            this.detectTranslationMode(pipeline);
            if (this.mOptions.isStreaming()) {
                SparkRunnerStreamingContextFactory contextFactory = new SparkRunnerStreamingContextFactory(pipeline, this.mOptions);
                final JavaStreamingContext jssc = JavaStreamingContext.getOrCreate((String)this.mOptions.getCheckpointDir(), (JavaStreamingContextFactory)contextFactory);
                Future<?> startPipeline = executorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        SparkRunner.this.registerMetrics(SparkRunner.this.mOptions, jssc.sparkContext());
                        LOG.info("Starting streaming pipeline execution.");
                        jssc.start();
                    }
                });
                return new SparkPipelineResult.StreamingMode(startPipeline, jssc);
            }

            final JavaSparkContext jsc = SparkContextFactory.getSparkContext(this.mOptions);
            final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
            Future<?> startPipeline = executorService.submit(new Runnable(){

                @Override
                public void run() {
                    SparkRunner.this.registerMetrics(SparkRunner.this.mOptions, jsc);
                    pipeline.traverseTopologically((Pipeline.PipelineVisitor)new Evaluator(new TransformTranslator.Translator(), evaluationContext));
                    evaluationContext.computeOutputs();
                    LOG.info("Batch pipeline execution complete.");
                }
            });
            return new SparkPipelineResult.BatchMode(startPipeline, jsc);
        } finally {
            // No further tasks will be submitted; shutdown() does not cancel the one already
            // queued, it only allows the executor's thread to exit once that task is done.
            executorService.shutdown();
        }
    }

    /**
     * Traverses the pipeline with a fresh {@code TranslationModeDetector} and, if the detector
     * ends up in streaming mode, turns on streaming in this runner's options. The options are
     * left untouched otherwise.
     *
     * @param pipeline the pipeline to inspect
     */
    private void detectTranslationMode(Pipeline pipeline) {
        TranslationModeDetector modeDetector = new TranslationModeDetector();
        pipeline.traverseTopologically((Pipeline.PipelineVisitor)modeDetector);

        boolean streamingDetected = TranslationMode.STREAMING == modeDetector.getTranslationMode();
        if (!streamingDetected) {
            return;
        }
        this.mOptions.setStreaming(true);
    }

    /**
     * Pipeline visitor that evaluates each visited transform through a
     * {@link SparkPipelineTranslator}, using a shared {@link EvaluationContext}.
     *
     * <p>Composite transforms for which the translator has a translation are evaluated as a
     * whole (unless deferred, see {@code shouldDefer}); all other composites are entered so
     * their children get visited. Primitive transforms are always evaluated.
     */
    public static class Evaluator
    extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
        private final EvaluationContext ctxt;
        private final SparkPipelineTranslator translator;

        /**
         * @param translator supplies the evaluators for transform classes
         * @param ctxt the evaluation context passed to every evaluator
         */
        public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
            this.translator = translator;
            this.ctxt = ctxt;
        }

        /**
         * Evaluates the composite directly when it has a transform, the translator knows its
         * class and it is not deferred; otherwise asks the traversal to descend into it.
         *
         * @param node the composite node being entered
         * @return {@code DO_NOT_ENTER_TRANSFORM} if the composite was evaluated here,
         *     {@code ENTER_TRANSFORM} otherwise
         */
        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            if (node.getTransform() != null) {
                // getClass() only yields the erased class; the double cast restores the
                // parameterized view the translator is queried with.
                @SuppressWarnings("unchecked")
                Class<PTransform<?, ?>> transformClass =
                    (Class<PTransform<?, ?>>) (Class<?>) node.getTransform().getClass();
                if (this.translator.hasTranslation(transformClass) && !this.shouldDefer(node)) {
                    LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
                    LOG.debug("Composite transform class: '{}'", transformClass);
                    this.doVisitTransform(node);
                    return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
                }
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        /**
         * Decides whether direct translation of a composite should be skipped.
         *
         * <p>Returns {@code true} only when all of the following hold: the node has exactly one
         * input; that input is a {@link PCollection} whose window function is not non-merging;
         * and the transform is a {@code Combine.PerKey} or {@code Combine.Globally} with at
         * least one side input.
         */
        private boolean shouldDefer(TransformHierarchy.Node node) {
            if (node.getInputs().size() != 1) {
                return false;
            }
            PValue input = Iterables.getOnlyElement(node.getInputs());
            if (!(input instanceof PCollection)
                || ((PCollection<?>) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
                return false;
            }
            PTransform<?, ?> transform = node.getTransform();
            List<?> sideInputs = null;
            if (transform instanceof Combine.PerKey) {
                sideInputs = ((Combine.PerKey<?, ?, ?>) transform).getSideInputs();
            } else if (transform instanceof Combine.Globally) {
                sideInputs = ((Combine.Globally<?, ?>) transform).getSideInputs();
            }
            boolean hasSideInput = sideInputs != null && !sideInputs.isEmpty();
            if (hasSideInput) {
                LOG.info("Deferring combine transformation {} for job {}", transform, this.ctxt.getPipeline().getOptions().getJobName());
                return true;
            }
            return false;
        }

        /** Evaluates the primitive transform held by {@code node}. */
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            this.doVisitTransform(node);
        }

        /**
         * Looks up the evaluator for the node's transform and runs it against the context.
         *
         * <p>The node's applied transform is published as the context's current transform for
         * the duration of the evaluation and cleared again afterwards, also when the evaluator
         * throws, so a failure does not leave a stale current transform behind.
         *
         * @param node the node whose transform is evaluated
         */
        <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformHierarchy.Node node) {
            @SuppressWarnings("unchecked")
            TransformT transform = (TransformT) node.getTransform();
            @SuppressWarnings("unchecked")
            Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
            TransformEvaluator<TransformT> evaluator = this.translate(node, transform, transformClass);
            LOG.info("Evaluating {}", transform);
            AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
            this.ctxt.setCurrentTransform(appliedTransform);
            try {
                evaluator.evaluate(transform, this.ctxt);
            } finally {
                this.ctxt.setCurrentTransform(null);
            }
        }

        /**
         * Picks the bounded or unbounded evaluator for a transform class.
         *
         * <p>Boundedness is derived from the node's inputs, or from its outputs when the node
         * has no inputs.
         *
         * @return the translator's bounded evaluator if every examined value is bounded,
         *     otherwise its unbounded evaluator
         */
        private <TransformT extends PTransform<? super PInput, POutput>> TransformEvaluator<TransformT> translate(TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
            Collection<? extends PValue> pValues = node.getInputs().isEmpty() ? node.getOutputs() : node.getInputs();
            PCollection.IsBounded isNodeBounded = this.isBoundedCollection(pValues);
            LOG.debug("Translating {} as {}", transform, isNodeBounded);
            if (isNodeBounded == PCollection.IsBounded.BOUNDED) {
                return this.translator.translateBounded(transformClass);
            }
            return this.translator.translateUnbounded(transformClass);
        }

        /**
         * Combines the boundedness of all given values with {@code IsBounded.and}, starting from
         * {@code BOUNDED}. Values that are not {@link PCollection}s contribute {@code BOUNDED}.
         */
        private PCollection.IsBounded isBoundedCollection(Collection<? extends PValue> pValues) {
            PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
            for (PValue pValue : pValues) {
                PCollection.IsBounded valueBounded = pValue instanceof PCollection
                    ? ((PCollection<?>) pValue).isBounded()
                    : PCollection.IsBounded.BOUNDED;
                isBounded = isBounded.and(valueBounded);
            }
            return isBounded;
        }
    }

    /**
     * Pipeline visitor that tracks a {@code TranslationMode}: it starts from a default mode and
     * flips from {@code BATCH} to {@code STREAMING} on the first primitive transform whose class
     * is exactly {@code Read.Unbounded}.
     */
    static class TranslationModeDetector
    extends Pipeline.PipelineVisitor.Defaults {
        private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class);
        private TranslationMode translationMode;

        /** Creates a detector that starts in {@code BATCH} mode. */
        TranslationModeDetector() {
            this(TranslationMode.BATCH);
        }

        /**
         * @param defaultMode the mode reported until a visit changes it
         */
        TranslationModeDetector(TranslationMode defaultMode) {
            this.translationMode = defaultMode;
        }

        /** Returns the mode as of the transforms visited so far. */
        TranslationMode getTranslationMode() {
            return this.translationMode;
        }

        /**
         * Switches to {@code STREAMING} if the detector is still in {@code BATCH} mode and the
         * node's transform class is {@code Read.Unbounded}; does nothing otherwise.
         */
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            // Once out of batch mode the node is not inspected at all.
            if (!this.translationMode.equals((Object)TranslationMode.BATCH)) {
                return;
            }
            Class<?> primitiveClass = node.getTransform().getClass();
            if (primitiveClass != Read.Unbounded.class) {
                return;
            }
            LOG.info("Found {}. Switching to streaming execution.", primitiveClass);
            this.translationMode = TranslationMode.STREAMING;
        }
    }

    /** The two ways a pipeline can be translated for execution. */
    enum TranslationMode {
        /** Batch translation; the mode a default-constructed detector starts in. */
        BATCH,
        /** Streaming translation; selected when an unbounded read is found. */
        STREAMING
    }
}

