/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItemCoder;
import org.apache.beam.repackaged.direct_java.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternalsFactory;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternalsFactory;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformReplacements;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReplacementOutputs;
import org.apache.beam.repackaged.direct_java.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;

public class SplittableParDoViaKeyedWorkItems {

    /**
     * A {@link DoFn} that drives a wrapped splittable {@code fn} over {@link KeyedWorkItem}s keyed
     * by {@code byte[]}.
     *
     * <p>Each work item is handled in one of two modes, decided in {@link #processElement}:
     * <ul>
     *   <li><b>Seed call</b> - the work item carries no timer. Its single
     *       {@code KV<element, restriction>} is read, the element is written to state, and the
     *       initial watermark estimator state is obtained from the wrapped fn.</li>
     *   <li><b>Resumption</b> - the work item carries a timer. The element, restriction and
     *       watermark estimator state are read back from the state namespace recorded in the
     *       timer.</li>
     * </ul>
     *
     * <p>The {@link StateInternalsFactory}, {@link TimerInternalsFactory} and
     * {@link SplittableProcessElementInvoker} are {@code transient} and must be supplied through
     * the corresponding setters before {@link #processElement} is invoked; they are dereferenced
     * there without a null check.
     *
     * @param <InputT> element type of the wrapped fn
     * @param <OutputT> output type of the wrapped fn
     * @param <RestrictionT> restriction type
     * @param <PositionT> position type of the restriction tracker
     * @param <WatermarkEstimatorStateT> watermark estimator state type
     */
    @VisibleForTesting
    public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
    extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
        /** System state tag {@code "hold"} (combiner {@code LATEST}) for the watermark hold. */
        private static final StateTag<WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", TimestampCombiner.LATEST));
        /** State tag {@code "element"}: the windowed element being processed. */
        private final StateTag<ValueState<WindowedValue<InputT>>> elementTag;
        /** State tag {@code "restriction"}: the residual restriction still to be processed. */
        private final StateTag<ValueState<RestrictionT>> restrictionTag;
        /** State tag {@code "watermarkEstimatorState"}: estimator state carried across resumptions. */
        private final StateTag<ValueState<WatermarkEstimatorStateT>> watermarkEstimatorStateTag;
        private final DoFn<InputT, OutputT> fn;
        private final Coder<InputT> elementCoder;
        private final Coder<RestrictionT> restrictionCoder;
        private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
        private transient @Nullable StateInternalsFactory<byte[]> stateInternalsFactory;
        private transient @Nullable TimerInternalsFactory<byte[]> timerInternalsFactory;
        private transient @Nullable SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processElementInvoker;
        /** Invoker for {@link #fn}; created in {@link #setup}. */
        private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;

        /**
         * Creates a process fn wrapping {@code fn} and builds the state tags from the given coders.
         *
         * @param fn the wrapped fn
         * @param elementCoder coder for elements; combined with the window coder of
         *     {@code inputWindowingStrategy} for the {@code "element"} state
         * @param restrictionCoder coder for the {@code "restriction"} state
         * @param watermarkEstimatorStateCoder coder for the {@code "watermarkEstimatorState"} state
         * @param inputWindowingStrategy windowing strategy of the input; supplies the window coder
         */
        public ProcessFn(DoFn<InputT, OutputT> fn, Coder<InputT> elementCoder, Coder<RestrictionT> restrictionCoder, Coder<WatermarkEstimatorStateT> watermarkEstimatorStateCoder, WindowingStrategy<InputT, ?> inputWindowingStrategy) {
            this.fn = fn;
            this.elementCoder = elementCoder;
            this.restrictionCoder = restrictionCoder;
            this.inputWindowingStrategy = inputWindowingStrategy;
            this.elementTag = StateTags.value("element", WindowedValue.getFullCoder(elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
            this.restrictionTag = StateTags.value("restriction", restrictionCoder);
            this.watermarkEstimatorStateTag = StateTags.value("watermarkEstimatorState", watermarkEstimatorStateCoder);
        }

        /** Sets the factory used to obtain per-key {@link StateInternals}. */
        public void setStateInternalsFactory(StateInternalsFactory<byte[]> stateInternalsFactory) {
            this.stateInternalsFactory = stateInternalsFactory;
        }

        /** Sets the factory used to obtain per-key {@link TimerInternals}. */
        public void setTimerInternalsFactory(TimerInternalsFactory<byte[]> timerInternalsFactory) {
            this.timerInternalsFactory = timerInternalsFactory;
        }

        /** Sets the invoker that performs the actual splittable process-element call. */
        public void setProcessElementInvoker(SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> invoker) {
            this.processElementInvoker = invoker;
        }

        /** Returns the wrapped fn. */
        public DoFn<InputT, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the element coder passed to the constructor. */
        public Coder<InputT> getElementCoder() {
            return this.elementCoder;
        }

        /** Returns the restriction coder passed to the constructor. */
        public Coder<RestrictionT> getRestrictionCoder() {
            return this.restrictionCoder;
        }

        /** Returns the input windowing strategy passed to the constructor. */
        public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
            return this.inputWindowingStrategy;
        }

        /** Creates the invoker for the wrapped fn and forwards the setup call to it. */
        @DoFn.Setup
        public void setup(PipelineOptions options) throws Exception {
            this.invoker = DoFnInvokers.invokerFor(this.fn);
            this.invoker.invokeSetup(this.wrapOptionsAsSetup(options));
        }

        /** Forwards teardown to the wrapped fn, if an invoker was ever created. */
        @DoFn.Teardown
        public void tearDown() throws Exception {
            // The invoker is only assigned in setup(); if that never happened there is nothing
            // to tear down, and throwing a NullPointerException here would hide the real cause.
            if (this.invoker != null) {
                this.invoker.invokeTeardown();
            }
        }

        /** Forwards start-bundle to the wrapped fn with a context exposing only pipeline options. */
        @DoFn.StartBundle
        public void startBundle(DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>.StartBundleContext c) throws Exception {
            this.invoker.invokeStartBundle(this.wrapContextAsStartBundle(c));
        }

        /** Forwards finish-bundle to the wrapped fn with a context that rejects output. */
        @DoFn.FinishBundle
        public void finishBundle(DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>.FinishBundleContext c) throws Exception {
            this.invoker.invokeFinishBundle(this.wrapContextAsFinishBundle(c));
        }

        /**
         * Processes one keyed work item: either the seed (element, restriction) pair or a timer
         * firing that resumes a previously checkpointed pair.
         *
         * <p>After the splittable call returns, a {@code null} residual restriction clears all
         * four pieces of state and returns. Otherwise the residual restriction and future
         * watermark estimator state are written, the watermark hold is updated, and a
         * processing-time timer is set at the current processing time plus the continuation's
         * resume delay.
         *
         * @param c process context carrying the keyed work item
         * @param boundedWindow window handed to the wrapped fn's argument providers
         */
        @DoFn.ProcessElement
        public void processElement(final DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>.ProcessContext c, final BoundedWindow boundedWindow) {
            byte[] key = c.element().key();
            StateInternals stateInternals = this.stateInternalsFactory.stateInternalsForKey(key);
            TimerInternals timerInternals = this.timerInternalsFactory.timerInternalsForKey(key);

            // A work item without a timer is the seed call; one with a timer is a resumption.
            TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
            boolean isSeedCall = timer == null;
            StateNamespace stateNamespace;
            if (isSeedCall) {
                WindowedValue<KV<InputT, RestrictionT>> windowedValue = Iterables.getOnlyElement(c.element().elementsIterable());
                BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows());
                // The window coder's type argument is a wildcard capture; it is used here only to
                // build the namespace for this concrete window.
                @SuppressWarnings("unchecked")
                Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) this.inputWindowingStrategy.getWindowFn().windowCoder();
                stateNamespace = StateNamespaces.window(windowCoder, window);
            } else {
                stateNamespace = timer.getNamespace();
            }

            ValueState<WindowedValue<InputT>> elementState = stateInternals.state(stateNamespace, this.elementTag);
            ValueState<RestrictionT> restrictionState = stateInternals.state(stateNamespace, this.restrictionTag);
            ValueState<WatermarkEstimatorStateT> watermarkEstimatorState = stateInternals.state(stateNamespace, this.watermarkEstimatorStateTag);
            WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag);

            final KV<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
            final WatermarkEstimatorStateT watermarkEstimatorStateT;
            if (isSeedCall) {
                WindowedValue<KV<InputT, RestrictionT>> windowedValue = Iterables.getOnlyElement(c.element().elementsIterable());
                WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().getKey());
                // Persist the element so that later timer firings can recover it.
                elementState.write(element);
                elementAndRestriction = KV.of(element, windowedValue.getValue().getValue());
                watermarkEstimatorStateT = this.invoker.invokeGetInitialWatermarkEstimatorState(new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

                    public InputT element(DoFn<InputT, OutputT> doFn) {
                        return elementAndRestriction.getKey().getValue();
                    }

                    public Object restriction() {
                        return elementAndRestriction.getValue();
                    }

                    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                        return c.timestamp();
                    }

                    public PipelineOptions pipelineOptions() {
                        return c.getPipelineOptions();
                    }

                    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                        return c.pane();
                    }

                    public BoundedWindow window() {
                        return boundedWindow;
                    }

                    public String getErrorContext() {
                        return ProcessFn.class.getSimpleName() + ".invokeGetInitialWatermarkEstimatorState";
                    }
                });
            } else {
                // Resumption: issue the read hints first, then read everything back from state.
                elementState.readLater();
                restrictionState.readLater();
                watermarkEstimatorState.readLater();
                elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
                watermarkEstimatorStateT = watermarkEstimatorState.read();
            }

            WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator = this.invoker.invokeNewWatermarkEstimator(new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

                public InputT element(DoFn<InputT, OutputT> doFn) {
                    return elementAndRestriction.getKey().getValue();
                }

                public Object restriction() {
                    return elementAndRestriction.getValue();
                }

                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                    return c.timestamp();
                }

                public PipelineOptions pipelineOptions() {
                    return c.getPipelineOptions();
                }

                public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                    return c.pane();
                }

                public BoundedWindow window() {
                    return boundedWindow;
                }

                public Object watermarkEstimatorState() {
                    return watermarkEstimatorStateT;
                }

                public String getErrorContext() {
                    return ProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator";
                }
            });
            RestrictionTracker<RestrictionT, PositionT> tracker = this.invoker.invokeNewTracker(new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

                public InputT element(DoFn<InputT, OutputT> doFn) {
                    return elementAndRestriction.getKey().getValue();
                }

                public Object restriction() {
                    return elementAndRestriction.getValue();
                }

                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                    return c.timestamp();
                }

                public PipelineOptions pipelineOptions() {
                    return c.getPipelineOptions();
                }

                public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                    return c.pane();
                }

                public BoundedWindow window() {
                    return boundedWindow;
                }

                public String getErrorContext() {
                    return ProcessFn.class.getSimpleName() + ".invokeNewTracker";
                }
            });

            SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>.Result result = this.processElementInvoker.invokeProcessElement(this.invoker, elementAndRestriction.getKey(), tracker, watermarkEstimator);
            if (result.getResidualRestriction() == null) {
                // Nothing left to process for this pair: drop its state and the watermark hold.
                elementState.clear();
                restrictionState.clear();
                watermarkEstimatorState.clear();
                holdState.clear();
                return;
            }

            restrictionState.write(result.getResidualRestriction());
            watermarkEstimatorState.write(result.getFutureWatermarkEstimatorState());
            @Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
            if (futureOutputWatermark == null) {
                // No watermark reported: fall back to the element's own timestamp for the hold.
                futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
            }
            Instant wakeupTime = timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
            holdState.add(futureOutputWatermark);
            // Schedule the resumption in the same state namespace so the state can be found again.
            timerInternals.setTimer(TimerInternals.TimerData.of(stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
        }

        /** Builds the argument provider for setup; it exposes only the pipeline options. */
        private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(final PipelineOptions options) {
            return new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

                public PipelineOptions pipelineOptions() {
                    return options;
                }

                public String getErrorContext() {
                    return "SplittableParDoViaKeyedWorkItems/Setup";
                }
            };
        }

        /**
         * Builds the argument provider for start-bundle. The context handed to the wrapped fn
         * delegates {@code getPipelineOptions()} to {@code baseContext}.
         */
        private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsStartBundle(final DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>.StartBundleContext baseContext) {
            return new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

                public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
                    // StartBundleContext is an inner class of DoFn, so it is created against the wrapped fn.
                    return fn.new StartBundleContext() {

                        public PipelineOptions getPipelineOptions() {
                            return baseContext.getPipelineOptions();
                        }
                    };
                }

                public PipelineOptions pipelineOptions() {
                    return baseContext.getPipelineOptions();
                }

                public String getErrorContext() {
                    return "SplittableParDoViaKeyedWorkItems/StartBundle";
                }
            };
        }

        /**
         * Builds the argument provider for finish-bundle. The context handed to the wrapped fn
         * delegates {@code getPipelineOptions()} to {@code baseContext} and throws
         * {@link UnsupportedOperationException} from both {@code output} overloads.
         */
        private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsFinishBundle(final DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>.FinishBundleContext baseContext) {
            return new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

                public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
                    // FinishBundleContext is an inner class of DoFn, so it is created against the wrapped fn.
                    return fn.new FinishBundleContext() {

                        public void output(OutputT output, Instant timestamp, BoundedWindow window) {
                            this.throwUnsupportedOutput();
                        }

                        public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                            this.throwUnsupportedOutput();
                        }

                        public PipelineOptions getPipelineOptions() {
                            return baseContext.getPipelineOptions();
                        }

                        private void throwUnsupportedOutput() {
                            throw new UnsupportedOperationException(String.format("KWI Splittable DoFn can only output from @%s", DoFn.ProcessElement.class.getSimpleName()));
                        }
                    };
                }

                public PipelineOptions pipelineOptions() {
                    return baseContext.getPipelineOptions();
                }

                public String getErrorContext() {
                    return "SplittableParDoViaKeyedWorkItems/FinishBundle";
                }
            };
        }
    }

    /**
     * Transform over {@link KeyedWorkItem}s that stands in for a
     * {@link SplittableParDo.ProcessKeyedElements}. It exposes the original transform's fn, side
     * inputs and output tags, and can create the matching {@link ProcessFn}.
     *
     * @param <InputT> element type of the splittable fn
     * @param <OutputT> main output type of the splittable fn
     * @param <RestrictionT> restriction type
     * @param <PositionT> position type of the restriction tracker
     * @param <WatermarkEstimatorStateT> watermark estimator state type
     */
    public static class ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
    extends PTransform<PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
        private final SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> original;

        /**
         * @param original the transform whose configuration (coders, tags, windowing) is reused
         */
        public ProcessElements(SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> original) {
            this.original = original;
        }

        /**
         * Creates a {@link ProcessFn} around {@code fn}, taking the element, restriction and
         * watermark-estimator-state coders and the input windowing strategy from the original
         * transform.
         *
         * @param fn the fn the new {@link ProcessFn} should wrap
         * @return a newly constructed process fn
         */
        public ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> newProcessFn(DoFn<InputT, OutputT> fn) {
            // Diamond instead of a raw constructor call: the type arguments come from the return type.
            return new ProcessFn<>(
                fn,
                this.original.getElementCoder(),
                this.original.getRestrictionCoder(),
                this.original.getWatermarkEstimatorStateCoder(),
                this.original.getInputWindowingStrategy());
        }

        /** Returns the fn of the original transform. */
        public DoFn<InputT, OutputT> getFn() {
            return this.original.getFn();
        }

        /** Returns the side inputs of the original transform. */
        public List<PCollectionView<?>> getSideInputs() {
            return this.original.getSideInputs();
        }

        /** Returns the main output tag of the original transform. */
        public TupleTag<OutputT> getMainOutputTag() {
            return this.original.getMainOutputTag();
        }

        /** Returns the additional output tags of the original transform. */
        public TupleTagList getAdditionalOutputTags() {
            return this.original.getAdditionalOutputTags();
        }

        /**
         * Produces the output tuple by delegating to
         * {@code SplittableParDo.ProcessKeyedElements.createPrimitiveOutputFor}, passing the
         * original transform's fn, tags, output coders and input windowing strategy.
         */
        public PCollectionTuple expand(PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> input) {
            return SplittableParDo.ProcessKeyedElements.createPrimitiveOutputFor(
                input,
                this.original.getFn(),
                this.original.getMainOutputTag(),
                this.original.getAdditionalOutputTags(),
                this.original.getOutputTagsToCoders(),
                this.original.getInputWindowingStrategy());
        }
    }

    /**
     * Replacement for {@link SplittableParDo.ProcessKeyedElements} that first turns the keyed
     * input into {@link KeyedWorkItem}s via {@link GBKIntoKeyedWorkItems} and then applies
     * {@link ProcessElements}.
     *
     * @param <InputT> element type of the splittable fn
     * @param <OutputT> main output type of the splittable fn
     * @param <RestrictionT> restriction type
     * @param <WatermarkEstimatorStateT> watermark estimator state type
     */
    public static class SplittableProcessViaKeyedWorkItems<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT>
    extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
        private final SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> original;

        /**
         * @param original the transform being replaced; handed on to {@link ProcessElements}
         */
        public SplittableProcessViaKeyedWorkItems(SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> original) {
            this.original = original;
        }

        /**
         * Applies {@link GBKIntoKeyedWorkItems}, sets a {@link KeyedWorkItemCoder} on the result
         * (byte-array key coder, the input {@link KvCoder}'s value coder, the input's window
         * coder), and applies {@link ProcessElements}.
         *
         * @param input keyed (element, restriction) pairs; its coder is cast to {@link KvCoder}
         * @return the outputs produced by {@link ProcessElements}
         */
        public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>> input) {
            // Same evaluation order as the chained form: group first, then derive the coder.
            PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> keyedWorkItems =
                input.apply(new GBKIntoKeyedWorkItems<byte[], KV<InputT, RestrictionT>>());
            KvCoder<byte[], KV<InputT, RestrictionT>> inputCoder =
                (KvCoder<byte[], KV<InputT, RestrictionT>>) input.getCoder();
            keyedWorkItems.setCoder(
                KeyedWorkItemCoder.of(
                    ByteArrayCoder.of(),
                    inputCoder.getValueCoder(),
                    input.getWindowingStrategy().getWindowFn().windowCoder()));
            return keyedWorkItems.apply(new ProcessElements<>(this.original));
        }
    }

    /**
     * Override factory that replaces a {@link SplittableParDo.ProcessKeyedElements} with a
     * {@link SplittableProcessViaKeyedWorkItems} wrapping it.
     *
     * @param <InputT> element type of the splittable fn
     * @param <OutputT> main output type of the splittable fn
     * @param <RestrictionT> restriction type
     * @param <WatermarkEstimatorStateT> watermark estimator state type
     */
    public static class OverrideFactory<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT>
    implements PTransformOverrideFactory<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT>> {
        /**
         * Builds the replacement: the applied transform's singleton main input paired with a
         * {@link SplittableProcessViaKeyedWorkItems} constructed from the applied transform.
         *
         * @param transform the applied transform to replace
         * @return the replacement input/transform pair
         */
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT>> transform) {
            // getTransform() is already typed as ProcessKeyedElements<...>; no raw cast is needed.
            return PTransformOverrideFactory.PTransformReplacement.of(
                PTransformReplacements.getSingletonMainInput(transform),
                new SplittableProcessViaKeyedWorkItems<>(transform.getTransform()));
        }

        /**
         * Maps the original outputs to the replacement's outputs by delegating to
         * {@code ReplacementOutputs.tagged}.
         *
         * @param outputs outputs of the original transform, by tag
         * @param newOutput output tuple of the replacement transform
         */
        public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> outputs, PCollectionTuple newOutput) {
            return ReplacementOutputs.tagged(outputs, newOutput);
        }
    }

    /**
     * Raw transform from {@code KV<KeyT, InputT>} to {@code KeyedWorkItem<KeyT, InputT>}. Its
     * {@link #expand} only declares the output collection; {@link #getSpec} always throws.
     *
     * @param <KeyT> key type
     * @param <InputT> value type
     */
    public static class GBKIntoKeyedWorkItems<KeyT, InputT>
    extends PTransformTranslation.RawPTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
        /**
         * Creates the output collection with the input's pipeline and boundedness, the global
         * default windowing strategy, and a {@link KeyedWorkItemCoder} built from the input
         * {@link KvCoder}'s key and value coders plus the input's window coder.
         *
         * @param input keyed input; its coder is cast to {@link KvCoder}
         */
        @Override
        public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
            KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
            return PCollection.createPrimitiveOutputInternal(
                input.getPipeline(),
                WindowingStrategy.globalDefault(),
                input.isBounded(),
                KeyedWorkItemCoder.of(
                    kvCoder.getKeyCoder(),
                    kvCoder.getValueCoder(),
                    input.getWindowingStrategy().getWindowFn().windowCoder()));
        }

        /** Returns the fixed URN identifying this transform. */
        @Override
        public String getUrn() {
            return "beam:runners_core:transforms:splittable_gbkikwi:v1";
        }

        /**
         * Always throws.
         *
         * @throws UnsupportedOperationException on every call
         */
        @Override
        public RunnerApi.FunctionSpec getSpec() {
            throw new UnsupportedOperationException(String.format("%s should never be serialized to proto", this.getClass().getSimpleName()));
        }
    }
}

