/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.DoFnRunner;
import org.apache.beam.repackaged.direct_java.runners.core.LateDataUtils;
import org.apache.beam.repackaged.direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.StepContext;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<InputT, OutputT> {
    public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
    private static final String SORT_BUFFER_STATE = "sortBuffer";
    private static final String SORT_BUFFER_MIN_STAMP = "sortBufferMinStamp";
    private static final String SORT_FLUSH_TIMER = "__StatefulParDoSortFlushTimerId";
    private static final String SORT_FLUSH_WATERMARK_HOLD = "flushWatermarkHold";
    private final DoFnRunner<InputT, OutputT> doFnRunner;
    private final StepContext stepContext;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final Counter droppedDueToLateness = Metrics.counter(StatefulDoFnRunner.class, (String)"StatefulParDoDropped");
    private final CleanupTimer<InputT> cleanupTimer;
    private final StateCleaner stateCleaner;
    private final boolean requiresTimeSortedInput;
    private final Coder<BoundedWindow> windowCoder;
    private final StateTag<BagState<WindowedValue<InputT>>> sortBufferTag;
    private final StateTag<ValueState<Instant>> sortBufferMinStampTag = StateTags.makeSystemTagInternal(StateTags.value("sortBufferMinStamp", InstantCoder.of()));
    private final StateTag<WatermarkHoldState> watermarkHold = StateTags.watermarkStateInternal("flushWatermarkHold", TimestampCombiner.LATEST);

    public StatefulDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner, Coder<InputT> inputCoder, StepContext stepContext, WindowingStrategy<?, ?> windowingStrategy, CleanupTimer<InputT> cleanupTimer, StateCleaner<W> stateCleaner, boolean requiresTimeSortedInput) {
        Coder untypedCoder;
        this.doFnRunner = doFnRunner;
        this.stepContext = stepContext;
        this.windowingStrategy = windowingStrategy;
        this.cleanupTimer = cleanupTimer;
        this.stateCleaner = stateCleaner;
        this.requiresTimeSortedInput = requiresTimeSortedInput;
        WindowFn windowFn = windowingStrategy.getWindowFn();
        this.windowCoder = untypedCoder = windowFn.windowCoder();
        this.sortBufferTag = StateTags.makeSystemTagInternal(StateTags.bag(SORT_BUFFER_STATE, WindowedValue.getFullCoder(inputCoder, this.windowCoder)));
        this.rejectMergingWindowFn(windowFn);
    }

    private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
        if (!(windowFn instanceof NonMergingWindowFn)) {
            throw new UnsupportedOperationException("MergingWindowFn is not supported for stateful DoFns, WindowFn is: " + windowFn);
        }
    }

    public List<StateTag<?>> getSystemStateTags() {
        return Arrays.asList(this.sortBufferTag, this.sortBufferMinStampTag, this.watermarkHold);
    }

    @Override
    public DoFn<InputT, OutputT> getFn() {
        return this.doFnRunner.getFn();
    }

    @Override
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    @Override
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }

    @Override
    public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
        this.doFnRunner.onWindowExpiration(window, timestamp, key);
    }

    @Override
    public void processElement(WindowedValue<InputT> input) {
        for (WindowedValue value : input.explodeWindows()) {
            BoundedWindow window = (BoundedWindow)value.getWindows().iterator().next();
            if (this.isLate(window)) {
                this.reportDroppedElement(value, window);
                continue;
            }
            if (this.requiresTimeSortedInput) {
                this.processElementOrdered(window, value);
                continue;
            }
            this.processElementUnordered(window, value);
        }
    }

    private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
        this.cleanupTimer.setForWindow(value.getValue(), window);
        this.doFnRunner.processElement(value);
    }

    private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
        StateInternals stateInternals = this.stepContext.stateInternals();
        TimerInternals timerInternals = this.stepContext.timerInternals();
        Instant inputWatermark = (Instant)MoreObjects.firstNonNull((Object)timerInternals.currentInputWatermarkTime(), (Object)BoundedWindow.TIMESTAMP_MIN_VALUE);
        if (!inputWatermark.isAfter((ReadableInstant)value.getTimestamp().plus((ReadableDuration)this.windowingStrategy.getAllowedLateness()))) {
            StateNamespace namespace = StateNamespaces.window(this.windowCoder, window);
            BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, this.sortBufferTag);
            ValueState<Instant> minStampState = stateInternals.state(namespace, this.sortBufferMinStampTag);
            sortBuffer.add(value);
            Instant minStamp = (Instant)MoreObjects.firstNonNull((Object)((Instant)minStampState.read()), (Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
            if (value.getTimestamp().isBefore((ReadableInstant)minStamp)) {
                minStamp = value.getTimestamp();
                minStampState.write((Object)minStamp);
                this.setupFlushTimer(namespace, window, minStamp);
            }
        } else {
            this.reportDroppedElement(value, window);
        }
    }

    private boolean isLate(BoundedWindow window) {
        Instant gcTime = LateDataUtils.garbageCollectionTime(window, this.windowingStrategy);
        Instant inputWM = this.stepContext.timerInternals().currentInputWatermarkTime();
        return gcTime.isBefore((ReadableInstant)inputWM);
    }

    private void reportDroppedElement(WindowedValue<InputT> value, BoundedWindow window) {
        this.droppedDueToLateness.inc();
        WindowTracing.debug((String)"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} since too far behind inputWatermark:{}", (Object[])new Object[]{value.getTimestamp(), window, this.stepContext.timerInternals().currentInputWatermarkTime()});
    }

    @Override
    public <KeyT> void onTimer(String timerId, String timerFamilyId, KeyT key, BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
        if (timerId.equals(SORT_FLUSH_TIMER)) {
            this.onSortFlushTimer(window, this.stepContext.timerInternals().currentInputWatermarkTime());
        } else if (this.cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
            if (this.requiresTimeSortedInput) {
                this.onSortFlushTimer(window, BoundedWindow.TIMESTAMP_MAX_VALUE);
            }
            this.doFnRunner.onWindowExpiration(window, outputTimestamp, key);
            this.stateCleaner.clearForWindow(window);
        } else if (!timeDomain.equals((Object)TimeDomain.EVENT_TIME) && this.isLate(window)) {
            WindowTracing.debug((String)"StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} since window is too far behind inputWatermark:{}", (Object[])new Object[]{timestamp, window, this.stepContext.timerInternals().currentInputWatermarkTime()});
        } else {
            this.doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
        }
    }

    private void onSortFlushTimer(BoundedWindow window, Instant timestamp) {
        StateInternals stateInternals = this.stepContext.stateInternals();
        StateNamespace namespace = StateNamespaces.window(this.windowCoder, window);
        BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, this.sortBufferTag);
        ValueState<Instant> minStampState = stateInternals.state(namespace, this.sortBufferMinStampTag);
        ArrayList<WindowedValue> keep = new ArrayList<WindowedValue>();
        ArrayList<WindowedValue> flush = new ArrayList<WindowedValue>();
        Instant newMinStamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
        for (WindowedValue e2 : sortBuffer.read()) {
            if (!e2.getTimestamp().isAfter((ReadableInstant)timestamp)) {
                flush.add(e2);
                continue;
            }
            keep.add(e2);
            if (!e2.getTimestamp().isBefore((ReadableInstant)newMinStamp)) continue;
            newMinStamp = e2.getTimestamp();
        }
        flush.stream().sorted(Comparator.comparing(WindowedValue::getTimestamp)).forEachOrdered(e -> this.processElementUnordered(window, (WindowedValue<InputT>)e));
        sortBuffer.clear();
        keep.forEach(arg_0 -> sortBuffer.add(arg_0));
        minStampState.write((Object)newMinStamp);
        if (newMinStamp.isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            this.setupFlushTimer(namespace, window, newMinStamp);
        } else {
            this.clearWatermarkHold(namespace);
        }
    }

    private void setupFlushTimer(StateNamespace namespace, BoundedWindow window, Instant flush) {
        Instant windowGcTime;
        Instant flushWithLateness = flush.plus((ReadableDuration)this.windowingStrategy.getAllowedLateness());
        if (flushWithLateness.isAfter((ReadableInstant)(windowGcTime = LateDataUtils.garbageCollectionTime(window, this.windowingStrategy.getAllowedLateness())))) {
            flushWithLateness = windowGcTime;
        }
        WatermarkHoldState watermark = this.stepContext.stateInternals().state(namespace, this.watermarkHold);
        this.stepContext.timerInternals().setTimer(namespace, SORT_FLUSH_TIMER, SORT_FLUSH_TIMER, flushWithLateness, flush, TimeDomain.EVENT_TIME);
        if (!((Boolean)watermark.isEmpty().read()).booleanValue()) {
            watermark.clear();
        }
    }

    private void clearWatermarkHold(StateNamespace namespace) {
        this.stepContext.stateInternals().state(namespace, this.watermarkHold).clear();
    }

    public static class StateInternalsStateCleaner<W extends BoundedWindow>
    implements StateCleaner<W> {
        private final DoFn<?, ?> fn;
        private final DoFnSignature signature;
        private final StateInternals stateInternals;
        private final Coder<W> windowCoder;

        public StateInternalsStateCleaner(DoFn<?, ?> fn, StateInternals stateInternals, Coder<W> windowCoder) {
            this.fn = fn;
            this.signature = DoFnSignatures.getSignature(fn.getClass());
            this.stateInternals = stateInternals;
            this.windowCoder = windowCoder;
        }

        @Override
        public void clearForWindow(W window) {
            for (Map.Entry entry : this.signature.stateDeclarations().entrySet()) {
                try {
                    StateSpec spec = (StateSpec)((DoFnSignature.StateDeclaration)entry.getValue()).field().get(this.fn);
                    Object state = this.stateInternals.state(StateNamespaces.window(this.windowCoder, window), StateTags.tagForSpec((String)entry.getKey(), spec));
                    state.clear();
                }
                catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static class TimeInternalsCleanupTimer<InputT>
    implements CleanupTimer<InputT> {
        public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
        public static final long GC_DELAY_MS = 1L;
        private final TimerInternals timerInternals;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final Coder<BoundedWindow> windowCoder;

        public TimeInternalsCleanupTimer(TimerInternals timerInternals, WindowingStrategy<?, ?> windowingStrategy) {
            this.windowingStrategy = windowingStrategy;
            WindowFn windowFn = windowingStrategy.getWindowFn();
            this.windowCoder = windowFn.windowCoder();
            this.timerInternals = timerInternals;
        }

        @Override
        public void setForWindow(InputT input, BoundedWindow window) {
            Instant gcTime = LateDataUtils.garbageCollectionTime(window, this.windowingStrategy);
            gcTime = gcTime.plus(1L);
            this.timerInternals.setTimer(StateNamespaces.window(this.windowCoder, window), GC_TIMER_ID, "", gcTime, window.maxTimestamp(), TimeDomain.EVENT_TIME);
        }

        @Override
        public boolean isForWindow(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
            boolean isEventTimer = timeDomain.equals((Object)TimeDomain.EVENT_TIME);
            Instant gcTime = LateDataUtils.garbageCollectionTime(window, this.windowingStrategy);
            gcTime = gcTime.plus(1L);
            return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals((Object)timestamp);
        }
    }

    public static interface StateCleaner<W extends BoundedWindow> {
        public void clearForWindow(W var1);
    }

    public static interface CleanupTimer<InputT> {
        public void setForWindow(InputT var1, BoundedWindow var2);

        public boolean isForWindow(String var1, BoundedWindow var2, Instant var3, TimeDomain var4);
    }
}

