/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingEvaluationContext
extends EvaluationContext {
    private final JavaStreamingContext jssc;
    private final long timeout;
    private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap();
    private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet();
    private PipelineResult.State state = PipelineResult.State.RUNNING;

    public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, JavaStreamingContext jssc, long timeout) {
        super(jsc, pipeline);
        this.jssc = jssc;
        this.timeout = timeout;
    }

    <T> void setDStreamFromQueue(PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
        this.pstreams.put((PValue)this.getOutput((PTransform<?, OutputT>)((PTransform)transform)), new DStreamHolder<T>(values, coder));
    }

    <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
        PValue pvalue = (PValue)this.getOutput((PTransform<?, OutputT>)((PTransform)transform));
        DStreamHolder<T> dStreamHolder = new DStreamHolder<T>(dStream);
        this.pstreams.put(pvalue, dStreamHolder);
        this.leafStreams.add(dStreamHolder);
    }

    boolean hasStream(PTransform<?, ?> transform) {
        PValue pvalue = (PValue)this.getInput((PTransform<InputT, ?>)((PTransform)transform));
        return this.pstreams.containsKey(pvalue);
    }

    JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
        return this.getStream((PValue)this.getInput((PTransform<InputT, ?>)((PTransform)transform)));
    }

    JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
        DStreamHolder<?> dStreamHolder = this.pstreams.get(pvalue);
        JavaDStream<WindowedValue<?>> dStream = dStreamHolder.getDStream();
        this.leafStreams.remove(dStreamHolder);
        return dStream;
    }

    <T> void setInputRDD(PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
        this.setRDD((PValue)this.getInput((PTransform<InputT, ?>)((PTransform)transform)), rdd);
    }

    JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
        return this.getRDD((PValue)this.getOutput((PTransform<?, OutputT>)((PTransform)transform)));
    }

    public JavaStreamingContext getStreamingContext() {
        return this.jssc;
    }

    @Override
    public void computeOutputs() {
        for (DStreamHolder<?> streamHolder : this.leafStreams) {
            StreamingEvaluationContext.computeOutput(streamHolder);
        }
    }

    private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
        streamHolder.getDStream().foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>(){

            public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
                rdd.rdd().cache();
                rdd.count();
            }
        });
    }

    @Override
    public void close() {
        if (this.timeout > 0L) {
            this.jssc.awaitTerminationOrTimeout(this.timeout);
        } else {
            this.jssc.awaitTermination();
        }
        this.jssc.stop(false, false);
        this.state = PipelineResult.State.DONE;
        super.close();
    }

    @Override
    public PipelineResult.State getState() {
        return this.state;
    }

    protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
        return super.getInput(transform);
    }

    protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
        return super.getOutput(transform);
    }

    @Override
    protected JavaSparkContext getSparkContext() {
        return super.getSparkContext();
    }

    @Override
    protected SparkRuntimeContext getRuntimeContext() {
        return super.getRuntimeContext();
    }

    @Override
    protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
        super.setCurrentTransform(transform);
    }

    @Override
    protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
        return super.getCurrentTransform();
    }

    @Override
    protected <T> void setOutputRDD(PTransform<?, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
        super.setOutputRDD(transform, rdd);
    }

    @Override
    protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values, Coder<T> coder) {
        super.setOutputRDDFromValues(transform, values, coder);
    }

    @Override
    protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
        return super.hasOutputRDD(transform);
    }

    private class DStreamHolder<T> {
        private Iterable<Iterable<T>> values;
        private Coder<T> coder;
        private JavaDStream<WindowedValue<T>> dStream;

        DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
            this.values = values;
            this.coder = coder;
        }

        DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
            this.dStream = dStream;
        }

        JavaDStream<WindowedValue<T>> getDStream() {
            if (this.dStream == null) {
                LinkedBlockingQueue<JavaRDD> rddQueue = new LinkedBlockingQueue<JavaRDD>();
                for (Iterable<T> v : this.values) {
                    StreamingEvaluationContext.this.setOutputRDDFromValues(StreamingEvaluationContext.this.currentTransform.getTransform(), v, this.coder);
                    rddQueue.offer((JavaRDD)StreamingEvaluationContext.this.getOutputRDD(StreamingEvaluationContext.this.currentTransform.getTransform()));
                }
                this.dStream = StreamingEvaluationContext.this.jssc.queueStream(rddQueue, true);
            }
            return this.dStream;
        }
    }
}

