/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SparkProcessContext<InputT, OutputT, ValueT>
extends DoFn.ProcessContext {
    private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
    private final DoFn<InputT, OutputT> fn;
    private final SparkRuntimeContext mRuntimeContext;
    private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
    protected WindowedValue<InputT> windowedValue;

    SparkProcessContext(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtime, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
        super(fn);
        this.fn = fn;
        this.mRuntimeContext = runtime;
        this.mSideInputs = sideInputs;
    }

    void setup() {
        this.setupDelegateAggregators();
    }

    public PipelineOptions getPipelineOptions() {
        return this.mRuntimeContext.getPipelineOptions();
    }

    public <T> T sideInput(PCollectionView<T> view) {
        BroadcastHelper<?> broadcastHelper = this.mSideInputs.get(view.getTagInternal());
        Iterable contents = (Iterable)broadcastHelper.getValue();
        return (T)view.fromIterableInternal(contents);
    }

    public abstract void output(OutputT var1);

    public abstract void output(WindowedValue<OutputT> var1);

    public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
        String message = "sideOutput is an unsupported operation for doFunctions, use a MultiDoFunction instead.";
        LOG.warn(message);
        throw new UnsupportedOperationException(message);
    }

    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
        String message = "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a MultiDoFunction instead.";
        LOG.warn(message);
        throw new UnsupportedOperationException(message);
    }

    public <AggregatprInputT, AggregatorOutputT> Aggregator<AggregatprInputT, AggregatorOutputT> createAggregatorInternal(String named, Combine.CombineFn<AggregatprInputT, ?, AggregatorOutputT> combineFn) {
        return this.mRuntimeContext.createAggregator(named, combineFn);
    }

    public InputT element() {
        return (InputT)this.windowedValue.getValue();
    }

    public void outputWithTimestamp(OutputT output, Instant timestamp) {
        this.output(WindowedValue.of(output, (Instant)timestamp, (Collection)this.windowedValue.getWindows(), (PaneInfo)this.windowedValue.getPane()));
    }

    public Instant timestamp() {
        return this.windowedValue.getTimestamp();
    }

    public BoundedWindow window() {
        if (!(this.fn instanceof DoFn.RequiresWindowAccess)) {
            throw new UnsupportedOperationException("window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
        }
        return (BoundedWindow)Iterables.getOnlyElement((Iterable)this.windowedValue.getWindows());
    }

    public PaneInfo pane() {
        return this.windowedValue.getPane();
    }

    public WindowingInternals<InputT, OutputT> windowingInternals() {
        return new WindowingInternals<InputT, OutputT>(){

            public Collection<? extends BoundedWindow> windows() {
                return SparkProcessContext.this.windowedValue.getWindows();
            }

            public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
                SparkProcessContext.this.output(WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)paneInfo));
            }

            public StateInternals stateInternals() {
                return InMemoryStateInternals.forKey((Object)"DUMMY");
            }

            public TimerInternals timerInternals() {
                throw new UnsupportedOperationException("WindowingInternals#timerInternals() is not yet supported.");
            }

            public PaneInfo pane() {
                return SparkProcessContext.this.windowedValue.getPane();
            }

            public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
                throw new UnsupportedOperationException("WindowingInternals#writePCollectionViewData() is not yet supported.");
            }

            public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
                throw new UnsupportedOperationException("WindowingInternals#sideInput() is not yet supported.");
            }
        };
    }

    protected abstract void clearOutput();

    protected abstract Iterator<ValueT> getOutputIterator();

    protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter, final DoFn<InputT, OutputT> doFn) {
        return new Iterable<ValueT>(){

            @Override
            public Iterator<ValueT> iterator() {
                return new ProcCtxtIterator(iter, doFn);
            }
        };
    }

    public static class SparkProcessException
    extends RuntimeException {
        SparkProcessException(Throwable t) {
            super(t);
        }
    }

    private class ProcCtxtIterator
    extends AbstractIterator<ValueT> {
        private final Iterator<WindowedValue<InputT>> inputIterator;
        private final DoFn<InputT, OutputT> doFn;
        private Iterator<ValueT> outputIterator;
        private boolean calledFinish;

        ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, DoFn<InputT, OutputT> doFn) {
            this.inputIterator = iterator;
            this.doFn = doFn;
            this.outputIterator = SparkProcessContext.this.getOutputIterator();
        }

        protected ValueT computeNext() {
            while (true) {
                if (this.outputIterator.hasNext()) {
                    return this.outputIterator.next();
                }
                if (this.inputIterator.hasNext()) {
                    SparkProcessContext.this.clearOutput();
                    SparkProcessContext.this.windowedValue = this.inputIterator.next();
                    try {
                        this.doFn.processElement((DoFn.ProcessContext)SparkProcessContext.this);
                    }
                    catch (Exception e) {
                        throw new SparkProcessException(e);
                    }
                    this.outputIterator = SparkProcessContext.this.getOutputIterator();
                    continue;
                }
                if (this.calledFinish) break;
                SparkProcessContext.this.clearOutput();
                try {
                    this.calledFinish = true;
                    this.doFn.finishBundle((DoFn.Context)SparkProcessContext.this);
                }
                catch (Exception e) {
                    throw new SparkProcessException(e);
                }
                this.outputIterator = SparkProcessContext.this.getOutputIterator();
            }
            return this.endOfData();
        }
    }
}

