/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectTimerInternals;
import org.apache.beam.runners.direct.DoFnLifecycleManager;
import org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.runners.direct.ParDoEvaluatorFactory;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.joda.time.Instant;

final class StatefulParDoEvaluatorFactory<K, InputT, OutputT>
implements TransformEvaluatorFactory {
    /** Factory for the wrapped ParDo evaluators; constructed in this class's constructor and released in {@code cleanup()}. */
    private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
    /** Evaluation context used by {@code createEvaluator} to look up the execution context and step name. */
    private final EvaluationContext evaluationContext;

    /**
     * Creates a factory whose delegate {@link ParDoEvaluatorFactory} builds its
     * {@link DoFnLifecycleManager} from the {@code DoFn} of the applied
     * {@link ParDoMultiOverrideFactory.StatefulParDo}.
     *
     * @param evaluationContext context handed to the delegate factory and kept for step lookups
     * @param options pipeline options forwarded to the delegate factory and to each lifecycle manager
     */
    StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, final PipelineOptions options) {
        this.delegateFactory = new ParDoEvaluatorFactory<>(
                evaluationContext,
                ParDoEvaluator.defaultRunnerFactory(),
                new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
                    @Override
                    public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedStatefulParDo) throws Exception {
                        // The cache is keyed by the applied transform; its transform is expected to be a StatefulParDo.
                        ParDoMultiOverrideFactory.StatefulParDo<?, ?, ?> statefulParDo =
                                (ParDoMultiOverrideFactory.StatefulParDo<?, ?, ?>) appliedStatefulParDo.getTransform();
                        return DoFnLifecycleManager.of(statefulParDo.getDoFn(), options);
                    }
                },
                options);
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates an evaluator for the given application and input bundle by delegating to
     * {@code createEvaluator}.
     *
     * <p>The wildcard-typed arguments are narrowed with unchecked raw casts; callers are
     * responsible for only passing a stateful ParDo application together with its matching
     * keyed-work-item bundle.
     *
     * @param application the applied transform to evaluate
     * @param inputBundle the committed bundle providing the input elements
     * @return an evaluator for the bundle, viewed as {@code TransformEvaluator<T>}
     * @throws Exception if the underlying evaluator cannot be created
     */
    @Override
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<T> evaluator =
                (TransformEvaluator<T>) this.createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
        return evaluator;
    }

    /**
     * Releases resources by forwarding the cleanup request to the delegate factory.
     *
     * @throws Exception if the delegate factory's cleanup fails
     */
    @Override
    public void cleanup() throws Exception {
        delegateFactory.cleanup();
    }

    /**
     * Builds the evaluator for one input bundle of a stateful ParDo.
     *
     * <p>A delegate evaluator is created from the transform's side inputs, output tags, schema
     * information and side-input mapping. The step context for the bundle's key is then looked
     * up and its state internals are committed before the stateful wrapper is returned.
     *
     * @param application the applied stateful ParDo
     * @param inputBundle the committed bundle of keyed work items to process
     * @return an evaluator that feeds work-item elements and timers to the delegate evaluator
     * @throws Exception if the delegate evaluator cannot be created
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> application, CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> inputBundle) throws Exception {
        ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT> statefulParDo = application.getTransform();

        // The delegate factory is typed on KV<K, InputT>, while the application and bundle are
        // typed on KeyedWorkItem; the raw casts bridge that difference.
        DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator =
                this.delegateFactory.createEvaluator(
                        (AppliedPTransform) application,
                        (PCollection) inputBundle.getPCollection(),
                        inputBundle.getKey(),
                        statefulParDo.getSideInputs(),
                        statefulParDo.getMainOutputTag(),
                        statefulParDo.getAdditionalOutputTags().getAll(),
                        statefulParDo.getSchemaInformation(),
                        statefulParDo.getSideInputMapping());

        DirectExecutionContext.DirectStepContext stepContext =
                this.evaluationContext
                        .getExecutionContext(application, inputBundle.getKey())
                        .getStepContext(this.evaluationContext.getStepName(application));

        // NOTE(review): cast kept from the previous code; likely redundant if stateInternals()
        // is already declared to return CopyOnAccessInMemoryStateInternals - confirm.
        ((CopyOnAccessInMemoryStateInternals) stepContext.stateInternals()).commit();
        return new StatefulParDoEvaluator<>(delegateEvaluator, stepContext);
    }

    /**
     * Builds the watermark-hold state tag associated with a timer.
     *
     * <p>The tag id is {@code "timer-" + timerId + "+" + timerFamilyId}; the hold uses
     * {@link TimestampCombiner#EARLIEST} and is turned into a system tag.
     *
     * @param timerData the timer whose id and family id name the hold
     * @return the system state tag for the timer's watermark hold
     */
    private static StateTag<WatermarkHoldState> setTimerTag(TimerInternals.TimerData timerData) {
        String holdId = "timer-" + timerData.getTimerId() + "+" + timerData.getTimerFamilyId();
        StateTag<WatermarkHoldState> holdTag =
                StateTags.watermarkStateInternal(holdId, TimestampCombiner.EARLIEST);
        return StateTags.makeSystemTagInternal(holdTag);
    }

    private static class StatefulParDoEvaluator<K, InputT>
    implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
        /** Evaluator that receives each element and each fired timer. */
        private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
        /** Timers received in processElement but not fired there; reported and cleared in finishBundle. */
        private final List<TimerInternals.TimerData> pushedBackTimers = new ArrayList<TimerInternals.TimerData>();
        /** Timer internals taken from the delegate evaluator's step context; consulted before firing timers. */
        private final DirectTimerInternals timerInternals;
        /** Step context whose state internals store the per-timer watermark holds. */
        DirectExecutionContext.DirectStepContext stepContext;

        /**
         * Wraps a delegate evaluator together with the step context used for watermark holds.
         *
         * @param delegateEvaluator evaluator that processes elements and timers; its ParDo
         *     evaluator's step context supplies the timer internals
         * @param stepContext step context whose state internals hold the timer watermark holds
         */
        public StatefulParDoEvaluator(DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator, DirectExecutionContext.DirectStepContext stepContext) {
            this.stepContext = stepContext;
            this.delegateEvaluator = delegateEvaluator;
            this.timerInternals =
                    delegateEvaluator
                            .getParDoEvaluator()
                            .getStepContext()
                            .timerInternals();
        }

        /**
         * Processes one keyed work item: first its elements, then its timers.
         *
         * <p>Every element is forwarded to the delegate evaluator. The work item's timers are
         * then queued in ascending timestamp order and fired one at a time for as long as the
         * timer internals report no update for a time before the maximum timestamps seen per
         * time domain. The watermark hold of each fired timer is cleared. Timers that were not
         * fired are remembered as pushed back and reported by {@code finishBundle}.
         *
         * @param gbkResult the windowed keyed work item carrying elements and timers
         * @throws IllegalStateException if a timer's namespace is not a window namespace
         * @throws Exception if the delegate evaluator fails
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult) throws Exception {
            KeyedWorkItem<K, KV<K, InputT>> workItem = gbkResult.getValue();

            for (WindowedValue<KV<K, InputT>> element : workItem.elementsIterable()) {
                this.delegateEvaluator.processElement(element);
            }

            PriorityQueue<TimerInternals.TimerData> toBeFiredTimers =
                    new PriorityQueue<>(Comparator.comparing(TimerInternals.TimerData::getTimestamp));

            // Track the latest timer timestamp per time domain.
            Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
            Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
            for (TimerInternals.TimerData timerData : workItem.timersIterable()) {
                toBeFiredTimers.add(timerData);
                switch (timerData.getDomain()) {
                    case EVENT_TIME:
                        maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
                        break;
                    case PROCESSING_TIME:
                        maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
                        break;
                    case SYNCHRONIZED_PROCESSING_TIME:
                        maxSynchronizedProcessingTime =
                                Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
                        break;
                    default:
                        // Any other domain does not contribute to the tracked maxima.
                        break;
                }
            }

            // Stop firing as soon as the timer internals contain an update for an earlier time;
            // the remaining timers are pushed back instead.
            while (!this.timerInternals.containsUpdateForTimeBefore(maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime)
                    && !toBeFiredTimers.isEmpty()) {
                TimerInternals.TimerData timer = toBeFiredTimers.poll();
                Preconditions.checkState(
                        timer.getNamespace() instanceof StateNamespaces.WindowNamespace,
                        "Expected Timer %s to be in a %s, but got %s",
                        timer,
                        StateNamespaces.WindowNamespace.class.getSimpleName(),
                        timer.getNamespace().getClass().getName());
                StateNamespaces.WindowNamespace<?> windowNamespace =
                        (StateNamespaces.WindowNamespace<?>) timer.getNamespace();
                BoundedWindow timerWindow = windowNamespace.getWindow();
                this.delegateEvaluator.onTimer(timer, workItem.key(), timerWindow);
                this.clearWatermarkHold(timer);
            }
            this.pushedBackTimers.addAll(toBeFiredTimers);
        }

        /**
         * Clears the watermark hold stored for the given timer and commits the step's state
         * internals.
         *
         * @param timer the timer whose hold, in the timer's own namespace, is cleared
         */
        private void clearWatermarkHold(TimerInternals.TimerData timer) {
            StateTag<WatermarkHoldState> holdTag = StatefulParDoEvaluatorFactory.setTimerTag(timer);
            WatermarkHoldState hold =
                    (WatermarkHoldState) this.stepContext.stateInternals().state(timer.getNamespace(), holdTag);
            hold.clear();
            ((CopyOnAccessInMemoryStateInternals) this.stepContext.stateInternals()).commit();
        }

        /**
         * Adds the timer's output timestamp to the watermark hold stored for that timer.
         *
         * <p>Unlike {@code clearWatermarkHold}, this method does not commit the state internals.
         *
         * @param timer the timer whose output timestamp is added to the hold in its namespace
         */
        private void setWatermarkHold(TimerInternals.TimerData timer) {
            StateTag<WatermarkHoldState> holdTag = StatefulParDoEvaluatorFactory.setTimerTag(timer);
            WatermarkHoldState hold =
                    (WatermarkHoldState) this.stepContext.stateInternals().state(timer.getNamespace(), holdTag);
            hold.add(timer.getOutputTimestamp());
        }

        /*
         * WARNING - void declaration
         */
        @Override
        public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
            void var4_10;
            CopyOnAccessInMemoryStateInternals state;
            TransformResult<KV<K, InputT>> delegateResult = this.delegateEvaluator.finishBundle();
            boolean isTimerDeclared = false;
            for (TimerInternals.TimerData timerData : delegateResult.getTimerUpdate().getSetTimers()) {
                this.setWatermarkHold(timerData);
                isTimerDeclared = true;
            }
            for (TimerInternals.TimerData timerData : delegateResult.getTimerUpdate().getDeletedTimers()) {
                this.clearWatermarkHold(timerData);
            }
            if (isTimerDeclared && delegateResult.getState() != null) {
                state = delegateResult.getState();
                Instant instant = this.stepContext.commitState().getEarliestWatermarkHold();
            } else if (isTimerDeclared) {
                state = this.stepContext.commitState();
                Instant instant = state.getEarliestWatermarkHold();
            } else {
                state = delegateResult.getState();
                Instant instant = delegateResult.getWatermarkHold();
            }
            WatermarkManager.TimerUpdate timerUpdate = delegateResult.getTimerUpdate().withPushedBackTimers(this.pushedBackTimers);
            this.pushedBackTimers.clear();
            StepTransformResult.Builder regroupedResult = StepTransformResult.withHold(delegateResult.getTransform(), (Instant)var4_10).withTimerUpdate(timerUpdate).withState(state).withMetricUpdates(delegateResult.getLogicalMetricUpdates()).addOutput(Lists.newArrayList(delegateResult.getOutputBundles())).withBundleFinalizations(delegateResult.getBundleFinalizations());
            Iterator<WindowedValue<KV<K, InputT>>> iterator = delegateResult.getUnprocessedElements().iterator();
            while (iterator.hasNext()) {
                WindowedValue<KV<K, InputT>> untypedUnprocessed;
                WindowedValue<KV<K, InputT>> windowedKv = untypedUnprocessed = iterator.next();
                WindowedValue pushedBack = windowedKv.withValue(KeyedWorkItems.elementsWorkItem(((KV)windowedKv.getValue()).getKey(), Collections.singleton(windowedKv)));
                regroupedResult.addUnprocessedElements(pushedBack);
            }
            return regroupedResult.build();
        }
    }

    /**
     * Value type combining an applied stateful ParDo, a structural key and a window.
     *
     * <p>Instances are created through {@link #create}, which delegates to the generated
     * {@code AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow}
     * implementation. Nothing else in the visible source of this file references this type.
     */
    @AutoValue
    static abstract class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
        AppliedPTransformOutputKeyAndWindow() {
        }

        /** Returns the applied stateful ParDo transform. */
        abstract AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> getTransform();

        /** Returns the structural key. */
        abstract StructuralKey<K> getKey();

        /** Returns the window. */
        abstract BoundedWindow getWindow();

        /**
         * Creates a new value from its three components.
         *
         * @param transform the applied stateful ParDo
         * @param key the structural key
         * @param w the window
         * @return a new instance of the generated implementation
         */
        static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> transform, StructuralKey<K> key, BoundedWindow w) {
            // Argument order must match the declaration order of the abstract properties above.
            return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>(transform, key, w);
        }
    }
}

