/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Multimap;
import org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/**
 * Spark {@link PairFlatMapFunction} that runs a Beam {@link DoFn} over an iterator of
 * {@link WindowedValue}s and emits every produced element paired with the {@link TupleTag}
 * it was output to.
 *
 * <p>For each invocation of {@link #call(Iterator)} a fresh output buffer and a fresh
 * {@link DoFnRunner} are created; the actual element processing is delegated to
 * {@link SparkProcessContext}.
 *
 * @param <InputT> element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> main-output element type produced by the wrapped {@link DoFn}
 */
public class MultiDoFnFunction<InputT, OutputT>
implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
    // NOTE(review): stored by the constructor but never read anywhere in this class.
    private final Accumulator<NamedAggregators> aggAccum;
    private final Accumulator<MetricsContainerStepMap> metricsAccum;
    private final String stepName;
    private final DoFn<InputT, OutputT> doFn;
    private final SparkRuntimeContext runtimeContext;
    private final TupleTag<OutputT> mainOutputTag;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;

    /**
     * Creates the function; all arguments are stored as-is (no copies, no validation).
     *
     * @param aggAccum aggregators accumulator (kept for interface compatibility; not read here)
     * @param metricsAccum metrics accumulator handed to the {@link DoFnRunnerWithMetrics} wrapper
     * @param stepName step name handed to the {@link DoFnRunnerWithMetrics} wrapper
     * @param doFn the {@link DoFn} to execute
     * @param runtimeContext source of the {@link PipelineOptions} passed to the runner
     * @param mainOutputTag tag identifying the main output of {@code doFn}
     * @param sideInputs side inputs, wrapped in a {@link SparkSideInputReader} on every call
     * @param windowingStrategy windowing strategy passed to the runner
     */
    public MultiDoFnFunction(Accumulator<NamedAggregators> aggAccum, Accumulator<MetricsContainerStepMap> metricsAccum, String stepName, DoFn<InputT, OutputT> doFn, SparkRuntimeContext runtimeContext, TupleTag<OutputT> mainOutputTag, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) {
        this.aggAccum = aggAccum;
        this.metricsAccum = metricsAccum;
        this.stepName = stepName;
        this.doFn = doFn;
        this.runtimeContext = runtimeContext;
        this.mainOutputTag = mainOutputTag;
        this.sideInputs = sideInputs;
        this.windowingStrategy = windowingStrategy;
    }

    /**
     * Builds a metrics-reporting {@link DoFnRunner} around the wrapped {@link DoFn} and hands
     * the input iterator to {@link SparkProcessContext#processPartition}.
     *
     * @param iter the windowed input elements to process
     * @return the tagged outputs, as returned by {@code processPartition}
     * @throws Exception declared to match the {@link PairFlatMapFunction} contract; this method
     *     contains no explicit {@code throw} of its own
     */
    @Override
    public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter) throws Exception {
        DoFnOutputManager outputManager = new DoFnOutputManager();

        // No additional output tags are declared to the runner (empty list); the step context
        // is the no-op implementation supplied by SparkProcessContext.
        DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
                this.runtimeContext.getPipelineOptions(),
                this.doFn,
                new SparkSideInputReader(this.sideInputs),
                outputManager,
                this.mainOutputTag,
                Collections.<TupleTag<?>>emptyList(),
                new SparkProcessContext.NoOpStepContext(),
                this.windowingStrategy);

        DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
                new DoFnRunnerWithMetrics<>(this.stepName, doFnRunner, this.metricsAccum);

        return new SparkProcessContext<>(this.doFn, doFnRunnerWithMetrics, outputManager)
                .processPartition(iter);
    }

    /**
     * Output buffer that records every {@code (tag, value)} pair emitted by the runner and
     * exposes them as {@link Tuple2}s.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing instance.
     * Only {@link #output} is {@code synchronized}; {@link #clear()} and {@link #iterator()}
     * are not.
     */
    private static class DoFnOutputManager
    implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        // LinkedListMultimap: allows several values per tag and iterates entries in insertion order.
        private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();

        /** Discards all buffered outputs. */
        @Override
        public void clear() {
            this.outputs.clear();
        }

        /**
         * Returns a view over the buffered entries, each converted to a {@link Tuple2} on the fly.
         */
        @Override
        public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
            Iterator<Map.Entry<TupleTag<?>, WindowedValue<?>>> entryIter = this.outputs.entries().iterator();
            return Iterators.transform(entryIter, this.<TupleTag<?>, WindowedValue<?>>entryToTupleFn());
        }

        /** Returns a function converting a {@link Map.Entry} into a {@link Tuple2} of its key and value. */
        private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
            return new Function<Map.Entry<K, V>, Tuple2<K, V>>(){

                @Override
                public Tuple2<K, V> apply(Map.Entry<K, V> en) {
                    return new Tuple2<>(en.getKey(), en.getValue());
                }
            };
        }

        /**
         * Buffers {@code output} under {@code tag}.
         *
         * @param tag the output the value was emitted to
         * @param output the emitted value
         */
        @Override
        public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            this.outputs.put(tag, output);
        }
    }
}

