/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.api.java.function.FlatMapFunction;

/**
 * Spark {@link FlatMapFunction} that applies a Beam {@link DoFn} to an iterator of
 * {@link WindowedValue} inputs and yields the {@link WindowedValue} outputs the function emits.
 *
 * <p>Each invocation of {@link #call(Iterator)} builds a fresh processing context, so the
 * output buffer is never shared between invocations.
 *
 * @param <InputT> element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> element type emitted by the wrapped {@link DoFn}
 */
public class DoFnFunction<InputT, OutputT>
implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
    /** The user function to run over the input elements. */
    private final DoFn<InputT, OutputT> mFunction;
    /** Runtime context handed through to the processing context. */
    private final SparkRuntimeContext mRuntimeContext;
    /** Side inputs, keyed by tag, handed through to the processing context. */
    private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;

    /**
     * Creates a function wrapper around {@code fn}.
     *
     * @param fn the {@link DoFn} to execute
     * @param runtime the runtime context passed to the processing context
     * @param sideInputs side inputs keyed by {@link TupleTag}, passed to the processing context
     */
    public DoFnFunction(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtime, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
        this.mFunction = fn;
        this.mRuntimeContext = runtime;
        this.mSideInputs = sideInputs;
    }

    /**
     * Sets up a new processing context, signals the start of a bundle to the wrapped
     * {@link DoFn}, and returns the outputs for the given inputs.
     *
     * <p>The actual per-element processing is delegated to
     * {@code SparkProcessContext#getOutputIterable}; NOTE(review): that method is not visible
     * here, so whether processing is eager or lazy should be confirmed against it.
     *
     * @param iter the input elements to process
     * @return the iterable of output elements produced by the processing context
     * @throws Exception if context setup, {@code startBundle}, or output creation fails
     */
    @Override
    public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws Exception {
        ProcCtxt ctxt = new ProcCtxt(this.mFunction, this.mRuntimeContext, this.mSideInputs);
        ctxt.setup();
        this.mFunction.startBundle((DoFn.Context)ctxt);
        return ctxt.getOutputIterable(iter, this.mFunction);
    }

    /**
     * Processing context that buffers everything the {@link DoFn} emits in an in-memory list
     * and exposes that buffer through the hooks required by {@link SparkProcessContext}.
     */
    private class ProcCtxt
    extends SparkProcessContext<InputT, OutputT, WindowedValue<OutputT>> {
        /** Buffer of emitted values; emptied by {@link #clearOutput()}. */
        private final List<WindowedValue<OutputT>> outputs;

        /**
         * @param fn the {@link DoFn} forwarded to the superclass
         * @param runtimeContext the runtime context forwarded to the superclass
         * @param sideInputs the side inputs forwarded to the superclass
         */
        ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
            super(fn, runtimeContext, sideInputs);
            // Diamond instead of a raw LinkedList keeps the element type checked.
            this.outputs = new LinkedList<>();
        }

        /**
         * Buffers a bare output value as a {@link WindowedValue}.
         *
         * <p>When the inherited {@code windowedValue} is set, the output is derived from it via
         * {@code withValue}; otherwise the value is placed in the global window.
         *
         * @param o the value to emit
         */
        @Override
        public synchronized void output(OutputT o) {
            this.outputs.add(this.windowedValue != null ? this.windowedValue.withValue(o) : WindowedValue.valueInGlobalWindow(o));
        }

        /**
         * Buffers an already-windowed output value unchanged.
         *
         * @param o the windowed value to emit
         */
        @Override
        public synchronized void output(WindowedValue<OutputT> o) {
            this.outputs.add(o);
        }

        /** Removes every buffered output. */
        @Override
        protected void clearOutput() {
            this.outputs.clear();
        }

        /**
         * @return an iterator over the currently buffered outputs
         */
        @Override
        protected Iterator<WindowedValue<OutputT>> getOutputIterator() {
            return this.outputs.iterator();
        }
    }
}

