/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
import org.apache.beam.runners.samza.runtime.DoFnRunnerWithKeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedInternals;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.samza.context.Context;
import org.joda.time.Instant;

public class SamzaDoFnRunners {
    public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(SamzaPipelineOptions pipelineOptions, DoFn<InT, FnOutT> doFn, WindowingStrategy<?, ?> windowingStrategy, String stepName, String stateId, Context context, TupleTag<FnOutT> mainOutputTag, SideInputHandler sideInputHandler, SamzaTimerInternalsFactory<?> timerInternalsFactory, Coder<?> keyCoder, DoFnRunners.OutputManager outputManager, Coder<InT> inputCoder, List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, Coder<?>> outputCoders, DoFnSchemaInformation doFnSchemaInformation) {
        DoFnRunner doFnRunnerWithMetrics;
        TimerInternals timerInternals;
        StateInternals stateInternals;
        KeyedInternals keyedInternals;
        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalFactory(stateId, keyCoder, context.getTaskContext(), pipelineOptions, signature);
        SamzaExecutionContext executionContext = (SamzaExecutionContext)context.getApplicationContainerContext();
        if (signature.usesState()) {
            keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory);
            stateInternals = keyedInternals.stateInternals();
            timerInternals = keyedInternals.timerInternals();
        } else {
            keyedInternals = null;
            stateInternals = stateInternalsFactory.stateInternalsForKey(null);
            timerInternals = timerInternalsFactory.timerInternalsForKey(null);
        }
        DoFnRunner underlyingRunner = DoFnRunners.simpleRunner((PipelineOptions)pipelineOptions, doFn, (SideInputReader)sideInputHandler, (DoFnRunners.OutputManager)outputManager, mainOutputTag, sideOutputTags, (StepContext)SamzaDoFnRunners.createStepContext(stateInternals, timerInternals), inputCoder, outputCoders, windowingStrategy, (DoFnSchemaInformation)doFnSchemaInformation);
        DoFnRunner doFnRunner = doFnRunnerWithMetrics = pipelineOptions.getEnableMetrics() != false ? DoFnRunnerWithMetrics.wrap(underlyingRunner, executionContext.getMetricsContainer(), stepName) : underlyingRunner;
        if (keyedInternals != null) {
            DoFnRunner statefulDoFnRunner = DoFnRunners.defaultStatefulDoFnRunner(doFn, doFnRunnerWithMetrics, windowingStrategy, (StatefulDoFnRunner.CleanupTimer)new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, windowingStrategy), SamzaDoFnRunners.createStateCleaner(doFn, windowingStrategy, keyedInternals.stateInternals()));
            return new DoFnRunnerWithKeyedInternals(statefulDoFnRunner, keyedInternals);
        }
        return doFnRunnerWithMetrics;
    }

    private static StepContext createStepContext(final StateInternals stateInternals, final TimerInternals timerInternals) {
        return new StepContext(){

            public StateInternals stateInternals() {
                return stateInternals;
            }

            public TimerInternals timerInternals() {
                return timerInternals;
            }
        };
    }

    private static <InT, FnOutT> StatefulDoFnRunner.StateCleaner<?> createStateCleaner(DoFn<InT, FnOutT> doFn, WindowingStrategy<?, ?> windowingStrategy, StateInternals stateInternals) {
        TypeDescriptor windowType = windowingStrategy.getWindowFn().getWindowTypeDescriptor();
        if (windowType.isSubtypeOf(TypeDescriptor.of(BoundedWindow.class))) {
            Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
            return new StatefulDoFnRunner.StateInternalsStateCleaner(doFn, stateInternals, windowCoder);
        }
        return null;
    }

    public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(DoFnRunners.OutputManager outputManager, StageBundleFactory stageBundleFactory, TupleTag<FnOutT> mainOutputTag, Map<String, TupleTag<?>> idToTupleTagMap, Context context, String stepName) {
        SamzaExecutionContext executionContext = (SamzaExecutionContext)context.getApplicationContainerContext();
        SdkHarnessDoFnRunner sdkHarnessDoFnRunner = new SdkHarnessDoFnRunner(outputManager, stageBundleFactory, mainOutputTag, idToTupleTagMap);
        return DoFnRunnerWithMetrics.wrap(sdkHarnessDoFnRunner, executionContext.getMetricsContainer(), stepName);
    }

    private static class SdkHarnessDoFnRunner<InT, FnOutT>
    implements DoFnRunner<InT, FnOutT> {
        private final DoFnRunners.OutputManager outputManager;
        private final StageBundleFactory stageBundleFactory;
        private final TupleTag<FnOutT> mainOutputTag;
        private final Map<String, TupleTag<?>> idToTupleTagMap;
        private final LinkedBlockingQueue<KV<String, FnOutT>> outputQueue = new LinkedBlockingQueue();

        private SdkHarnessDoFnRunner(DoFnRunners.OutputManager outputManager, StageBundleFactory stageBundleFactory, TupleTag<FnOutT> mainOutputTag, Map<String, TupleTag<?>> idToTupleTagMap) {
            this.outputManager = outputManager;
            this.stageBundleFactory = stageBundleFactory;
            this.mainOutputTag = mainOutputTag;
            this.idToTupleTagMap = idToTupleTagMap;
        }

        public void startBundle() {
        }

        public void processElement(WindowedValue<InT> elem) {
            try {
                KV<String, FnOutT> result;
                OutputReceiverFactory receiverFactory = new OutputReceiverFactory(){

                    public FnDataReceiver<FnOutT> create(String pCollectionId) {
                        return receivedElement -> outputQueue.put(KV.of((Object)pCollectionId, (Object)receivedElement));
                    }
                };
                try (RemoteBundle bundle = this.stageBundleFactory.getBundle(receiverFactory, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());){
                    ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept(elem);
                }
                while ((result = this.outputQueue.poll()) != null) {
                    this.outputManager.output(this.idToTupleTagMap.get(result.getKey()), (WindowedValue)result.getValue());
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        }

        public void finishBundle() {
        }

        public DoFn<InT, FnOutT> getFn() {
            throw new UnsupportedOperationException();
        }
    }
}

