/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.beam_runners_samza.com.google.common.base.Preconditions;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.KeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/**
 * A {@link DoFnRunner} decorator that installs the key of the element or timer being
 * processed on a {@link KeyedInternals} before delegating to an underlying runner, and
 * clears that key again once the delegate returns or throws.
 *
 * <p>Instances are only created through {@link #of}, and only for DoFns whose signature
 * reports state usage.
 *
 * @param <InputT> input element type of the wrapped {@link DoFn}
 * @param <OutputT> main output element type of the wrapped {@link DoFn}
 */
public class DoFnRunnerWithKeyedInternals<InputT, OutputT>
implements DoFnRunner<InputT, OutputT> {
    private final DoFnRunner<InputT, OutputT> underlying;
    private final KeyedInternals keyedInternals;

    /**
     * Builds the runner chain for {@code doFn}.
     *
     * <p>A simple runner is always created and wrapped with metrics. If the DoFn signature
     * reports state usage, that runner is further wrapped in a stateful runner and in a
     * {@code DoFnRunnerWithKeyedInternals}; otherwise the metrics-wrapped runner is returned
     * directly and state/timer internals are obtained for a {@code null} key.
     *
     * @return the outermost runner of the chain
     */
    public static <InputT, OutputT> DoFnRunner<InputT, OutputT> of(PipelineOptions options, DoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, Coder<InputT> inputCoder, Map<TupleTag<?>, Coder<?>> outputCoders, SamzaStoreStateInternals.Factory<?> stateInternalsFactory, SamzaTimerInternalsFactory<?> timerInternalsFactory, WindowingStrategy<?, ?> windowingStrategy, SamzaMetricsContainer metricsContainer, String stepName) {
        final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        final KeyedInternals keyedInternals;
        final StateInternals stateInternals;
        final TimerInternals timerInternals;
        // NOTE(review): only usesState() is consulted; a DoFn that declares timers but no
        // state takes the un-keyed branch below. Confirm that this is intended.
        if (signature.usesState()) {
            keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory);
            stateInternals = keyedInternals.stateInternals();
            timerInternals = keyedInternals.timerInternals();
        } else {
            keyedInternals = null;
            stateInternals = stateInternalsFactory.stateInternalsForKey(null);
            timerInternals = timerInternalsFactory.timerInternalsForKey(null);
        }

        final DoFnRunner<InputT, OutputT> simpleRunner = DoFnRunners.simpleRunner(
                options,
                doFn,
                sideInputReader,
                outputManager,
                mainOutputTag,
                additionalOutputTags,
                createStepContext(stateInternals, timerInternals),
                inputCoder,
                outputCoders,
                windowingStrategy);
        final DoFnRunner<InputT, OutputT> runnerWithMetrics =
                DoFnRunnerWithMetrics.wrap(simpleRunner, metricsContainer, stepName);

        if (keyedInternals == null) {
            // Stateless path: no per-key bookkeeping is required.
            return runnerWithMetrics;
        }

        final DoFnRunner<InputT, OutputT> statefulRunner = DoFnRunners.defaultStatefulDoFnRunner(
                doFn,
                runnerWithMetrics,
                windowingStrategy,
                new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, windowingStrategy),
                createStateCleaner(doFn, windowingStrategy, keyedInternals.stateInternals()));
        return new DoFnRunnerWithKeyedInternals<InputT, OutputT>(statefulRunner, keyedInternals);
    }

    /** Returns a {@link StepContext} that hands out exactly the two given internals. */
    private static StepContext createStepContext(final StateInternals stateInternals, final TimerInternals timerInternals) {
        return new StepContext() {
            @Override
            public StateInternals stateInternals() {
                return stateInternals;
            }

            @Override
            public TimerInternals timerInternals() {
                return timerInternals;
            }
        };
    }

    /**
     * Creates a state cleaner backed by {@code stateInternals} and the window coder of the
     * strategy's window function.
     *
     * @return the cleaner, or {@code null} when the window type descriptor is not a subtype
     *     of {@link BoundedWindow}. NOTE(review): {@link #of} passes the result straight to
     *     {@code defaultStatefulDoFnRunner}; confirm that {@code null} is tolerated there.
     */
    private static <InputT, OutputT> StatefulDoFnRunner.StateCleaner<?> createStateCleaner(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, StateInternals stateInternals) {
        final TypeDescriptor<?> windowType = windowingStrategy.getWindowFn().getWindowTypeDescriptor();
        if (!windowType.isSubtypeOf(TypeDescriptor.of(BoundedWindow.class))) {
            return null;
        }
        final Coder<? extends BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
        return new StatefulDoFnRunner.StateInternalsStateCleaner<>(doFn, stateInternals, windowCoder);
    }

    private DoFnRunnerWithKeyedInternals(DoFnRunner<InputT, OutputT> doFnRunner, KeyedInternals keyedInternals) {
        this.underlying = doFnRunner;
        this.keyedInternals = keyedInternals;
    }

    /** Delegates to the underlying runner. */
    @Override
    public void startBundle() {
        this.underlying.startBundle();
    }

    /**
     * Sets the key derived from the element's value, delegates processing to the underlying
     * runner, and always clears the key afterwards.
     */
    @Override
    public void processElement(WindowedValue<InputT> elem) {
        this.setKeyedInternals(elem.getValue());
        try {
            this.underlying.processElement(elem);
        } finally {
            this.clearKeyedInternals();
        }
    }

    /**
     * Fires a keyed timer: sets the timer's key (when it has one), forwards the timer's id,
     * timestamp and domain to {@link #onTimer(String, BoundedWindow, Instant, TimeDomain)},
     * and always clears the key afterwards.
     */
    public void onTimer(KeyedTimerData keyedTimerData, BoundedWindow window) {
        this.setKeyedInternals(keyedTimerData);
        try {
            final TimerInternals.TimerData timer = keyedTimerData.getTimerData();
            this.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
        } finally {
            this.clearKeyedInternals();
        }
    }

    /**
     * Delegates the timer to the underlying runner. A key must already be set on the keyed
     * internals when this is called.
     *
     * @throws IllegalStateException if no key is currently set
     */
    @Override
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        Preconditions.checkState(this.keyedInternals.getKey() != null, "Key is not set for timer");
        this.underlying.onTimer(timerId, window, timestamp, timeDomain);
    }

    /** Delegates to the underlying runner. */
    @Override
    public void finishBundle() {
        this.underlying.finishBundle();
    }

    /** Returns the {@link DoFn} reported by the underlying runner. */
    @Override
    public DoFn<InputT, OutputT> getFn() {
        return this.underlying.getFn();
    }

    /**
     * Extracts the key from {@code value} and sets it on the keyed internals.
     *
     * <p>A {@link KeyedWorkItem} contributes {@code key()}; a {@link KeyedTimerData}
     * contributes {@code getKey()} but only when that key is non-null; any other value is
     * cast to {@link KV} and contributes {@code getKey()}.
     */
    private void setKeyedInternals(Object value) {
        if (value instanceof KeyedWorkItem) {
            this.keyedInternals.setKey(((KeyedWorkItem<?, ?>)value).key());
        } else if (value instanceof KeyedTimerData) {
            final Object timerKey = ((KeyedTimerData)value).getKey();
            // A timer without a key leaves the keyed internals untouched.
            if (timerKey != null) {
                this.keyedInternals.setKey(timerKey);
            }
        } else {
            this.keyedInternals.setKey(((KV<?, ?>)value).getKey());
        }
    }

    /** Clears the key currently set on the keyed internals. */
    private void clearKeyedInternals() {
        this.keyedInternals.clearKey();
    }
}

