/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.api.client.util.Sets;
import com.google.common.reflect.TypeToken;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.KafkaIO;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AssignWindowsDoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public final class StreamingTransformTranslator {
    private static final TransformTranslator.FieldGetter WINDOW_FG = new TransformTranslator.FieldGetter(Window.Bound.class);
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();
    private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS;

    private StreamingTransformTranslator() {
    }

    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>(){

            @Override
            public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
                JavaDStreamLike<?, ?, ?> dstream = ((StreamingEvaluationContext)context).getStream(transform);
                dstream.map(WindowingHelpers.unwindowFunction()).print(transform.getNum());
            }
        };
    }

    private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
        return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>(){

            @Override
            public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                JavaStreamingContext jssc = sec.getStreamingContext();
                Class keyClazz = transform.getKeyClass();
                Class valueClazz = transform.getValueClass();
                Class keyDecoderClazz = transform.getKeyDecoderClass();
                Class valueDecoderClazz = transform.getValueDecoderClass();
                Map<String, String> kafkaParams = transform.getKafkaParams();
                Set<String> topics = transform.getTopics();
                JavaPairInputDStream inputPairStream = KafkaUtils.createDirectStream((JavaStreamingContext)jssc, keyClazz, valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
                JavaDStream inputStream = inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>(){

                    public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
                        return KV.of((Object)t2._1(), (Object)t2._2());
                    }
                }).map(WindowingHelpers.windowFunction());
                sec.setStream(transform, inputStream);
            }
        };
    }

    private static <T> TransformEvaluator<Create.Values<T>> create() {
        return new TransformEvaluator<Create.Values<T>>(){

            @Override
            public void evaluate(Create.Values<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                Iterable elems = transform.getElements();
                Coder coder = ((PCollection)sec.getOutput(transform)).getCoder();
                sec.setDStreamFromQueue((PTransform<?, ?>)transform, Collections.singletonList(elems), coder);
            }
        };
    }

    private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream.QueuedValues<T>>(){

            @Override
            public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                Iterable values = transform.getQueuedValues();
                Coder coder = ((PCollection)sec.getOutput(transform)).getCoder();
                sec.setDStreamFromQueue(transform, values, coder);
            }
        };
    }

    private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>(){

            @Override
            public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                PCollectionList pcs = (PCollectionList)sec.getInput(transform);
                JavaDStream first = (JavaDStream)sec.getStream((PValue)pcs.get(0));
                ArrayList rest = Lists.newArrayListWithCapacity((int)(pcs.size() - 1));
                for (int i = 1; i < pcs.size(); ++i) {
                    rest.add((JavaDStream)sec.getStream((PValue)pcs.get(i)));
                }
                JavaDStream dstream = sec.getStreamingContext().union(first, (List)rest);
                sec.setStream((PTransform<?, ?>)transform, dstream);
            }
        };
    }

    private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform(final SparkPipelineTranslator rddTranslator) {
        return new TransformEvaluator<TransformT>(){

            @Override
            public void evaluate(TransformT transform, EvaluationContext context) {
                TransformEvaluator<?> rddEvaluator = rddTranslator.translate(transform.getClass());
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                if (sec.hasStream((PTransform<?, ?>)transform)) {
                    JavaDStreamLike<?, ?, ?> dStream = sec.getStream((PTransform<?, ?>)transform);
                    sec.setStream((PTransform<?, ?>)transform, dStream.transform(new RDDTransform(sec, rddEvaluator, (PTransform)transform, null)));
                } else {
                    rddEvaluator.evaluate(transform, context);
                }
            }
        };
    }

    private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD(final SparkPipelineTranslator rddTranslator) {
        return new TransformEvaluator<TransformT>(){

            @Override
            public void evaluate(TransformT transform, EvaluationContext context) {
                TransformEvaluator<?> rddEvaluator = rddTranslator.translate(transform.getClass());
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                if (sec.hasStream((PTransform<?, ?>)transform)) {
                    JavaDStreamLike<?, ?, ?> dStream = sec.getStream((PTransform<?, ?>)transform);
                    dStream.foreachRDD(new RDDOutputOperator(sec, rddEvaluator, (PTransform)transform, null));
                } else {
                    rddEvaluator.evaluate(transform, context);
                }
            }
        };
    }

    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
        return new TransformEvaluator<Window.Bound<T>>(){

            @Override
            public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
                Duration windowDuration;
                StreamingEvaluationContext sec = (StreamingEvaluationContext)context;
                WindowFn windowFn = (WindowFn)WINDOW_FG.get("windowFn", transform);
                JavaDStream dStream = (JavaDStream)sec.getStream((PTransform<?, ?>)transform);
                if (windowFn instanceof FixedWindows) {
                    windowDuration = Durations.milliseconds((long)((FixedWindows)windowFn).getSize().getMillis());
                    sec.setStream((PTransform<?, ?>)transform, dStream.window(windowDuration));
                } else if (windowFn instanceof SlidingWindows) {
                    windowDuration = Durations.milliseconds((long)((SlidingWindows)windowFn).getSize().getMillis());
                    Duration slideDuration = Durations.milliseconds((long)((SlidingWindows)windowFn).getPeriod().getMillis());
                    sec.setStream((PTransform<?, ?>)transform, dStream.window(windowDuration, slideDuration));
                }
                AssignWindowsDoFn addWindowsDoFn = new AssignWindowsDoFn(windowFn);
                DoFnFunction dofn = new DoFnFunction(addWindowsDoFn, ((StreamingEvaluationContext)context).getRuntimeContext(), null);
                JavaDStreamLike<?, ?, ?> dstream = sec.getStream((PTransform<?, ?>)transform);
                sec.setStream((PTransform<?, ?>)transform, dstream.mapPartitions(dofn));
            }
        };
    }

    private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> clazz, SparkPipelineTranslator rddTranslator) {
        TransformEvaluator<?> transform = EVALUATORS.get(clazz);
        if (transform == null) {
            if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
                throw new UnsupportedOperationException("Dataflow transformation " + clazz.getCanonicalName() + " is currently unsupported by the Spark streaming pipeline");
            }
            Class<?> pTOutputClazz = StreamingTransformTranslator.getPTransformOutputClazz(clazz);
            if (PDone.class.equals(pTOutputClazz)) {
                return StreamingTransformTranslator.foreachRDD(rddTranslator);
            }
            return StreamingTransformTranslator.rddTransform(rddTranslator);
        }
        return transform;
    }

    private static <TransformT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<TransformT> clazz) {
        Type[] types = ((ParameterizedType)clazz.getGenericSuperclass()).getActualTypeArguments();
        return TypeToken.of(clazz).resolveType(types[1]).getRawType();
    }

    static {
        EVALUATORS.put(ConsoleIO.Write.Unbound.class, StreamingTransformTranslator.print());
        EVALUATORS.put(CreateStream.QueuedValues.class, StreamingTransformTranslator.createFromQueue());
        EVALUATORS.put(Create.Values.class, StreamingTransformTranslator.create());
        EVALUATORS.put(KafkaIO.Read.Unbound.class, StreamingTransformTranslator.kafka());
        EVALUATORS.put(Window.Bound.class, StreamingTransformTranslator.window());
        EVALUATORS.put(Flatten.FlattenPCollectionList.class, StreamingTransformTranslator.flattenPColl());
        UNSUPPORTED_EVALUATORS = Sets.newHashSet();
        UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
        UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
        UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
        UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
        UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
        UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
    }

    public static class Translator
    implements SparkPipelineTranslator {
        private final SparkPipelineTranslator rddTranslator;

        public Translator(SparkPipelineTranslator rddTranslator) {
            this.rddTranslator = rddTranslator;
        }

        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
            return EVALUATORS.containsKey(clazz) || this.rddTranslator.hasTranslation(clazz);
        }

        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translate(Class<TransformT> clazz) {
            return StreamingTransformTranslator.getTransformEvaluator(clazz, this.rddTranslator);
        }
    }

    private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>>
    implements VoidFunction<JavaRDD<WindowedValue<Object>>> {
        private final StreamingEvaluationContext context;
        private final AppliedPTransform<?, ?, ?> appliedPTransform;
        private final TransformEvaluator<TransformT> rddEvaluator;
        private final TransformT transform;

        private RDDOutputOperator(StreamingEvaluationContext context, TransformEvaluator<TransformT> rddEvaluator, TransformT transform) {
            this.context = context;
            this.appliedPTransform = context.getCurrentTransform();
            this.rddEvaluator = rddEvaluator;
            this.transform = transform;
        }

        public void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
            AppliedPTransform<?, ?, ?> existingAPT = this.context.getCurrentTransform();
            this.context.setCurrentTransform(this.appliedPTransform);
            this.context.setInputRDD((PTransform<? extends PInput, ?>)this.transform, rdd);
            this.rddEvaluator.evaluate(this.transform, this.context);
            this.context.setCurrentTransform(existingAPT);
        }

        /* synthetic */ RDDOutputOperator(StreamingEvaluationContext x0, TransformEvaluator x1, PTransform x2, 1 x3) {
            this(x0, x1, x2);
        }
    }

    private static final class RDDTransform<TransformT extends PTransform<?, ?>>
    implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
        private final StreamingEvaluationContext context;
        private final AppliedPTransform<?, ?, ?> appliedPTransform;
        private final TransformEvaluator<TransformT> rddEvaluator;
        private final TransformT transform;

        private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<TransformT> rddEvaluator, TransformT transform) {
            this.context = context;
            this.appliedPTransform = context.getCurrentTransform();
            this.rddEvaluator = rddEvaluator;
            this.transform = transform;
        }

        public JavaRDD<WindowedValue<Object>> call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
            AppliedPTransform<?, ?, ?> existingAPT = this.context.getCurrentTransform();
            this.context.setCurrentTransform(this.appliedPTransform);
            this.context.setInputRDD((PTransform<? extends PInput, ?>)this.transform, rdd);
            this.rddEvaluator.evaluate(this.transform, this.context);
            if (!this.context.hasOutputRDD((PTransform<? extends PInput, ?>)this.transform)) {
                this.context.setOutputRDD((PTransform<?, ?>)this.transform, this.context.getSparkContext().emptyRDD());
            }
            JavaRDD outRDD = (JavaRDD)this.context.getOutputRDD((PTransform<?, ?>)this.transform);
            this.context.setCurrentTransform(existingAPT);
            return outRDD;
        }

        /* synthetic */ RDDTransform(StreamingEvaluationContext x0, TransformEvaluator x1, PTransform x2, 1 x3) {
            this(x0, x1, x2);
        }
    }
}

