/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import java.io.IOException;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkRunnerStreamingContextFactory
implements Function0<JavaStreamingContext> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
    private final transient Pipeline pipeline;
    private final transient SparkPipelineOptions options;
    private final transient Checkpoint.CheckpointDir checkpointDir;

    public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options, Checkpoint.CheckpointDir checkpointDir) {
        this.pipeline = pipeline;
        this.options = options;
        this.checkpointDir = checkpointDir;
    }

    public JavaStreamingContext call() throws Exception {
        LOG.info("Creating a new Spark Streaming Context");
        Preconditions.checkArgument(this.options.getMinReadTimeMillis() < this.options.getBatchIntervalMillis(), "Minimum read time has to be less than batch time.");
        Preconditions.checkArgument(this.options.getReadTimePercentage() > 0.0 && this.options.getReadTimePercentage() < 1.0, "Read time percentage is bound to (0, 1).");
        StreamingTransformTranslator.Translator translator = new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
        Duration batchDuration = new Duration(this.options.getBatchIntervalMillis().longValue());
        LOG.info("Setting Spark streaming batchDuration to {} msec", (Object)batchDuration.milliseconds());
        JavaSparkContext jsc = SparkContextFactory.getSparkContext(this.options);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
        SparkRunner.initAccumulators(this.options, jsc);
        EvaluationContext ctxt = new EvaluationContext(jsc, this.pipeline, this.options, jssc);
        SparkRunner.updateCacheCandidates(this.pipeline, translator, ctxt);
        this.pipeline.traverseTopologically((Pipeline.PipelineVisitor)new SparkRunner.Evaluator(translator, ctxt));
        ctxt.computeOutputs();
        this.checkpoint(jssc, this.checkpointDir);
        return jssc;
    }

    private void checkpoint(JavaStreamingContext jssc, Checkpoint.CheckpointDir checkpointDir) {
        Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
        Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
        Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
        try {
            FileSystem fileSystem = rootCheckpointPath.getFileSystem(jssc.sparkContext().hadoopConfiguration());
            if (!fileSystem.exists(rootCheckpointPath)) {
                fileSystem.mkdirs(rootCheckpointPath);
            }
            if (!fileSystem.exists(sparkCheckpointPath)) {
                fileSystem.mkdirs(sparkCheckpointPath);
            }
            if (!fileSystem.exists(beamCheckpointPath)) {
                fileSystem.mkdirs(beamCheckpointPath);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create checkpoint dir", e);
        }
        jssc.checkpoint(sparkCheckpointPath.toString());
    }
}

