/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators.windowing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.runtime.state.v2.adaptor.CompleteStateIterator;
import org.apache.flink.runtime.state.v2.internal.InternalListState;
import org.apache.flink.shaded.guava32.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
public class AsyncEvictingWindowOperator<K, IN, OUT, W extends Window>
extends AsyncWindowOperator<K, IN, StateIterator<IN>, OUT, W> {
    private static final long serialVersionUID = 1L;
    private final Evictor<? super IN, ? super W> evictor;
    private final StateDescriptor<StreamRecord<IN>> evictingWindowStateDescriptor;
    private transient EvictorContext evictorContext;
    private transient InternalListState<K, W, StreamRecord<IN>> evictingWindowState;

    public AsyncEvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<StreamRecord<IN>> windowStateDescriptor, InternalAsyncWindowFunction<StateIterator<IN>, OUT, K, W> windowFunction, AsyncTrigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor, long allowedLateness, OutputTag<IN> lateDataOutputTag) {
        super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger, allowedLateness, lateDataOutputTag);
        this.evictor = (Evictor)Preconditions.checkNotNull(evictor);
        this.evictingWindowStateDescriptor = (StateDescriptor)Preconditions.checkNotNull(windowStateDescriptor);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        Object key = this.getCurrentKey();
        ArrayList<StateFuture> windowFutures = new ArrayList<StateFuture>();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Async WindowOperator not support merging window (e.g. session window) yet.");
        }
        for (Window window : elementWindows) {
            if (this.isWindowLate(window)) continue;
            AtomicReference triggerResult = new AtomicReference();
            this.windowDeclaredVariable.set(window);
            windowFutures.add(this.evictingWindowState.asyncAdd(element).thenCompose(ignore -> this.triggerContext.onElement(element).thenAccept(triggerResult::set)).thenConditionallyCompose(ignore -> ((TriggerResult)((Object)((Object)triggerResult.get()))).isFire(), ignore -> this.evictingWindowState.asyncGet().thenConditionallyCompose(Objects::nonNull, contents -> this.emitWindowContents((K)key, (W)window, (StateIterator<StreamRecord<IN>>)contents, (ListState<StreamRecord<IN>>)this.evictingWindowState))).thenConditionallyCompose(ignore -> ((TriggerResult)((Object)((Object)triggerResult.get()))).isPurge(), ignore -> this.evictingWindowState.asyncClear()).thenAccept(ignore -> this.registerCleanupTimer(window)));
        }
        if (windowFutures.isEmpty() && this.isElementLate(element)) {
            if (this.lateDataOutputTag != null) {
                this.sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        this.windowDeclaredVariable.set((Window)timer.getNamespace());
        AtomicReference triggerResult = new AtomicReference();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Async WindowOperator not support merging window (e.g. session window) yet.");
        }
        this.triggerContext.onEventTime(timer.getTimestamp()).thenAccept(triggerResult::set).thenConditionallyCompose(ignore -> ((TriggerResult)((Object)((Object)triggerResult.get()))).isFire(), ignore -> this.evictingWindowState.asyncGet().thenConditionallyCompose(Objects::nonNull, contents -> this.emitWindowContents(timer.getKey(), (W)((Window)timer.getNamespace()), (StateIterator<StreamRecord<IN>>)contents, (ListState<StreamRecord<IN>>)this.evictingWindowState))).thenConditionallyCompose(ignore -> ((TriggerResult)((Object)((Object)triggerResult.get()))).isPurge(), ignore -> this.evictingWindowState.asyncClear()).thenConditionallyCompose(ignore -> this.windowAssigner.isEventTime() && this.isCleanupTime((Window)timer.getNamespace(), timer.getTimestamp()), ignore -> this.clearAllState((Window)timer.getNamespace(), this.evictingWindowState));
    }

    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        this.windowDeclaredVariable.set((Window)timer.getNamespace());
        AtomicReference triggerResult = new AtomicReference();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException("Async WindowOperator not support merging window (e.g. session window) yet.");
        }
        this.triggerContext.onProcessingTime(timer.getTimestamp()).thenAccept(triggerResult::set).thenConditionallyCompose(ignore -> ((TriggerResult)((Object)((Object)triggerResult.get()))).isFire(), ignore -> this.evictingWindowState.asyncGet().thenConditionallyCompose(Objects::nonNull, contents -> this.emitWindowContents(timer.getKey(), (W)((Window)timer.getNamespace()), (StateIterator<StreamRecord<IN>>)contents, (ListState<StreamRecord<IN>>)this.evictingWindowState))).thenConditionallyCompose(ignore -> ((TriggerResult)((Object)((Object)triggerResult.get()))).isPurge(), ignore -> this.evictingWindowState.asyncClear()).thenConditionallyCompose(ignore -> !this.windowAssigner.isEventTime() && this.isCleanupTime((Window)timer.getNamespace(), timer.getTimestamp()), ignore -> this.clearAllState((Window)timer.getNamespace(), this.evictingWindowState));
    }

    private StateFuture<Void> emitWindowContents(K key, W window, StateIterator<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(((Window)window).maxTimestamp());
        ArrayList elements = new ArrayList();
        return contents.onNext(item -> elements.add(item)).thenApply(ignore -> {
            FluentIterable recordsWithTimestamp = FluentIterable.from((Iterable)elements).transform(TimestampedValue::from);
            this.evictorContext.evictBefore(recordsWithTimestamp, Iterables.size((Iterable)recordsWithTimestamp));
            FluentIterable projectedContents = recordsWithTimestamp.transform(TimestampedValue::getValue);
            CompleteStateIterator projectedContentIter = new CompleteStateIterator(projectedContents);
            return Tuple2.of((Object)recordsWithTimestamp, projectedContentIter);
        }).thenCompose(tuple -> ((InternalAsyncWindowFunction)this.userFunction).process(key, window, this.processContext, (StateIterator)tuple.f1, this.timestampedCollector).thenCompose(ignore -> {
            this.evictorContext.evictAfter((Iterable)tuple.f0, Iterables.size((Iterable)((Iterable)tuple.f0)));
            return windowState.asyncUpdate(((FluentIterable)tuple.f0).stream().map(TimestampedValue::getStreamRecord).collect(Collectors.toList()));
        }));
    }

    private StateFuture<Void> clearAllState(W window, ListState<StreamRecord<IN>> windowState) {
        return windowState.asyncClear().thenCompose(ignore -> this.triggerContext.clear()).thenCompose(ignore -> this.processContext.clear());
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.evictorContext = new EvictorContext(this.windowDeclaredVariable);
        this.evictingWindowState = (InternalListState)this.getOrCreateKeyedState((Window)this.windowSerializer.createInstance(), this.windowSerializer, this.evictingWindowStateDescriptor);
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.evictorContext = null;
    }

    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return this.evictor;
    }

    @Override
    @VisibleForTesting
    public StateDescriptor<?> getStateDescriptor() {
        return this.evictingWindowStateDescriptor;
    }

    class EvictorContext
    implements Evictor.EvictorContext {
        protected DeclaredVariable<W> window;

        public EvictorContext(DeclaredVariable<W> window) {
            this.window = window;
        }

        @Override
        public long getCurrentProcessingTime() {
            return AsyncEvictingWindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return AsyncEvictingWindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return AsyncEvictingWindowOperator.this.getMetricGroup();
        }

        void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
            AsyncEvictingWindowOperator.this.evictor.evictBefore(elements, size, (Window)this.window.get(), this);
        }

        void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) {
            AsyncEvictingWindowOperator.this.evictor.evictAfter(elements, size, (Window)this.window.get(), this);
        }
    }
}

