/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder;
import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.StorageLevelPTransform;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public final class TransformTranslator {
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();

    private TransformTranslator() {
    }

    private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>(){

            @Override
            public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
                JavaRDD unionRDD;
                PCollectionList pcs = (PCollectionList)context.getInput(transform);
                if (pcs.size() == 0) {
                    unionRDD = context.getSparkContext().emptyRDD();
                } else {
                    JavaRDD[] rdds = new JavaRDD[pcs.size()];
                    for (int i = 0; i < rdds.length; ++i) {
                        rdds[i] = ((BoundedDataset)context.borrowDataset((PValue)pcs.get(i))).getRDD();
                    }
                    unionRDD = context.getSparkContext().union(rdds);
                }
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(unionRDD));
            }
        };
    }

    private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>(){

            @Override
            public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
                JavaRDD inRDD = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(context.getSparkContext());
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(GroupCombineFunctions.groupByKey(inRDD, accum, coder, context.getRuntimeContext(), ((PCollection)context.getInput(transform)).getWindowingStrategy())));
            }
        };
    }

    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>(){

            @Override
            public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection input = (PCollection)context.getInput(transform);
                WindowingStrategy windowingStrategy = input.getWindowingStrategy();
                CombineWithContext.KeyedCombineFnWithContext fn = CombineFnUtil.toFnWithContext((CombineFnBase.PerKeyCombineFn)transform.getFn());
                JavaRDD inRDD = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                SparkKeyedCombineFn combineFnWithContext = new SparkKeyedCombineFn(fn, context.getRuntimeContext(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), windowingStrategy);
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(inRDD.map(new TranslationUtils.CombineGroupedValues(combineFnWithContext))));
            }
        };
    }

    private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>> combineGlobally() {
        return new TransformEvaluator<Combine.Globally<InputT, OutputT>>(){

            @Override
            public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
                PCollection input = (PCollection)context.getInput(transform);
                Coder iCoder = ((PCollection)context.getInput(transform)).getCoder();
                Coder oCoder = ((PCollection)context.getOutput(transform)).getCoder();
                CombineWithContext.CombineFnWithContext combineFn = CombineFnUtil.toFnWithContext((CombineFnBase.GlobalCombineFn)transform.getFn());
                WindowingStrategy windowingStrategy = input.getWindowingStrategy();
                SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                boolean hasDefault = transform.isInsertDefault();
                JavaRDD inRdd = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(GroupCombineFunctions.combineGlobally(inRdd, combineFn, iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault)));
            }
        };
    }

    private static <K, InputT, AccumT, OutputT> TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
        return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>(){

            @Override
            public void evaluate(Combine.PerKey<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection input = (PCollection)context.getInput(transform);
                KvCoder inputCoder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
                CombineWithContext.KeyedCombineFnWithContext combineFn = CombineFnUtil.toFnWithContext((CombineFnBase.PerKeyCombineFn)transform.getFn());
                WindowingStrategy windowingStrategy = input.getWindowingStrategy();
                SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                JavaRDD inRdd = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(GroupCombineFunctions.combinePerKey(inRdd, combineFn, inputCoder, runtimeContext, windowingStrategy, sideInputs)));
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>(){

            @Override
            public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
                DoFn doFn = transform.getNewFn();
                TranslationUtils.rejectStateAndTimers(doFn);
                JavaRDD inRDD = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                WindowFn windowFn = ((PCollection)context.getInput(transform)).getWindowingStrategy().getWindowFn();
                Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(context.getSparkContext());
                Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(inRDD.mapPartitions(new DoFnFunction(accum, transform.getFn(), context.getRuntimeContext(), sideInputs, windowFn))));
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
        return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>(){

            @Override
            public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
                DoFn doFn = transform.getNewFn();
                TranslationUtils.rejectStateAndTimers(doFn);
                JavaRDD inRDD = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                WindowFn windowFn = ((PCollection)context.getInput(transform)).getWindowingStrategy().getWindowFn();
                Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(context.getSparkContext());
                JavaPairRDD all = inRDD.mapPartitionsToPair(new MultiDoFnFunction(accum, transform.getFn(), context.getRuntimeContext(), transform.getMainOutputTag(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), windowFn)).cache();
                PCollectionTuple pct = (PCollectionTuple)context.getOutput(transform);
                for (Map.Entry e : pct.getAll().entrySet()) {
                    JavaPairRDD filtered = all.filter(new TranslationUtils.TupleTagFilter((TupleTag)e.getKey()));
                    JavaRDD values = filtered.values();
                    context.putDataset((PValue)e.getValue(), new BoundedDataset(values));
                }
            }
        };
    }

    private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() {
        return new TransformEvaluator<TextIO.Read.Bound<T>>(){

            @Override
            public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
                String pattern = transform.getFilepattern();
                JavaRDD rdd = context.getSparkContext().textFile(pattern).map(WindowingHelpers.windowFunction());
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(rdd));
            }
        };
    }

    private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
        return new TransformEvaluator<TextIO.Write.Bound<T>>(){

            @Override
            public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
                JavaPairRDD last = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD().map(WindowingHelpers.unwindowFunction()).mapToPair(new PairFunction<T, T, Void>(){

                    public Tuple2<T, Void> call(T t) throws Exception {
                        return new Tuple2(t, null);
                    }
                });
                ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(), transform.getFilenameSuffix());
                TransformTranslator.writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class, NullWritable.class, TemplatedTextOutputFormat.class);
            }
        };
    }

    private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
        return new TransformEvaluator<AvroIO.Read.Bound<T>>(){

            @Override
            public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
                String pattern = transform.getFilepattern();
                JavaSparkContext jsc = context.getSparkContext();
                JavaRDD avroFile = jsc.newAPIHadoopFile(pattern, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, new Configuration()).keys();
                JavaRDD rdd = avroFile.map(new Function<AvroKey<T>, T>(){

                    public T call(AvroKey<T> key) {
                        return key.datum();
                    }
                }).map(WindowingHelpers.windowFunction());
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(rdd));
            }
        };
    }

    private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
        return new TransformEvaluator<AvroIO.Write.Bound<T>>(){

            @Override
            public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
                Job job;
                try {
                    job = Job.getInstance();
                }
                catch (IOException e) {
                    throw new IllegalStateException(e);
                }
                AvroJob.setOutputKeySchema((Job)job, (Schema)transform.getSchema());
                JavaPairRDD last = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD().map(WindowingHelpers.unwindowFunction()).mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>(){

                    public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
                        return new Tuple2((Object)new AvroKey(t), (Object)NullWritable.get());
                    }
                });
                ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(), transform.getFilenameSuffix());
                TransformTranslator.writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo, AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
            }
        };
    }

    private static <T> TransformEvaluator<Read.Bounded<T>> readBounded() {
        return new TransformEvaluator<Read.Bounded<T>>(){

            @Override
            public void evaluate(Read.Bounded<T> transform, EvaluationContext context) {
                JavaSparkContext jsc = context.getSparkContext();
                SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                JavaRDD input = new SourceRDD.Bounded(jsc.sc(), transform.getSource(), runtimeContext).toJavaRDD();
                context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(input.cache()));
            }
        };
    }

    private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
        return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>(){

            @Override
            public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
                String pattern = transform.getFilepattern();
                JavaSparkContext jsc = context.getSparkContext();
                JavaPairRDD file = jsc.newAPIHadoopFile(pattern, transform.getFormatClass(), transform.getKeyClass(), transform.getValueClass(), new Configuration());
                JavaRDD rdd = file.map(new Function<Tuple2<K, V>, KV<K, V>>(){

                    public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
                        return KV.of((Object)t2._1(), (Object)t2._2());
                    }
                }).map(WindowingHelpers.windowFunction());
                context.putDataset(transform, new BoundedDataset(rdd));
            }
        };
    }

    private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
        return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>(){

            @Override
            public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
                JavaPairRDD last = ((BoundedDataset)context.borrowDataset(transform)).getRDD().map(WindowingHelpers.unwindowFunction()).mapToPair(new PairFunction<KV<K, V>, K, V>(){

                    public Tuple2<K, V> call(KV<K, V> t) throws Exception {
                        return new Tuple2(t.getKey(), t.getValue());
                    }
                });
                ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(), transform.getFilenameSuffix());
                Configuration conf = new Configuration();
                for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) {
                    conf.set(e.getKey(), e.getValue());
                }
                TransformTranslator.writeHadoopFile(last, conf, shardTemplateInfo, transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
            }
        };
    }

    private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf, ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass, Class<? extends FileOutputFormat> formatClass) {
        int numShards = shardTemplateInfo.getNumShards();
        String shardTemplate = shardTemplateInfo.getShardTemplate();
        String filenamePrefix = shardTemplateInfo.getFilenamePrefix();
        String filenameSuffix = shardTemplateInfo.getFilenameSuffix();
        if (numShards != 0) {
            rdd = rdd.repartition(numShards);
        }
        int actualNumShards = rdd.partitions().size();
        String template = ShardNameBuilder.replaceShardCount(shardTemplate, actualNumShards);
        String outputDir = ShardNameBuilder.getOutputDirectory(filenamePrefix, template);
        String filePrefix = ShardNameBuilder.getOutputFilePrefix(filenamePrefix, template);
        String fileTemplate = ShardNameBuilder.getOutputFileTemplate(filenamePrefix, template);
        conf.set("spark.beam.fileoutputformat.prefix", filePrefix);
        conf.set("spark.beam.fileoutputformat.template", fileTemplate);
        conf.set("spark.beam.fileoutputformat.suffix", filenameSuffix);
        rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
    }

    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
        return new TransformEvaluator<Window.Bound<T>>(){

            @Override
            public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
                JavaRDD inRDD = ((BoundedDataset)context.borrowDataset((PTransform<?, ?>)transform)).getRDD();
                if (TranslationUtils.skipAssignWindows(transform, context)) {
                    context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(inRDD));
                } else {
                    WindowFn windowFn = transform.getWindowFn();
                    AssignWindowsDoFn addWindowsDoFn = new AssignWindowsDoFn(windowFn);
                    Accumulator<NamedAggregators> accum = SparkAggregators.getNamedAggregators(context.getSparkContext());
                    context.putDataset((PTransform<?, ?>)transform, (Dataset)new BoundedDataset(inRDD.mapPartitions(new DoFnFunction(accum, addWindowsDoFn, context.getRuntimeContext(), null, null))));
                }
            }
        };
    }

    private static <T> TransformEvaluator<Create.Values<T>> create() {
        return new TransformEvaluator<Create.Values<T>>(){

            @Override
            public void evaluate(Create.Values<T> transform, EvaluationContext context) {
                Iterable elems = transform.getElements();
                Coder coder = ((PCollection)context.getOutput(transform)).getCoder();
                context.putBoundedDatasetFromValues((PTransform<?, ?>)transform, elems, coder);
            }
        };
    }

    private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
        return new TransformEvaluator<View.AsSingleton<T>>(){

            @Override
            public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
                Iterable iter = context.getWindowedValues((PCollection)context.getInput(transform));
                context.putPView((PValue)context.getOutput(transform), iter);
            }
        };
    }

    private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
        return new TransformEvaluator<View.AsIterable<T>>(){

            @Override
            public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
                Iterable iter = context.getWindowedValues((PCollection)context.getInput(transform));
                context.putPView((PValue)context.getOutput(transform), iter);
            }
        };
    }

    private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>> createPCollView() {
        return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>(){

            @Override
            public void evaluate(View.CreatePCollectionView<ReadT, WriteT> transform, EvaluationContext context) {
                Iterable iter = context.getWindowedValues((PCollection)context.getInput(transform));
                context.putPView((PValue)context.getOutput(transform), iter);
            }
        };
    }

    private static TransformEvaluator<StorageLevelPTransform> storageLevel() {
        return new TransformEvaluator<StorageLevelPTransform>(){

            @Override
            public void evaluate(StorageLevelPTransform transform, EvaluationContext context) {
                JavaRDD rdd = ((BoundedDataset)context.borrowDataset(transform)).getRDD();
                JavaSparkContext javaSparkContext = context.getSparkContext();
                WindowedValue.ValueOnlyWindowedValueCoder windowCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
                JavaRDD output = javaSparkContext.parallelize(CoderHelpers.toByteArrays(Collections.singletonList(rdd.getStorageLevel().description()), StringUtf8Coder.of())).map(CoderHelpers.fromByteFunction(windowCoder));
                context.putDataset(transform, new BoundedDataset(output));
            }
        };
    }

    static {
        EVALUATORS.put(Read.Bounded.class, TransformTranslator.readBounded());
        EVALUATORS.put(HadoopIO.Read.Bound.class, TransformTranslator.readHadoop());
        EVALUATORS.put(HadoopIO.Write.Bound.class, TransformTranslator.writeHadoop());
        EVALUATORS.put(ParDo.Bound.class, TransformTranslator.parDo());
        EVALUATORS.put(ParDo.BoundMulti.class, TransformTranslator.multiDo());
        EVALUATORS.put(GroupByKey.class, TransformTranslator.groupByKey());
        EVALUATORS.put(Combine.GroupedValues.class, TransformTranslator.combineGrouped());
        EVALUATORS.put(Combine.Globally.class, TransformTranslator.combineGlobally());
        EVALUATORS.put(Combine.PerKey.class, TransformTranslator.combinePerKey());
        EVALUATORS.put(Flatten.FlattenPCollectionList.class, TransformTranslator.flattenPColl());
        EVALUATORS.put(Create.Values.class, TransformTranslator.create());
        EVALUATORS.put(View.AsSingleton.class, TransformTranslator.viewAsSingleton());
        EVALUATORS.put(View.AsIterable.class, TransformTranslator.viewAsIter());
        EVALUATORS.put(View.CreatePCollectionView.class, TransformTranslator.createPCollView());
        EVALUATORS.put(Window.Bound.class, TransformTranslator.window());
        EVALUATORS.put(StorageLevelPTransform.class, TransformTranslator.storageLevel());
    }

    public static class Translator
    implements SparkPipelineTranslator {
        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
            return EVALUATORS.containsKey(clazz);
        }

        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(Class<TransformT> clazz) {
            TransformEvaluator transformEvaluator = (TransformEvaluator)EVALUATORS.get(clazz);
            Preconditions.checkState((transformEvaluator != null ? 1 : 0) != 0, (String)"No TransformEvaluator registered for BOUNDED transform %s", (Object[])new Object[]{clazz});
            return transformEvaluator;
        }

        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(Class<TransformT> clazz) {
            throw new IllegalStateException("TransformTranslator used in a batch pipeline only supports BOUNDED transforms.");
        }
    }

    private static final class ShardTemplateInformation {
        private final int numShards;
        private final String shardTemplate;
        private final String filenamePrefix;
        private final String filenameSuffix;

        private ShardTemplateInformation(int numShards, String shardTemplate, String filenamePrefix, String filenameSuffix) {
            this.numShards = numShards;
            this.shardTemplate = shardTemplate;
            this.filenamePrefix = filenamePrefix;
            this.filenameSuffix = filenameSuffix;
        }

        int getNumShards() {
            return this.numShards;
        }

        String getShardTemplate() {
            return this.shardTemplate;
        }

        String getFilenamePrefix() {
            return this.filenamePrefix;
        }

        String getFilenameSuffix() {
            return this.filenameSuffix;
        }
    }
}

