/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.util.List;
import java.util.UUID;
import org.apache.beam.runners.direct.repackaged.runners.core.ElementAndRestriction;
import org.apache.beam.runners.direct.repackaged.runners.core.ElementAndRestrictionCoder;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItem;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.direct.repackaged.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.runners.direct.repackaged.runners.core.StateInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.StateInternalsFactory;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespace;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespaces;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTag;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTags;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

@Experimental(value=Experimental.Kind.SPLITTABLE_DO_FN)
public class SplittableParDo<InputT, OutputT, RestrictionT>
extends PTransform<PCollection<InputT>, PCollectionTuple> {
    private final ParDo.MultiOutput<InputT, OutputT> parDo;

    public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) {
        Preconditions.checkNotNull(parDo, "parDo must not be null");
        this.parDo = parDo;
        Preconditions.checkArgument(DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(), "fn must be a splittable DoFn");
    }

    /**
     * Expands this transform over {@code input}; the work is done by {@code applyTyped}.
     *
     * @param input the collection of elements to process
     * @return the tuple of outputs produced by the expansion
     */
    @Override
    public PCollectionTuple expand(PCollection<InputT> input) {
        PCollectionTuple expanded = applyTyped(input);
        return expanded;
    }

    /**
     * Builds the expansion: obtains the restriction coder from the wrapped function, turns the
     * input into keyed work items, and applies {@link ProcessElements} under the name "Process".
     *
     * @param input the collection of elements to process
     * @return the outputs of the "Process" step
     */
    private PCollectionTuple applyTyped(PCollection<InputT> input) {
        DoFn fn = parDo.getFn();

        // The invoker is created before the coder registry is looked up, as in the one-line form.
        DoFnInvoker fnInvoker = DoFnInvokers.invokerFor((DoFn)fn);
        Coder restrictionCoder = fnInvoker.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());

        PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems =
                SplittableParDo.applySplitIntoKeyedWorkItems(input, fn, restrictionCoder);

        ProcessElements processStep = new ProcessElements(
                fn,
                input.getCoder(),
                restrictionCoder,
                input.getWindowingStrategy(),
                parDo.getSideInputs(),
                parDo.getMainOutputTag(),
                parDo.getAdditionalOutputTags());
        return (PCollectionTuple)keyedWorkItems.apply("Process", processStep);
    }

    /**
     * Applies the preparatory steps in order: "Pair with initial restriction",
     * "Split restriction", "Explode windows", "Assign unique key" and "Group by key".
     *
     * <p>The element/restriction coder is set on the outputs of the first two steps, and a
     * {@link KeyedWorkItemCoder} on the final output.
     *
     * @param input the elements to pair with restrictions
     * @param fn the splittable function handed to the pairing and splitting steps
     * @param restrictionCoder coder for the restrictions
     * @return the grouped keyed work items
     * @throws IllegalArgumentException if the grouped collection's window function is not a
     *     {@link GlobalWindows} instance
     */
    private static <InputT, OutputT, RestrictionT> PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> applySplitIntoKeyedWorkItems(PCollection<InputT> input, DoFn<InputT, OutputT> fn, Coder<RestrictionT> restrictionCoder) {
        ElementAndRestrictionCoder splitCoder = ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);

        // One statement per stage; each coder is set before the next stage's transform is built.
        PCollection paired = (PCollection)input.apply("Pair with initial restriction", (PTransform)ParDo.of(new PairWithRestrictionFn(fn)));
        PCollection split = (PCollection)paired.setCoder(splitCoder).apply("Split restriction", (PTransform)ParDo.of(new SplitRestrictionFn(fn)));
        PCollection exploded = (PCollection)split.setCoder(splitCoder).apply("Explode windows", (PTransform)ParDo.of(new ExplodeWindowsFn()));
        PCollection keyed = (PCollection)exploded.apply("Assign unique key", (PTransform)WithKeys.of(new RandomUniqueKeyFn()));
        PCollection grouped = (PCollection)keyed.apply("Group by key", new GBKIntoKeyedWorkItems());

        PCollection keyedWorkItems = grouped.setCoder(KeyedWorkItemCoder.of(StringUtf8Coder.of(), splitCoder, (Coder<? extends BoundedWindow>)input.getWindowingStrategy().getWindowFn().windowCoder()));

        boolean globallyWindowed = keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows;
        Preconditions.checkArgument(globallyWindowed, "GBKIntoKeyedWorkItems must produce a globally windowed collection, but windowing strategy was: %s", (Object)keyedWorkItems.getWindowingStrategy());
        return keyedWorkItems;
    }

    /**
     * Asks the wrapped splittable function to split the restriction of each incoming
     * element/restriction pair, and emits one pair (same element, one part) for every part the
     * function reports.
     */
    private static class SplitRestrictionFn<InputT, RestrictionT>
    extends DoFn<ElementAndRestriction<InputT, RestrictionT>, ElementAndRestriction<InputT, RestrictionT>> {
        private final DoFn<InputT, ?> splittableFn;
        // Not serialized; created in setup().
        private transient DoFnInvoker<InputT, ?> invoker;

        SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
            this.splittableFn = splittableFn;
        }

        /** Creates the invoker for the wrapped function. */
        @DoFn.Setup
        public void setup() {
            invoker = DoFnInvokers.invokerFor(splittableFn);
        }

        /** Splits the current pair's restriction and outputs the element paired with each part. */
        @DoFn.ProcessElement
        public void processElement(final DoFn.ProcessContext c) {
            final ElementAndRestriction pair = (ElementAndRestriction)c.element();
            final Object element = pair.element();
            final Object restriction = pair.restriction();

            // Each part handed back by the wrapped function is re-paired with the same element.
            DoFn.OutputReceiver<RestrictionT> partReceiver = new DoFn.OutputReceiver<RestrictionT>(){

                public void output(RestrictionT part) {
                    c.output(ElementAndRestriction.of(element, part));
                }
            };
            invoker.invokeSplitRestriction(element, restriction, partReceiver);
        }
    }

    /**
     * Runs the wrapped splittable function over keyed work items.
     *
     * <p>A work item either carries the element itself (the "seed" call, recognised by the
     * absence of a timer) or carries a timer, in which case the element and the remaining
     * restriction are read back from state. After each invocation the residual restriction, if
     * any, is written to state, a watermark hold is added, and a processing-time timer is set so
     * that processing resumes later. When no residual restriction remains, all state is cleared.
     *
     * <p>The state-internals factory, timer-internals factory and process-element invoker are
     * transient and only assigned through their setters, so they must be supplied before
     * {@link #processElement} runs.
     */
    @VisibleForTesting
    public static class ProcessFn<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
    extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
        /** Watermark hold kept while an element still has a residual restriction. */
        private static final StateTag<WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", TimestampCombiner.LATEST));
        /** The element being processed, stored so that timer firings can retrieve it. */
        private final StateTag<ValueState<WindowedValue<InputT>>> elementTag;
        /** The restriction still to be processed. Left non-final to keep the serialized form unchanged. */
        private StateTag<ValueState<RestrictionT>> restrictionTag;
        private final DoFn<InputT, OutputT> fn;
        private final Coder<InputT> elementCoder;
        private final Coder<RestrictionT> restrictionCoder;
        private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
        private transient StateInternalsFactory<String> stateInternalsFactory;
        private transient TimerInternalsFactory<String> timerInternalsFactory;
        private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> processElementInvoker;
        // Created in setup().
        private transient DoFnInvoker<InputT, OutputT> invoker;

        /**
         * @param fn the splittable function to run
         * @param elementCoder coder for the elements, used for the stored element
         * @param restrictionCoder coder for the stored restriction
         * @param inputWindowingStrategy windowing strategy whose window coder is used for the
         *     stored element and for state namespaces
         */
        public ProcessFn(DoFn<InputT, OutputT> fn, Coder<InputT> elementCoder, Coder<RestrictionT> restrictionCoder, WindowingStrategy<InputT, ?> inputWindowingStrategy) {
            this.fn = fn;
            this.elementCoder = elementCoder;
            this.restrictionCoder = restrictionCoder;
            this.inputWindowingStrategy = inputWindowingStrategy;
            this.elementTag = StateTags.value("element", WindowedValue.getFullCoder(elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
            this.restrictionTag = StateTags.value("restriction", restrictionCoder);
        }

        /** Supplies the factory used by {@link #processElement} to obtain per-key state. */
        public void setStateInternalsFactory(StateInternalsFactory<String> stateInternalsFactory) {
            this.stateInternalsFactory = stateInternalsFactory;
        }

        /** Supplies the factory used by {@link #processElement} to obtain per-key timers. */
        public void setTimerInternalsFactory(TimerInternalsFactory<String> timerInternalsFactory) {
            this.timerInternalsFactory = timerInternalsFactory;
        }

        /** Supplies the invoker that {@link #processElement} uses to run the wrapped function. */
        public void setProcessElementInvoker(SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> invoker) {
            this.processElementInvoker = invoker;
        }

        /** Returns the wrapped splittable function. */
        public DoFn<InputT, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the element coder given at construction. */
        public Coder<InputT> getElementCoder() {
            return this.elementCoder;
        }

        /** Returns the restriction coder given at construction. */
        public Coder<RestrictionT> getRestrictionCoder() {
            return this.restrictionCoder;
        }

        /** Returns the input windowing strategy given at construction. */
        public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
            return this.inputWindowingStrategy;
        }

        /** Creates the invoker for the wrapped function and forwards the setup call to it. */
        @DoFn.Setup
        public void setup() throws Exception {
            this.invoker = DoFnInvokers.invokerFor(this.fn);
            this.invoker.invokeSetup();
        }

        /** Forwards the teardown call to the wrapped function. */
        @DoFn.Teardown
        public void tearDown() throws Exception {
            this.invoker.invokeTeardown();
        }

        /** Forwards the start-bundle call to the wrapped function with a wrapped context. */
        @DoFn.StartBundle
        public void startBundle(StartBundleContext c) throws Exception {
            this.invoker.invokeStartBundle(this.wrapContextAsStartBundle(c));
        }

        /** Forwards the finish-bundle call to the wrapped function with a wrapped context. */
        @DoFn.FinishBundle
        public void finishBundle(FinishBundleContext c) throws Exception {
            this.invoker.invokeFinishBundle(this.wrapContextAsFinishBundle(c));
        }

        /**
         * Processes one keyed work item: either the seed element or a timer firing for an
         * element whose restriction was only partially processed.
         *
         * @param c context whose element is the work item to process
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) {
            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>> workItem = c.element();
            String key = workItem.key();
            StateInternals stateInternals = this.stateInternalsFactory.stateInternalsForKey(key);
            TimerInternals timerInternals = this.timerInternalsFactory.timerInternalsForKey(key);

            // No timer means this is the seed call, which carries the element itself;
            // otherwise it is a timer firing and the element must come from state.
            TimerInternals.TimerData timer = Iterables.getOnlyElement(workItem.timersIterable(), null);
            boolean isSeedCall = timer == null;

            WindowedValue<ElementAndRestriction<InputT, RestrictionT>> seedValue = null;
            StateNamespace stateNamespace;
            if (isSeedCall) {
                seedValue = Iterables.getOnlyElement(workItem.elementsIterable());
                BoundedWindow window = Iterables.getOnlyElement(seedValue.getWindows());
                @SuppressWarnings("unchecked")
                Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)this.inputWindowingStrategy.getWindowFn().windowCoder();
                stateNamespace = StateNamespaces.window(windowCoder, window);
            } else {
                stateNamespace = timer.getNamespace();
            }

            ValueState<WindowedValue<InputT>> elementState = stateInternals.state(stateNamespace, this.elementTag);
            ValueState<RestrictionT> restrictionState = stateInternals.state(stateNamespace, this.restrictionTag);
            WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag);

            ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
            if (isSeedCall) {
                // Keep the windowed wrapper but replace the pair with the bare element, and
                // remember it so later timer firings can read it back.
                WindowedValue<InputT> element = seedValue.withValue(seedValue.getValue().element());
                elementState.write(element);
                elementAndRestriction = ElementAndRestriction.of(element, seedValue.getValue().restriction());
            } else {
                elementState.readLater();
                restrictionState.readLater();
                elementAndRestriction = ElementAndRestriction.of(elementState.read(), restrictionState.read());
            }

            TrackerT tracker = this.invoker.invokeNewTracker(elementAndRestriction.restriction());
            SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result = this.processElementInvoker.invokeProcessElement(this.invoker, elementAndRestriction.element(), tracker);

            if (result.getResidualRestriction() == null) {
                // Nothing left to do for this element: drop its state and the watermark hold.
                elementState.clear();
                restrictionState.clear();
                holdState.clear();
                return;
            }

            // Save the remainder, hold the watermark, and schedule the continuation.
            restrictionState.write(result.getResidualRestriction());
            Instant futureOutputWatermark = result.getFutureOutputWatermark();
            if (futureOutputWatermark == null) {
                // Fall back to the element's own timestamp when the invoker reports none.
                futureOutputWatermark = elementAndRestriction.element().getTimestamp();
            }
            holdState.add(futureOutputWatermark);
            timerInternals.setTimer(TimerInternals.TimerData.of(stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
        }

        /**
         * Adapts this function's start-bundle context to one belonging to the wrapped function;
         * pipeline options are delegated to {@code baseContext}.
         */
        private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(final StartBundleContext baseContext) {
            // The context is an inner class of DoFn, so it is created against the wrapped fn.
            return this.fn.new StartBundleContext(){

                @Override
                public PipelineOptions getPipelineOptions() {
                    return baseContext.getPipelineOptions();
                }
            };
        }

        /**
         * Adapts this function's finish-bundle context to one belonging to the wrapped function;
         * pipeline options are delegated to {@code baseContext} and both output methods throw
         * {@link UnsupportedOperationException}.
         */
        private DoFn<InputT, OutputT>.FinishBundleContext wrapContextAsFinishBundle(final FinishBundleContext baseContext) {
            // The context is an inner class of DoFn, so it is created against the wrapped fn.
            return this.fn.new FinishBundleContext(){

                @Override
                public void output(OutputT output, Instant timestamp, BoundedWindow window) {
                    this.throwUnsupportedOutput();
                }

                @Override
                public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                    this.throwUnsupportedOutput();
                }

                @Override
                public PipelineOptions getPipelineOptions() {
                    return baseContext.getPipelineOptions();
                }

                private void throwUnsupportedOutput() {
                    throw new UnsupportedOperationException(String.format("Splittable DoFn can only output from @%s", DoFn.ProcessElement.class.getSimpleName()));
                }
            };
        }
    }

    /**
     * Pairs every input element with the initial restriction that the wrapped function reports
     * for it.
     */
    private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
    extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
        private DoFn<InputT, OutputT> fn;
        // Not serialized; created in setup().
        private transient DoFnInvoker<InputT, OutputT> invoker;

        PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
            this.fn = fn;
        }

        /** Creates the invoker for the wrapped function. */
        @DoFn.Setup
        public void setup() {
            invoker = DoFnInvokers.invokerFor(fn);
        }

        /** Outputs the current element together with its initial restriction. */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            Object element = context.element();
            Object initialRestriction = invoker.invokeGetInitialRestriction(element);
            context.output(ElementAndRestriction.of(element, initialRestriction));
        }
    }

    /**
     * Key function that ignores its input and returns the string form of a freshly generated
     * random {@link UUID}.
     */
    private static class RandomUniqueKeyFn<T>
    implements SerializableFunction<T, String> {
        private RandomUniqueKeyFn() {
        }

        /**
         * @param input ignored
         * @return a new random UUID as a string
         */
        public String apply(T input) {
            UUID freshId = UUID.randomUUID();
            return freshId.toString();
        }
    }

    /**
     * Transform applied to the keyed work items. It declares the output collections for the
     * main and additional output tags, and can create the {@link ProcessFn} that runs the
     * wrapped function.
     */
    public static class ProcessElements<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
    extends PTransform<PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
        private final DoFn<InputT, OutputT> fn;
        private final Coder<InputT> elementCoder;
        private final Coder<RestrictionT> restrictionCoder;
        private final WindowingStrategy<InputT, ?> windowingStrategy;
        private final List<PCollectionView<?>> sideInputs;
        private final TupleTag<OutputT> mainOutputTag;
        private final TupleTagList additionalOutputTags;

        /**
         * @param fn the splittable function
         * @param elementCoder coder for the elements
         * @param restrictionCoder coder for the restrictions
         * @param windowingStrategy windowing strategy given to the declared outputs
         * @param sideInputs side inputs of the original transform
         * @param mainOutputTag tag of the main output
         * @param additionalOutputTags tags of the additional outputs
         */
        public ProcessElements(DoFn<InputT, OutputT> fn, Coder<InputT> elementCoder, Coder<RestrictionT> restrictionCoder, WindowingStrategy<InputT, ?> windowingStrategy, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags) {
            this.fn = fn;
            this.elementCoder = elementCoder;
            this.restrictionCoder = restrictionCoder;
            this.windowingStrategy = windowingStrategy;
            this.sideInputs = sideInputs;
            this.mainOutputTag = mainOutputTag;
            this.additionalOutputTags = additionalOutputTags;
        }

        /** Returns the splittable function. */
        public DoFn<InputT, OutputT> getFn() {
            return fn;
        }

        /** Returns the side inputs given at construction. */
        public List<PCollectionView<?>> getSideInputs() {
            return sideInputs;
        }

        /** Returns the main output tag. */
        public TupleTag<OutputT> getMainOutputTag() {
            return mainOutputTag;
        }

        /** Returns the additional output tags. */
        public TupleTagList getAdditionalOutputTags() {
            return additionalOutputTags;
        }

        /**
         * Creates a {@link ProcessFn} for {@code fn} using this transform's element coder,
         * restriction coder and windowing strategy.
         */
        public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(DoFn<InputT, OutputT> fn) {
            ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
                    new ProcessFn<>(fn, elementCoder, restrictionCoder, windowingStrategy);
            return processFn;
        }

        /**
         * Declares the primitive outputs for the main tag followed by the additional tags, and
         * sets the main output's type descriptor from the function's output type descriptor.
         */
        @Override
        public PCollectionTuple expand(PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> input) {
            DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
            Pipeline pipeline = input.getPipeline();
            TupleTagList allTags = TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll());
            // Boundedness combines the input's with what the signature reports per element.
            PCollection.IsBounded boundedness = input.isBounded().and(signature.isBoundedPerElement());

            PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(pipeline, allTags, windowingStrategy, boundedness);
            outputs.get(mainOutputTag).setTypeDescriptor(fn.getOutputTypeDescriptor());
            return outputs;
        }

        /**
         * Looks up the output coder in the pipeline's coder registry, using the element coder
         * nested inside the input's keyed-work-item coder.
         *
         * @throws CannotProvideCoderException propagated from the coder registry lookup
         */
        @Override
        public <T> Coder<T> getDefaultOutputCoder(PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> input, PCollection<T> output) throws CannotProvideCoderException {
            KeyedWorkItemCoder kwiCoder = (KeyedWorkItemCoder)input.getCoder();
            ElementAndRestrictionCoder pairCoder = (ElementAndRestrictionCoder)kwiCoder.getElementCoder();
            Coder inputCoder = pairCoder.getElementCoder();
            return input.getPipeline().getCoderRegistry().getCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
        }
    }

    public static class GBKIntoKeyedWorkItems<KeyT, InputT>
    extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
        public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
            return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)input.isBounded());
        }
    }

    /**
     * Identity function: outputs every element unchanged.
     *
     * <p>NOTE(review): the {@code window} parameter is never read. Presumably it is declared so
     * that the runner invokes the method once per window of the element (the step is applied
     * under the name "Explode windows"); confirm against the DoFn parameter documentation.
     */
    private static class ExplodeWindowsFn<InputT>
    extends DoFn<InputT, InputT> {
        private ExplodeWindowsFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c, BoundedWindow window) {
            Object passedThrough = c.element();
            c.output(passedThrough);
        }
    }
}

