/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.collect.Iterables;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedDataset<T>
implements Dataset {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class);
    @Nullable
    private transient JavaStreamingContext jssc;
    private Iterable<Iterable<T>> values;
    private Coder<T> coder;
    private JavaDStream<WindowedValue<T>> dStream;

    UnboundedDataset(JavaDStream<WindowedValue<T>> dStream) {
        this.dStream = dStream;
    }

    public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext jssc, Coder<T> coder) {
        this.values = values;
        this.jssc = jssc;
        this.coder = coder;
    }

    JavaDStream<WindowedValue<T>> getDStream() {
        if (this.dStream == null) {
            WindowedValue.ValueOnlyWindowedValueCoder windowCoder = WindowedValue.getValueOnlyCoder(this.coder);
            LinkedBlockingQueue<JavaRDD> rddQueue = new LinkedBlockingQueue<JavaRDD>();
            JavaRDD lastRDD = null;
            for (Iterable<T> v : this.values) {
                Iterable windowedValues = Iterables.transform(v, WindowingHelpers.windowValueFunction());
                JavaRDD rdd = this.jssc.sc().parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(CoderHelpers.fromByteFunction(windowCoder));
                rddQueue.offer(rdd);
                lastRDD = rdd;
            }
            this.dStream = lastRDD != null ? this.jssc.queueStream(rddQueue, true, lastRDD) : this.jssc.queueStream(rddQueue, true);
        }
        return this.dStream;
    }

    public void cache() {
        this.dStream.cache();
    }

    @Override
    public void cache(String storageLevel) {
        LOG.warn("Provided StorageLevel ignored for stream, using default level");
        this.cache();
    }

    @Override
    public void action() {
        this.dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>(){

            public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
                rdd.count();
            }
        });
    }

    @Override
    public void setName(String name) {
    }
}

