/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder;
import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AssignWindowsDoFn;
import org.apache.beam.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.joda.time.Instant;
import scala.Tuple2;

public final class TransformTranslator {
    private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
    private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class);
    private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class);
    private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class);
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();

    private TransformTranslator() {
    }

    private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>(){

            @Override
            public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
                PCollectionList pcs = (PCollectionList)context.getInput(transform);
                JavaRDD[] rdds = new JavaRDD[pcs.size()];
                for (int i = 0; i < rdds.length; ++i) {
                    rdds[i] = (JavaRDD)context.getRDD((PValue)pcs.get(i));
                }
                JavaRDD rdd = context.getSparkContext().union(rdds);
                context.setOutputRDD((PTransform<?, ?>)transform, rdd);
            }
        };
    }

    private static <K, V> TransformEvaluator<GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly<K, V>> gbk() {
        return new TransformEvaluator<GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly<K, V>>(){

            @Override
            public void evaluate(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
                JavaRDDLike<?, ?> inRDD = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                Coder keyCoder = coder.getKeyCoder();
                Coder valueCoder = coder.getValueCoder();
                JavaRDD outRDD = TransformTranslator.fromPair(TransformTranslator.toPair((JavaRDDLike)inRDD.map(WindowingHelpers.unwindowFunction())).mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder)).groupByKey().mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder))).map(WindowingHelpers.windowFunction());
                context.setOutputRDD((PTransform<?, ?>)transform, outRDD);
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V>> gabw() {
        return new TransformEvaluator<GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V>>(){

            @Override
            public void evaluate(GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
                JavaRDDLike<?, ?> inRDD = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                Coder inputCoder = ((PCollection)context.getInput(transform)).getCoder();
                Coder keyCoder = transform.getKeyCoder(inputCoder);
                Coder valueCoder = transform.getValueCoder(inputCoder);
                KvCoder inputKvCoder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                Coder inputValueCoder = inputKvCoder.getValueCoder();
                IterableCoder inputIterableValueCoder = (IterableCoder)inputValueCoder;
                Coder inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
                WindowedValue.WindowedValueCoder inputIterableWindowedValueCoder = (WindowedValue.WindowedValueCoder)inputIterableElementCoder;
                Coder inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
                WindowingStrategy windowingStrategy = transform.getWindowingStrategy();
                GroupAlsoByWindowsViaOutputBufferDoFn gabwDoFn = new GroupAlsoByWindowsViaOutputBufferDoFn(windowingStrategy, new InMemoryStateInternalsFactory(), SystemReduceFn.buffering((Coder)inputIterableElementValueCoder));
                JavaRDD outRDD = inRDD.mapPartitions(new DoFnFunction(gabwDoFn, context.getRuntimeContext(), null));
                context.setOutputRDD((PTransform<?, ?>)transform, outRDD);
            }
        };
    }

    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> grouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>(){

            @Override
            public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) {
                Combine.KeyedCombineFn keyed = (Combine.KeyedCombineFn)GROUPED_FG.get("fn", transform);
                JavaRDDLike<?, ?> inRDD = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                context.setOutputRDD((PTransform<?, ?>)transform, inRDD.map(new KVFunction(keyed)));
            }
        };
    }

    private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>> combineGlobally() {
        return new TransformEvaluator<Combine.Globally<InputT, OutputT>>(){

            @Override
            public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
                Coder aCoder;
                final Combine.CombineFn globally = (Combine.CombineFn)COMBINE_GLOBALLY_FG.get("fn", transform);
                JavaRDDLike<?, ?> inRdd = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                final Coder iCoder = ((PCollection)context.getInput(transform)).getCoder();
                try {
                    aCoder = globally.getAccumulatorCoder(context.getPipeline().getCoderRegistry(), iCoder);
                }
                catch (CannotProvideCoderException e) {
                    throw new IllegalStateException("Could not determine coder for accumulator", e);
                }
                JavaRDD inRddBytes = inRdd.map(WindowingHelpers.unwindowFunction()).map(CoderHelpers.toByteFunction(iCoder));
                byte[] acc = (byte[])inRddBytes.aggregate((Object)CoderHelpers.toByteArray(globally.createAccumulator(), aCoder), (Function2)new Function2<byte[], byte[], byte[]>(){

                    public byte[] call(byte[] ab, byte[] ib) throws Exception {
                        Object a = CoderHelpers.fromByteArray(ab, aCoder);
                        Object i = CoderHelpers.fromByteArray(ib, iCoder);
                        return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
                    }
                }, (Function2)new Function2<byte[], byte[], byte[]>(){

                    public byte[] call(byte[] a1b, byte[] a2b) throws Exception {
                        Object a1 = CoderHelpers.fromByteArray(a1b, aCoder);
                        Object a2 = CoderHelpers.fromByteArray(a2b, aCoder);
                        List<Object> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
                        Object merged = globally.mergeAccumulators(accumulators);
                        return CoderHelpers.toByteArray(merged, aCoder);
                    }
                });
                Object output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
                Coder coder = ((PCollection)context.getOutput(transform)).getCoder();
                JavaRDD outRdd = context.getSparkContext().parallelize(CoderHelpers.toByteArrays(Collections.singleton(output), coder));
                context.setOutputRDD((PTransform<?, ?>)transform, outRdd.map(CoderHelpers.fromByteFunction(coder)).map(WindowingHelpers.windowFunction()));
            }
        };
    }

    private static <K, InputT, AccumT, OutputT> TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
        return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>(){

            @Override
            public void evaluate(Combine.PerKey<K, InputT, OutputT> transform, EvaluationContext context) {
                Coder vaCoder;
                final Combine.KeyedCombineFn keyed = (Combine.KeyedCombineFn)COMBINE_PERKEY_FG.get("fn", transform);
                JavaRDDLike<?, ?> inRdd = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                KvCoder inputCoder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                Coder keyCoder = inputCoder.getKeyCoder();
                Coder viCoder = inputCoder.getValueCoder();
                try {
                    vaCoder = keyed.getAccumulatorCoder(context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
                }
                catch (CannotProvideCoderException e) {
                    throw new IllegalStateException("Could not determine coder for accumulator", e);
                }
                KvCoder kviCoder = KvCoder.of((Coder)keyCoder, (Coder)viCoder);
                KvCoder kvaCoder = KvCoder.of((Coder)keyCoder, (Coder)vaCoder);
                JavaPairRDD inRddDuplicatedKeyPair = inRdd.flatMapToPair(new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<K>, WindowedValue<KV<K, InputT>>>(){

                    public Iterable<Tuple2<WindowedValue<K>, WindowedValue<KV<K, InputT>>>> call(WindowedValue<KV<K, InputT>> kv) {
                        ArrayList tuple2s = Lists.newArrayListWithCapacity((int)kv.getWindows().size());
                        for (BoundedWindow boundedWindow : kv.getWindows()) {
                            WindowedValue wk = WindowedValue.of((Object)((KV)kv.getValue()).getKey(), (Instant)boundedWindow.maxTimestamp(), (BoundedWindow)boundedWindow, (PaneInfo)kv.getPane());
                            tuple2s.add(new Tuple2((Object)wk, kv));
                        }
                        return tuple2s;
                    }
                });
                WindowedValue.FullWindowedValueCoder wkCoder = WindowedValue.FullWindowedValueCoder.of((Coder)keyCoder, (Coder)((PCollection)context.getInput(transform)).getWindowingStrategy().getWindowFn().windowCoder());
                final WindowedValue.FullWindowedValueCoder wkviCoder = WindowedValue.FullWindowedValueCoder.of((Coder)kviCoder, (Coder)((PCollection)context.getInput(transform)).getWindowingStrategy().getWindowFn().windowCoder());
                final WindowedValue.FullWindowedValueCoder wkvaCoder = WindowedValue.FullWindowedValueCoder.of((Coder)kvaCoder, (Coder)((PCollection)context.getInput(transform)).getWindowingStrategy().getWindowFn().windowCoder());
                JavaPairRDD inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair.mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
                JavaPairRDD accumulatedBytes = inRddDuplicatedKeyPairBytes.combineByKey((Function)new Function<byte[], byte[]>(){

                    public byte[] call(byte[] input) {
                        WindowedValue wkvi = (WindowedValue)CoderHelpers.fromByteArray(input, wkviCoder);
                        Object va = keyed.createAccumulator(((KV)wkvi.getValue()).getKey());
                        va = keyed.addInput(((KV)wkvi.getValue()).getKey(), va, ((KV)wkvi.getValue()).getValue());
                        WindowedValue wkva = WindowedValue.of((Object)KV.of((Object)((KV)wkvi.getValue()).getKey(), (Object)va), (Instant)wkvi.getTimestamp(), (Collection)wkvi.getWindows(), (PaneInfo)wkvi.getPane());
                        return CoderHelpers.toByteArray(wkva, wkvaCoder);
                    }
                }, (Function2)new Function2<byte[], byte[], byte[]>(){

                    public byte[] call(byte[] acc, byte[] input) {
                        WindowedValue wkva = (WindowedValue)CoderHelpers.fromByteArray(acc, wkvaCoder);
                        WindowedValue wkvi = (WindowedValue)CoderHelpers.fromByteArray(input, wkviCoder);
                        Object va = keyed.addInput(((KV)wkva.getValue()).getKey(), ((KV)wkva.getValue()).getValue(), ((KV)wkvi.getValue()).getValue());
                        wkva = WindowedValue.of((Object)KV.of((Object)((KV)wkva.getValue()).getKey(), (Object)va), (Instant)wkva.getTimestamp(), (Collection)wkva.getWindows(), (PaneInfo)wkva.getPane());
                        return CoderHelpers.toByteArray(wkva, wkvaCoder);
                    }
                }, (Function2)new Function2<byte[], byte[], byte[]>(){

                    public byte[] call(byte[] acc1, byte[] acc2) {
                        WindowedValue wkva1 = (WindowedValue)CoderHelpers.fromByteArray(acc1, wkvaCoder);
                        WindowedValue wkva2 = (WindowedValue)CoderHelpers.fromByteArray(acc2, wkvaCoder);
                        Object va = keyed.mergeAccumulators(((KV)wkva1.getValue()).getKey(), Collections.unmodifiableList(Arrays.asList(((KV)wkva1.getValue()).getValue(), ((KV)wkva2.getValue()).getValue())));
                        WindowedValue wkva = WindowedValue.of((Object)KV.of((Object)((KV)wkva1.getValue()).getKey(), (Object)va), (Instant)wkva1.getTimestamp(), (Collection)wkva1.getWindows(), (PaneInfo)wkva1.getPane());
                        return CoderHelpers.toByteArray(wkva, wkvaCoder);
                    }
                });
                JavaPairRDD extracted = accumulatedBytes.mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder)).mapValues(new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>(){

                    public WindowedValue<OutputT> call(WindowedValue<KV<K, AccumT>> acc) {
                        return WindowedValue.of((Object)keyed.extractOutput(((KV)acc.getValue()).getKey(), ((KV)acc.getValue()).getValue()), (Instant)acc.getTimestamp(), (Collection)acc.getWindows(), (PaneInfo)acc.getPane());
                    }
                });
                context.setOutputRDD((PTransform<?, ?>)transform, TransformTranslator.fromPair(extracted).map(new Function<KV<WindowedValue<K>, WindowedValue<OutputT>>, WindowedValue<KV<K, OutputT>>>(){

                    public WindowedValue<KV<K, OutputT>> call(KV<WindowedValue<K>, WindowedValue<OutputT>> kwvo) throws Exception {
                        WindowedValue wvo = (WindowedValue)kwvo.getValue();
                        KV kvo = KV.of((Object)((WindowedValue)kwvo.getKey()).getValue(), (Object)wvo.getValue());
                        return WindowedValue.of((Object)kvo, (Instant)wvo.getTimestamp(), (Collection)wvo.getWindows(), (PaneInfo)wvo.getPane());
                    }
                }));
            }
        };
    }

    private static <K, V> JavaPairRDD<K, V> toPair(JavaRDDLike<KV<K, V>, ?> rdd) {
        return rdd.mapToPair(new PairFunction<KV<K, V>, K, V>(){

            public Tuple2<K, V> call(KV<K, V> kv) {
                return new Tuple2(kv.getKey(), kv.getValue());
            }
        });
    }

    private static <K, V> JavaRDDLike<KV<K, V>, ?> fromPair(JavaPairRDD<K, V> rdd) {
        return rdd.map(new Function<Tuple2<K, V>, KV<K, V>>(){

            public KV<K, V> call(Tuple2<K, V> t2) {
                return KV.of((Object)t2._1(), (Object)t2._2());
            }
        });
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>(){

            @Override
            public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
                DoFnFunction dofn = new DoFnFunction(transform.getFn(), context.getRuntimeContext(), TransformTranslator.getSideInputs(transform.getSideInputs(), context));
                JavaRDDLike<?, ?> inRDD = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                context.setOutputRDD((PTransform<?, ?>)transform, inRDD.mapPartitions(dofn));
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
        return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>(){

            @Override
            public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
                TupleTag mainOutputTag = (TupleTag)MULTIDO_FG.get("mainOutputTag", transform);
                MultiDoFnFunction multifn = new MultiDoFnFunction(transform.getFn(), context.getRuntimeContext(), mainOutputTag, TransformTranslator.getSideInputs(transform.getSideInputs(), context));
                JavaRDDLike<?, ?> inRDD = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                JavaPairRDD all = inRDD.mapPartitionsToPair(multifn).cache();
                PCollectionTuple pct = (PCollectionTuple)context.getOutput(transform);
                for (Map.Entry e : pct.getAll().entrySet()) {
                    JavaPairRDD filtered = all.filter(new TupleTagFilter((TupleTag)e.getKey()));
                    JavaRDD values = filtered.values();
                    context.setRDD((PValue)e.getValue(), values);
                }
            }
        };
    }

    private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() {
        return new TransformEvaluator<TextIO.Read.Bound<T>>(){

            @Override
            public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
                String pattern = transform.getFilepattern();
                JavaRDD rdd = context.getSparkContext().textFile(pattern).map(WindowingHelpers.windowFunction());
                context.setOutputRDD((PTransform<?, ?>)transform, rdd);
            }
        };
    }

    private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
        return new TransformEvaluator<TextIO.Write.Bound<T>>(){

            @Override
            public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
                JavaPairRDD last = context.getInputRDD((PTransform<? extends PInput, ?>)transform).map(WindowingHelpers.unwindowFunction()).mapToPair(new PairFunction<T, T, Void>(){

                    public Tuple2<T, Void> call(T t) throws Exception {
                        return new Tuple2(t, null);
                    }
                });
                ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(), transform.getFilenameSuffix());
                TransformTranslator.writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class, NullWritable.class, TemplatedTextOutputFormat.class);
            }
        };
    }

    private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
        return new TransformEvaluator<AvroIO.Read.Bound<T>>(){

            @Override
            public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
                String pattern = transform.getFilepattern();
                JavaSparkContext jsc = context.getSparkContext();
                JavaRDD avroFile = jsc.newAPIHadoopFile(pattern, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, new Configuration()).keys();
                JavaRDD rdd = avroFile.map(new Function<AvroKey<T>, T>(){

                    public T call(AvroKey<T> key) {
                        return key.datum();
                    }
                }).map(WindowingHelpers.windowFunction());
                context.setOutputRDD((PTransform<?, ?>)transform, rdd);
            }
        };
    }

    private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
        return new TransformEvaluator<AvroIO.Write.Bound<T>>(){

            @Override
            public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
                Job job;
                try {
                    job = Job.getInstance();
                }
                catch (IOException e) {
                    throw new IllegalStateException(e);
                }
                AvroJob.setOutputKeySchema((Job)job, (Schema)transform.getSchema());
                JavaPairRDD last = context.getInputRDD((PTransform<? extends PInput, ?>)transform).map(WindowingHelpers.unwindowFunction()).mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>(){

                    public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
                        return new Tuple2((Object)new AvroKey(t), (Object)NullWritable.get());
                    }
                });
                ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(), transform.getFilenameSuffix());
                TransformTranslator.writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo, AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
            }
        };
    }

    private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
        return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>(){

            @Override
            public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
                String pattern = transform.getFilepattern();
                JavaSparkContext jsc = context.getSparkContext();
                JavaPairRDD file = jsc.newAPIHadoopFile(pattern, transform.getFormatClass(), transform.getKeyClass(), transform.getValueClass(), new Configuration());
                JavaRDD rdd = file.map(new Function<Tuple2<K, V>, KV<K, V>>(){

                    public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
                        return KV.of((Object)t2._1(), (Object)t2._2());
                    }
                }).map(WindowingHelpers.windowFunction());
                context.setOutputRDD(transform, rdd);
            }
        };
    }

    private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
        return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>(){

            @Override
            public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
                JavaPairRDD last = context.getInputRDD(transform).map(WindowingHelpers.unwindowFunction()).mapToPair(new PairFunction<KV<K, V>, K, V>(){

                    public Tuple2<K, V> call(KV<K, V> t) throws Exception {
                        return new Tuple2(t.getKey(), t.getValue());
                    }
                });
                ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(), transform.getFilenameSuffix());
                Configuration conf = new Configuration();
                for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) {
                    conf.set(e.getKey(), e.getValue());
                }
                TransformTranslator.writeHadoopFile(last, conf, shardTemplateInfo, transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
            }
        };
    }

    private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf, ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass, Class<? extends FileOutputFormat> formatClass) {
        int numShards = shardTemplateInfo.getNumShards();
        String shardTemplate = shardTemplateInfo.getShardTemplate();
        String filenamePrefix = shardTemplateInfo.getFilenamePrefix();
        String filenameSuffix = shardTemplateInfo.getFilenameSuffix();
        if (numShards != 0) {
            rdd = rdd.repartition(numShards);
        }
        int actualNumShards = rdd.partitions().size();
        String template = ShardNameBuilder.replaceShardCount(shardTemplate, actualNumShards);
        String outputDir = ShardNameBuilder.getOutputDirectory(filenamePrefix, template);
        String filePrefix = ShardNameBuilder.getOutputFilePrefix(filenamePrefix, template);
        String fileTemplate = ShardNameBuilder.getOutputFileTemplate(filenamePrefix, template);
        conf.set("spark.dataflow.fileoutputformat.prefix", filePrefix);
        conf.set("spark.dataflow.fileoutputformat.template", fileTemplate);
        conf.set("spark.dataflow.fileoutputformat.suffix", filenameSuffix);
        rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
    }

    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
        return new TransformEvaluator<Window.Bound<T>>(){

            @Override
            public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
                JavaRDDLike<?, ?> inRDD = context.getInputRDD((PTransform<? extends PInput, ?>)transform);
                WindowFn windowFn = transform.getWindowFn();
                if (windowFn == null || ((PCollection)context.getInput(transform)).getWindowingStrategy().getWindowFn() instanceof GlobalWindows && windowFn instanceof GlobalWindows) {
                    context.setOutputRDD((PTransform<?, ?>)transform, inRDD);
                } else {
                    AssignWindowsDoFn addWindowsDoFn = new AssignWindowsDoFn(windowFn);
                    DoFnFunction dofn = new DoFnFunction(addWindowsDoFn, context.getRuntimeContext(), null);
                    context.setOutputRDD((PTransform<?, ?>)transform, inRDD.mapPartitions(dofn));
                }
            }
        };
    }

    private static <T> TransformEvaluator<Create.Values<T>> create() {
        return new TransformEvaluator<Create.Values<T>>(){

            @Override
            public void evaluate(Create.Values<T> transform, EvaluationContext context) {
                Iterable elems = transform.getElements();
                Coder coder = ((PCollection)context.getOutput(transform)).getCoder();
                context.setOutputRDDFromValues((PTransform<?, ?>)transform, elems, coder);
            }
        };
    }

    private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
        return new TransformEvaluator<View.AsSingleton<T>>(){

            @Override
            public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
                Iterable iter = context.getWindowedValues((PCollection)context.getInput(transform));
                context.setPView((PValue)context.getOutput(transform), iter);
            }
        };
    }

    private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
        return new TransformEvaluator<View.AsIterable<T>>(){

            @Override
            public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
                Iterable iter = context.getWindowedValues((PCollection)context.getInput(transform));
                context.setPView((PValue)context.getOutput(transform), iter);
            }
        };
    }

    private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>> createPCollView() {
        return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>(){

            @Override
            public void evaluate(View.CreatePCollectionView<ReadT, WriteT> transform, EvaluationContext context) {
                Iterable iter = context.getWindowedValues((PCollection)context.getInput(transform));
                context.setPView((PValue)context.getOutput(transform), iter);
            }
        };
    }

    private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
        if (views == null) {
            return ImmutableMap.of();
        }
        HashMap sideInputs = Maps.newHashMap();
        for (PCollectionView<?> view : views) {
            Iterable<WindowedValue<?>> collectionView = context.getPCollectionView(view);
            Coder coderInternal = view.getCoderInternal();
            BroadcastHelper<Iterable<WindowedValue<?>>> helper = BroadcastHelper.create(collectionView, coderInternal);
            helper.broadcast(context.getSparkContext());
            sideInputs.put(view.getTagInternal(), helper);
        }
        return sideInputs;
    }

    public static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> clazz) {
        TransformEvaluator<?> transform = EVALUATORS.get(clazz);
        if (transform == null) {
            throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
        }
        return transform;
    }

    static {
        EVALUATORS.put(TextIO.Read.Bound.class, TransformTranslator.readText());
        EVALUATORS.put(TextIO.Write.Bound.class, TransformTranslator.writeText());
        EVALUATORS.put(AvroIO.Read.Bound.class, TransformTranslator.readAvro());
        EVALUATORS.put(AvroIO.Write.Bound.class, TransformTranslator.writeAvro());
        EVALUATORS.put(HadoopIO.Read.Bound.class, TransformTranslator.readHadoop());
        EVALUATORS.put(HadoopIO.Write.Bound.class, TransformTranslator.writeHadoop());
        EVALUATORS.put(ParDo.Bound.class, TransformTranslator.parDo());
        EVALUATORS.put(ParDo.BoundMulti.class, TransformTranslator.multiDo());
        EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly.class, TransformTranslator.gbk());
        EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow.class, TransformTranslator.gabw());
        EVALUATORS.put(Combine.GroupedValues.class, TransformTranslator.grouped());
        EVALUATORS.put(Combine.Globally.class, TransformTranslator.combineGlobally());
        EVALUATORS.put(Combine.PerKey.class, TransformTranslator.combinePerKey());
        EVALUATORS.put(Flatten.FlattenPCollectionList.class, TransformTranslator.flattenPColl());
        EVALUATORS.put(Create.Values.class, TransformTranslator.create());
        EVALUATORS.put(View.AsSingleton.class, TransformTranslator.viewAsSingleton());
        EVALUATORS.put(View.AsIterable.class, TransformTranslator.viewAsIter());
        EVALUATORS.put(View.CreatePCollectionView.class, TransformTranslator.createPCollView());
        EVALUATORS.put(Window.Bound.class, TransformTranslator.window());
    }

    private static class InMemoryStateInternalsFactory<K>
    implements StateInternalsFactory<K>,
    Serializable {
        private InMemoryStateInternalsFactory() {
        }

        public StateInternals<K> stateInternalsForKey(K key) {
            return InMemoryStateInternals.forKey(key);
        }
    }

    public static class Translator
    implements SparkPipelineTranslator {
        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
            return EVALUATORS.containsKey(clazz);
        }

        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translate(Class<TransformT> clazz) {
            return TransformTranslator.getTransformEvaluator(clazz);
        }
    }

    private static final class TupleTagFilter<V>
    implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> {
        private final TupleTag<V> tag;

        private TupleTagFilter(TupleTag<V> tag) {
            this.tag = tag;
        }

        public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) {
            return this.tag.equals(input._1());
        }
    }

    private static final class ShardTemplateInformation {
        private final int numShards;
        private final String shardTemplate;
        private final String filenamePrefix;
        private final String filenameSuffix;

        private ShardTemplateInformation(int numShards, String shardTemplate, String filenamePrefix, String filenameSuffix) {
            this.numShards = numShards;
            this.shardTemplate = shardTemplate;
            this.filenamePrefix = filenamePrefix;
            this.filenameSuffix = filenameSuffix;
        }

        int getNumShards() {
            return this.numShards;
        }

        String getShardTemplate() {
            return this.shardTemplate;
        }

        String getFilenamePrefix() {
            return this.filenamePrefix;
        }

        String getFilenameSuffix() {
            return this.filenameSuffix;
        }
    }

    private static final class KVFunction<K, InputT, OutputT>
    implements Function<WindowedValue<KV<K, Iterable<InputT>>>, WindowedValue<KV<K, OutputT>>> {
        private final Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed;

        KVFunction(Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed) {
            this.keyed = keyed;
        }

        public WindowedValue<KV<K, OutputT>> call(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) throws Exception {
            KV kv = (KV)windowedKv.getValue();
            return WindowedValue.of((Object)KV.of((Object)kv.getKey(), (Object)this.keyed.apply(kv.getKey(), (Iterable)kv.getValue())), (Instant)windowedKv.getTimestamp(), (Collection)windowedKv.getWindows(), (PaneInfo)windowedKv.getPane());
        }
    }

    public static class FieldGetter {
        private final Map<String, Field> fields = Maps.newHashMap();

        public FieldGetter(Class<?> clazz) {
            for (Field f : clazz.getDeclaredFields()) {
                f.setAccessible(true);
                this.fields.put(f.getName(), f);
            }
        }

        public <T> T get(String fieldname, Object value) {
            try {
                Object fieldValue = this.fields.get(fieldname).get(value);
                return (T)fieldValue;
            }
            catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

