/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
import org.apache.beam.runners.samza.runtime.DoFnRunnerWithKeyedInternals;
import org.apache.beam.runners.samza.runtime.KeyedInternals;
import org.apache.beam.runners.samza.runtime.SamzaStateRequestHandlers;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.runners.samza.util.StateUtils;
import org.apache.beam.runners.samza.util.WindowUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.context.Context;
import org.joda.time.Instant;

public class SamzaDoFnRunners {
    /**
     * Assembles the {@link DoFnRunner} used to execute {@code doFn}.
     *
     * <p>A simple runner is always built first and, when {@code pipelineOptions.getEnableMetrics()}
     * is set, wrapped by {@link DoFnRunnerWithMetrics}. When {@link StateUtils#isStateful} reports the
     * function as stateful, state and timers are served through a {@link KeyedInternals}, and the
     * runner is additionally wrapped in a stateful runner and a {@link DoFnRunnerWithKeyedInternals}.
     * Otherwise state and timer internals are looked up once with a {@code null} key.
     *
     * @param transformFullName name passed to the metrics wrapper
     * @param transformId id passed to the state internals factory
     * @param context Samza context supplying the task context and the {@link SamzaExecutionContext}
     * @return the fully wrapped runner
     */
    public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(SamzaPipelineOptions pipelineOptions, DoFn<InT, FnOutT> doFn, WindowingStrategy<?, ?> windowingStrategy, String transformFullName, String transformId, Context context, TupleTag<FnOutT> mainOutputTag, SideInputHandler sideInputHandler, SamzaTimerInternalsFactory<?> timerInternalsFactory, Coder<?> keyCoder, DoFnRunners.OutputManager outputManager, Coder<InT> inputCoder, List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, Coder<?>> outputCoders, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> sideInputMapping) {
        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        SamzaStoreStateInternals.Factory<?> stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory(transformId, keyCoder, context.getTaskContext(), pipelineOptions, signature);
        SamzaExecutionContext executionContext = (SamzaExecutionContext)context.getApplicationContainerContext();

        // keyedInternals stays null for stateless functions; that null doubles as the
        // "no stateful wrapping needed" marker further down.
        KeyedInternals keyedInternals = null;
        StateInternals stateInternals;
        TimerInternals timerInternals;
        if (!StateUtils.isStateful(doFn)) {
            stateInternals = stateInternalsFactory.stateInternalsForKey(null);
            timerInternals = timerInternalsFactory.timerInternalsForKey(null);
        } else {
            keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory);
            stateInternals = keyedInternals.stateInternals();
            timerInternals = keyedInternals.timerInternals();
        }

        StepContext stepContext = SamzaDoFnRunners.createStepContext(stateInternals, timerInternals);
        DoFnRunner runner = DoFnRunners.simpleRunner(pipelineOptions, doFn, sideInputHandler, outputManager, mainOutputTag, sideOutputTags, stepContext, inputCoder, outputCoders, windowingStrategy, doFnSchemaInformation, sideInputMapping);
        if (pipelineOptions.getEnableMetrics()) {
            runner = DoFnRunnerWithMetrics.wrap(runner, executionContext.getMetricsContainer(), transformFullName);
        }
        if (keyedInternals == null) {
            return runner;
        }

        // Stateful path: the stateful runner is given a cleanup timer and a state cleaner.
        StatefulDoFnRunner.CleanupTimer cleanupTimer = new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, windowingStrategy);
        StatefulDoFnRunner.StateCleaner<?> stateCleaner = SamzaDoFnRunners.createStateCleaner(doFn, windowingStrategy, keyedInternals.stateInternals());
        DoFnRunner statefulRunner = DoFnRunners.defaultStatefulDoFnRunner(doFn, inputCoder, runner, stepContext, windowingStrategy, cleanupTimer, stateCleaner);
        return new DoFnRunnerWithKeyedInternals(statefulRunner, keyedInternals);
    }

    /**
     * Wraps a fixed pair of state and timer internals in a {@link StepContext}.
     *
     * @param stateInternals value returned by the context's {@code stateInternals()}
     * @param timerInternals value returned by the context's {@code timerInternals()}
     * @return a context that hands back exactly the two arguments
     */
    private static StepContext createStepContext(StateInternals stateInternals, TimerInternals timerInternals) {
        // Both arguments are captured by the anonymous class and returned unchanged.
        StepContext fixedContext = new StepContext(){

            public TimerInternals timerInternals() {
                return timerInternals;
            }

            public StateInternals stateInternals() {
                return stateInternals;
            }
        };
        return fixedContext;
    }

    /**
     * Builds the state cleaner handed to the stateful runner.
     *
     * @param doFn function whose state is to be cleaned
     * @param windowingStrategy strategy whose window function supplies the window type and coder
     * @param stateInternals state store the cleaner operates on
     * @return a {@link StatefulDoFnRunner.StateInternalsStateCleaner}, or {@code null} when the
     *     window type of the strategy is not a subtype of {@link BoundedWindow}
     */
    private static <InT, FnOutT> StatefulDoFnRunner.StateCleaner<?> createStateCleaner(DoFn<InT, FnOutT> doFn, WindowingStrategy<?, ?> windowingStrategy, StateInternals stateInternals) {
        TypeDescriptor windowType = windowingStrategy.getWindowFn().getWindowTypeDescriptor();
        boolean isBoundedWindowType = windowType.isSubtypeOf(TypeDescriptor.of(BoundedWindow.class));
        if (!isBoundedWindowType) {
            return null;
        }
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        return new StatefulDoFnRunner.StateInternalsStateCleaner(doFn, stateInternals, windowCoder);
    }

    /**
     * Assembles the {@link DoFnRunner} for a portable {@link ExecutableStage}.
     *
     * <p>The runner buffers in-flight elements in a bag state named {@code bundleStateId}, taken from
     * {@code nonKeyedStateInternalsFactory} with a {@code null} key in the global namespace. When
     * {@code pipelineOptions.getEnableMetrics()} is set, the runner is wrapped by
     * {@link DoFnRunnerWithMetrics} using the metrics container of the execution context obtained
     * from {@code context}.
     *
     * @param stepName used by the runner to build its metric name
     * @param samzaExecutionContext execution context handed to the underlying runner
     * @param transformFullName name passed to the metrics wrapper
     * @return the (optionally metrics-wrapped) runner
     */
    public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(String transformId, String stepName, String bundleStateId, Coder<WindowedValue<InT>> windowedValueCoder, ExecutableStage executableStage, Map<?, PCollectionView<?>> sideInputMapping, SideInputHandler sideInputHandler, SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, SamzaTimerInternalsFactory<?> timerInternalsFactory, SamzaPipelineOptions pipelineOptions, DoFnRunners.OutputManager outputManager, StageBundleFactory stageBundleFactory, SamzaExecutionContext samzaExecutionContext, TupleTag<FnOutT> mainOutputTag, Map<String, TupleTag<?>> idToTupleTagMap, Context context, String transformFullName) {
        BagState bundledEventsBag = (BagState)nonKeyedStateInternalsFactory
                .stateInternalsForKey(null)
                .state(StateNamespaces.global(), StateTags.bag(bundleStateId, windowedValueCoder));
        StateRequestHandler stateRequestHandler = SamzaStateRequestHandlers.of(transformId, context.getTaskContext(), pipelineOptions, executableStage, stageBundleFactory, sideInputMapping, sideInputHandler);
        SamzaExecutionContext executionContext = (SamzaExecutionContext)context.getApplicationContainerContext();

        // The windowing strategy is looked up from the stage's input PCollection.
        WindowingStrategy inputWindowingStrategy = WindowUtils.getWindowStrategy(executableStage.getInputPCollection().getId(), executableStage.getComponents());
        SdkHarnessDoFnRunner underlyingRunner = new SdkHarnessDoFnRunner(stepName, timerInternalsFactory, inputWindowingStrategy, outputManager, stageBundleFactory, idToTupleTagMap, bundledEventsBag, stateRequestHandler, samzaExecutionContext);

        if (pipelineOptions.getEnableMetrics()) {
            return DoFnRunnerWithMetrics.wrap(underlyingRunner, executionContext.getMetricsContainer(), transformFullName);
        }
        return underlyingRunner;
    }

    /**
     * {@link DoFnRunner} that forwards elements and timers to a {@link RemoteBundle} obtained from a
     * {@link StageBundleFactory} and relays the bundle's outputs to the
     * {@link DoFnRunners.OutputManager}.
     *
     * <p>Every element is added to {@code bundledEventsBag} before it is forwarded. The bag's contents
     * are re-sent at the next {@link #startBundle()}, and the bag is cleared once
     * {@link #finishBundle()} has closed the bundle and emitted its results.
     */
    private static class SdkHarnessDoFnRunner<InT, FnOutT>
    implements DoFnRunner<InT, FnOutT> {
        /** A bundle is timed with a probability of one in this many. */
        private static final int DEFAULT_METRIC_SAMPLE_RATE = 100;
        private final SamzaTimerInternalsFactory timerInternalsFactory;
        private final WindowingStrategy windowingStrategy;
        private final DoFnRunners.OutputManager outputManager;
        private final StageBundleFactory stageBundleFactory;
        private final Map<String, TupleTag<?>> idToTupleTagMap;
        /** Outputs received from the bundle, keyed by PCollection id; unbounded, so {@code put} never waits for space. */
        private final LinkedBlockingQueue<KV<String, FnOutT>> outputQueue = new LinkedBlockingQueue<>();
        private final BagState<WindowedValue<InT>> bundledEventsBag;
        private RemoteBundle remoteBundle;
        private FnDataReceiver<WindowedValue<?>> inputReceiver;
        private final StateRequestHandler stateRequestHandler;
        private final SamzaExecutionContext samzaExecutionContext;
        /** {@link System#nanoTime()} reading taken in {@link #startBundle()}; only meaningful while {@link #bundleSampled} is true. */
        private long startBundleTime;
        /**
         * Whether the current bundle was picked for timing. Tracked separately from
         * {@link #startBundleTime} because {@link System#nanoTime()} may legitimately return zero or a
         * negative value, so the timestamp itself cannot double as the "not sampled" marker.
         */
        private boolean bundleSampled;
        private final String metricName;

        /**
         * @param stepName step name embedded in the metric name ({@code ExecutableStage-<stepName>-process-ns})
         * @param bundledEventsBag bag holding the elements of the bundle in flight
         */
        private SdkHarnessDoFnRunner(String stepName, SamzaTimerInternalsFactory<?> timerInternalsFactory, WindowingStrategy windowingStrategy, DoFnRunners.OutputManager outputManager, StageBundleFactory stageBundleFactory, Map<String, TupleTag<?>> idToTupleTagMap, BagState<WindowedValue<InT>> bundledEventsBag, StateRequestHandler stateRequestHandler, SamzaExecutionContext samzaExecutionContext) {
            this.timerInternalsFactory = timerInternalsFactory;
            this.windowingStrategy = windowingStrategy;
            this.outputManager = outputManager;
            this.stageBundleFactory = stageBundleFactory;
            this.idToTupleTagMap = idToTupleTagMap;
            this.bundledEventsBag = bundledEventsBag;
            this.stateRequestHandler = stateRequestHandler;
            this.samzaExecutionContext = samzaExecutionContext;
            this.metricName = "ExecutableStage-" + stepName + "-process-ns";
        }

        /**
         * Applies a timer reported by the bundle to the timer internals of the timer's user key:
         * deletes the timer when its clear bit is set, sets it otherwise.
         */
        private void timerDataConsumer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
            TimerInternals timerInternals = this.timerInternalsFactory.timerInternalsForKey(timerElement.getUserKey());
            if (timerElement.getClearBit()) {
                timerInternals.deleteTimer(timerData);
            } else {
                timerInternals.setTimer(timerData);
            }
        }

        /**
         * Opens a new remote bundle and re-sends every element still held in the bag.
         *
         * @throws RuntimeException wrapping any failure while opening the bundle or replaying elements
         */
        public void startBundle() {
            try {
                OutputReceiverFactory receiverFactory = new OutputReceiverFactory(){

                    public FnDataReceiver<FnOutT> create(String pCollectionId) {
                        // Hand the element over to the queue; it is drained by emitResults().
                        return receivedElement -> outputQueue.put(KV.of(pCollectionId, receivedElement));
                    }
                };
                Coder windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
                TimerReceiverFactory timerReceiverFactory = new TimerReceiverFactory(this.stageBundleFactory, this::timerDataConsumer, windowCoder);
                this.remoteBundle = this.stageBundleFactory.getBundle(receiverFactory, timerReceiverFactory, this.stateRequestHandler, BundleProgressHandler.ignored());
                this.sampleBundleStart();
                this.inputReceiver = (FnDataReceiver)Iterables.getOnlyElement(this.remoteBundle.getInputReceivers().values());
                // A plain loop (rather than forEach with a nested try/catch) so that a failure is
                // wrapped in a RuntimeException once, not twice.
                for (WindowedValue<InT> elem : this.bundledEventsBag.read()) {
                    this.inputReceiver.accept(elem);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Decides whether this bundle is timed and, if so, records its start time. */
        private void sampleBundleStart() {
            this.bundleSampled = ThreadLocalRandom.current().nextInt(DEFAULT_METRIC_SAMPLE_RATE) == 0;
            this.startBundleTime = this.bundleSampled ? System.nanoTime() : 0L;
        }

        /**
         * Records the element in the bag, forwards it to the bundle, and emits any outputs queued so far.
         *
         * @throws RuntimeException wrapping any failure of those steps
         */
        public void processElement(WindowedValue<InT> elem) {
            try {
                this.bundledEventsBag.add(elem);
                this.inputReceiver.accept(elem);
                this.emitResults();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Drains the output queue, sending each value to the tuple tag mapped from its PCollection id. */
        private void emitResults() {
            KV<String, FnOutT> result;
            while ((result = this.outputQueue.poll()) != null) {
                this.outputManager.output(this.idToTupleTagMap.get(result.getKey()), (WindowedValue)result.getValue());
            }
        }

        /**
         * For a sampled bundle with at least one buffered element, reports the elapsed time since
         * {@link #startBundle()} divided by the number of elements in the bag.
         */
        private void emitMetrics() {
            if (!this.bundleSampled) {
                return;
            }
            long count = Iterables.size(this.bundledEventsBag.read());
            if (count <= 0L) {
                return;
            }
            long finishBundleTime = System.nanoTime();
            // Only the difference of two nanoTime readings is meaningful.
            long averageProcessTime = (finishBundleTime - this.startBundleTime) / count;
            this.samzaExecutionContext.getMetricsContainer().updateExecutableStageBundleMetric(this.metricName, averageProcessTime);
        }

        /**
         * Sends a fired timer to the matching timer receiver of the current bundle.
         *
         * @throws IllegalStateException if the bundle has no receiver for the decoded {@code timerFamilyId}
         * @throws RuntimeException wrapping any failure of the receiver
         */
        public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
            KV timerReceiverKey = TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
            FnDataReceiver timerReceiver = (FnDataReceiver)this.remoteBundle.getTimerReceivers().get(timerReceiverKey);
            if (timerReceiver == null) {
                // Fail with the timer's identity instead of an anonymous NullPointerException.
                throw new IllegalStateException(String.format(Locale.ENGLISH, "No timer receiver registered for timer %s (family %s)", timerId, timerFamilyId));
            }
            Timer timerValue = Timer.of(key, timerId, Collections.singletonList(window), timestamp, outputTimestamp, PaneInfo.NO_FIRING);
            try {
                timerReceiver.accept(timerValue);
            }
            catch (Exception e) {
                throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer %s (family %s)", timerId, timerFamilyId), e);
            }
        }

        /**
         * Closes the remote bundle, emits remaining outputs and metrics, and clears the bag. The bundle
         * and input receiver references are dropped whether or not this succeeds.
         *
         * @throws RuntimeException wrapping any failure of those steps
         */
        public void finishBundle() {
            try {
                this.remoteBundle.close();
                this.emitResults();
                this.emitMetrics();
                this.bundledEventsBag.clear();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to finish remote bundle", e);
            }
            finally {
                this.remoteBundle = null;
                this.inputReceiver = null;
            }
        }

        /** Does nothing. */
        public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
        }

        /**
         * Not supported by this runner.
         *
         * @throws UnsupportedOperationException always
         */
        public DoFn<InT, FnOutT> getFn() {
            throw new UnsupportedOperationException();
        }
    }
}

