/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.shaded.guava33.com.google.common.base.Function;
import org.apache.flink.shaded.guava33.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * A {@link WindowOperator} variant that additionally applies an {@link Evictor} to the buffered
 * window contents.
 *
 * <p>Every element is buffered per window as a full {@link StreamRecord} in list state. Whenever
 * the trigger fires, the evictor is invoked once before and once after the window function, and
 * the records that remain are written back to the window state.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class EvictingWindowOperator<K, IN, OUT, W extends Window>
        extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------
    // these fields are set by the API stream graph builder to configure the operator

    private final Evictor<? super IN, ? super W> evictor;

    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?>
            evictingWindowStateDescriptor;

    // ------------------------------------------------------------------------
    // the fields below are instantiated once the operator runs in the runtime

    private transient EvictorContext evictorContext;

    private transient InternalListState<K, W, StreamRecord<IN>> evictingWindowState;

    // ------------------------------------------------------------------------

    /**
     * Creates a new evicting window operator.
     *
     * <p>The parent operator is given a {@code null} window state descriptor; this class keeps
     * its own descriptor and its own state handle instead.
     *
     * @param windowAssigner assigns incoming elements to windows
     * @param windowSerializer serializer for the window namespace
     * @param keySelector extracts the key of an element
     * @param keySerializer serializer for keys
     * @param windowStateDescriptor descriptor of the list state buffering the records, not null
     * @param windowFunction function invoked with the (evicted) window contents
     * @param trigger decides when a window fires and/or is purged
     * @param evictor removes elements before/after the window function runs, not null
     * @param allowedLateness lateness forwarded to the parent operator
     * @param lateDataOutputTag side-output tag for late elements, may be null
     */
    public EvictingWindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> windowSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
            InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
            Trigger<? super IN, ? super W> trigger,
            Evictor<? super IN, ? super W> evictor,
            long allowedLateness,
            OutputTag<IN> lateDataOutputTag) {

        super(
                windowAssigner,
                windowSerializer,
                keySelector,
                keySerializer,
                null,
                windowFunction,
                trigger,
                allowedLateness,
                lateDataOutputTag);

        this.evictor = Preconditions.checkNotNull(evictor);
        this.evictingWindowStateDescriptor = Preconditions.checkNotNull(windowStateDescriptor);
    }

    /**
     * Adds the element to every window it is assigned to, asks the trigger for a decision and
     * fires/purges accordingly. Elements that were not added to any window and are late are sent
     * to the late-data side output if one is configured, otherwise they are counted as dropped.
     *
     * @param element the incoming record
     * @throws Exception if state access, the trigger, the evictor or the window function fails
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows =
                windowAssigner.assignWindows(
                        element.getValue(), element.getTimestamp(), windowAssignerContext);

        // stays true if the element is handled by none of the assigned windows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window : elementWindows) {

                // adding the new window may trigger a merge; actualWindow is then the merged
                // window, otherwise it is the window that was passed in
                W actualWindow =
                        mergingWindows.addWindow(
                                window,
                                new MergingWindowSet.MergeFunction<W>() {
                                    @Override
                                    public void merge(
                                            W mergeResult,
                                            Collection<W> mergedWindows,
                                            W stateWindowResult,
                                            Collection<W> mergedStateWindows)
                                            throws Exception {

                                        if (windowAssigner.isEventTime()
                                                && mergeResult.maxTimestamp() + allowedLateness
                                                        <= internalTimerService
                                                                .currentWatermark()) {
                                            throw new UnsupportedOperationException(
                                                    "The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: "
                                                            + internalTimerService
                                                                    .currentWatermark()
                                                            + " window: "
                                                            + mergeResult);
                                        }
                                        if (!windowAssigner.isEventTime()
                                                && mergeResult.maxTimestamp()
                                                        <= internalTimerService
                                                                .currentProcessingTime()) {
                                            throw new UnsupportedOperationException(
                                                    "The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: "
                                                            + internalTimerService
                                                                    .currentProcessingTime()
                                                            + " window: "
                                                            + mergeResult);
                                        }

                                        triggerContext.key = key;
                                        triggerContext.window = mergeResult;

                                        triggerContext.onMerge(mergedWindows);

                                        // clear trigger state and cleanup timers of the windows
                                        // that were merged away
                                        for (W m : mergedWindows) {
                                            triggerContext.window = m;
                                            triggerContext.clear();
                                            deleteCleanupTimer(m);
                                        }

                                        // fold the merged state windows into the resulting
                                        // state window
                                        evictingWindowState.mergeNamespaces(
                                                stateWindowResult, mergedStateWindows);
                                    }
                                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException(
                            "Window " + window + " is not in in-flight window set.");
                }

                evictingWindowState.setCurrentNamespace(stateWindow);
                evictingWindowState.add(element);

                triggerContext.key = key;
                triggerContext.window = actualWindow;
                evictorContext.key = key;
                evictorContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                    if (contents == null) {
                        // no state, nothing to do for this window
                        continue;
                    }
                    emitWindowContents(actualWindow, contents, evictingWindowState);
                }

                if (triggerResult.isPurge()) {
                    evictingWindowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // must be called to persist any changes made to the merging window set
            mergingWindows.persist();
        } else {
            for (W window : elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                evictingWindowState.setCurrentNamespace(window);
                evictingWindowState.add(element);

                triggerContext.key = key;
                triggerContext.window = window;
                evictorContext.key = key;
                evictorContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                    if (contents == null) {
                        // no state, nothing to do for this window
                        continue;
                    }
                    emitWindowContents(window, contents, evictingWindowState);
                }

                if (triggerResult.isPurge()) {
                    evictingWindowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // The element was not added to any window: route it to the side output if one is
        // configured, otherwise count it as dropped. Only done when the element itself is late.
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null) {
                sideOutput(element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Handles a fired event-time timer: evaluates the trigger for the timer's window, emits and/or
     * purges the window contents, and clears all window state when the timer is the cleanup timer
     * of an event-time window.
     *
     * @param timer the timer that fired; its key and namespace identify the window
     * @throws Exception if state access, the trigger, the evictor or the window function fails
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();
        evictorContext.key = timer.getKey();
        evictorContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows = null;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // the timer refers to a window that is no longer in the merging window set,
                // so there is nothing left to evaluate
                return;
            }
            evictingWindowState.setCurrentNamespace(stateWindow);
        } else {
            evictingWindowState.setCurrentNamespace(triggerContext.window);
        }

        TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
            if (contents != null) {
                emitWindowContents(triggerContext.window, contents, evictingWindowState);
            }
        }

        if (triggerResult.isPurge()) {
            evictingWindowState.clear();
        }

        if (windowAssigner.isEventTime()
                && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // must be called to persist any changes made to the merging window set
            mergingWindows.persist();
        }
    }

    /**
     * Handles a fired processing-time timer: evaluates the trigger for the timer's window, emits
     * and/or purges the window contents, and clears all window state when the timer is the cleanup
     * timer of a processing-time window.
     *
     * @param timer the timer that fired; its key and namespace identify the window
     * @throws Exception if state access, the trigger, the evictor or the window function fails
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();
        evictorContext.key = timer.getKey();
        evictorContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows = null;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // the timer refers to a window that is no longer in the merging window set,
                // so there is nothing left to evaluate
                return;
            }
            evictingWindowState.setCurrentNamespace(stateWindow);
        } else {
            evictingWindowState.setCurrentNamespace(triggerContext.window);
        }

        TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
            if (contents != null) {
                emitWindowContents(triggerContext.window, contents, evictingWindowState);
            }
        }

        if (triggerResult.isPurge()) {
            evictingWindowState.clear();
        }

        if (!windowAssigner.isEventTime()
                && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // must be called to persist any changes made to the merging window set
            mergingWindows.persist();
        }
    }

    /**
     * Runs the evictor and the window function over the given contents and writes the remaining
     * records back to {@code windowState}.
     *
     * <p>The key and window handed to the window function are taken from {@code triggerContext},
     * which callers set before invoking this method.
     *
     * @param window the window whose max timestamp is used as the output timestamp
     * @param contents the records currently buffered for the window
     * @param windowState the state that is updated with the records left after eviction
     * @throws Exception if the evictor, the window function or the state update fails
     */
    private void emitWindowContents(
            W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState)
            throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

        // the evictor operates on TimestampedValue, so present the records in that shape
        FluentIterable<TimestampedValue<IN>> recordsWithTimestamp =
                FluentIterable.from(contents)
                        .transform(
                                new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                                    @Override
                                    public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                                        return TimestampedValue.from(input);
                                    }
                                });
        evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // the window function only sees the plain values
        FluentIterable<IN> projectedContents =
                recordsWithTimestamp.transform(
                        new Function<TimestampedValue<IN>, IN>() {
                            @Override
                            public IN apply(TimestampedValue<IN> input) {
                                return input.getValue();
                            }
                        });

        processContext.window = triggerContext.window;
        userFunction.process(
                triggerContext.key,
                triggerContext.window,
                processContext,
                projectedContents,
                timestampedCollector);
        evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // persist whatever records remain after eviction back into the window state
        windowState.update(
                recordsWithTimestamp.stream()
                        .map(TimestampedValue::getStreamRecord)
                        .collect(Collectors.toList()));
    }

    /**
     * Clears the window contents, the trigger state and the per-window process-context state, and
     * retires the window from the merging window set if one is given.
     *
     * @param window the window being cleaned up
     * @param windowState the state holding the window contents
     * @param mergingWindows the merging window set, or {@code null} for non-merging assigners
     * @throws Exception if clearing state or persisting the merging window set fails
     */
    private void clearAllState(
            W window, ListState<StreamRecord<IN>> windowState, MergingWindowSet<W> mergingWindows)
            throws Exception {
        windowState.clear();
        triggerContext.clear();
        processContext.window = window;
        processContext.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
    }

    /**
     * Opens the parent operator, then creates the reusable evictor context and obtains the list
     * state that buffers the window contents.
     *
     * @throws Exception if the parent fails to open or the state cannot be created
     */
    @Override
    public void open() throws Exception {
        super.open();

        evictorContext = new EvictorContext(null, null);

        @SuppressWarnings("unchecked")
        InternalListState<K, W, StreamRecord<IN>> listState =
                (InternalListState<K, W, StreamRecord<IN>>)
                        getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
        evictingWindowState = listState;
    }

    /**
     * Closes the parent operator and drops the evictor context.
     *
     * @throws Exception if the parent fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
        evictorContext = null;
    }

    // ------------------------------------------------------------------------
    // Getters for testing
    // ------------------------------------------------------------------------

    /** Returns the evictor this operator was configured with. */
    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return evictor;
    }

    /**
     * Returns the descriptor of the evicting window state.
     *
     * <p>The descriptor actually describes a {@code ListState<StreamRecord<IN>>}; it is cast to
     * the parent's declared return type without conversion.
     */
    @Override
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        // go through an unbounded wildcard so the cast is accepted regardless of how the two
        // state types relate
        return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>)
                (StateDescriptor<?, ?>) evictingWindowStateDescriptor;
    }

    /**
     * Context handed to the {@link Evictor}. A single instance is created in {@code open()} and
     * reused by reassigning {@link #key} and {@link #window} before each invocation.
     *
     * <p>This is an inner (non-static) class because it reads the enclosing operator's timer
     * service, metric group and evictor.
     */
    class EvictorContext implements Evictor.EvictorContext {

        protected K key;
        protected W window;

        public EvictorContext(K key, W window) {
            this.key = key;
            this.window = window;
        }

        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return internalTimerService.currentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            // must be qualified: an unqualified call would resolve to this very method
            return EvictingWindowOperator.this.getMetricGroup();
        }

        /** Returns the key currently set on this context. */
        public K getKey() {
            return key;
        }

        /** Invokes the evictor's pre-function eviction for the current window. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
            // raw cast bridges the evictor's "? super IN" element type
            evictor.evictBefore((Iterable) elements, size, window, this);
        }

        /** Invokes the evictor's post-function eviction for the current window. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) {
            // raw cast bridges the evictor's "? super IN" element type
            evictor.evictAfter((Iterable) elements, size, window, this);
        }
    }
}

