/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TestInternalTimerService;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

public class AsyncTriggerTestHarness<T, W extends Window>
extends TriggerTestHarness<T, W> {
    private final AsyncTrigger<T, W> trigger;
    private final AsyncKeyedStateBackend<Integer> asyncStateBackend;

    public AsyncTriggerTestHarness(AsyncTrigger<T, W> trigger, TypeSerializer<W> windowSerializer) throws Exception {
        super(null, windowSerializer);
        this.trigger = trigger;
        this.asyncStateBackend = new AsyncKeyedStateBackendAdaptor((CheckpointableKeyedStateBackend)this.stateBackend);
    }

    @Override
    public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
        return AsyncTriggerTestHarness.completeStateFuture(this.asyncProcessElement(element, window));
    }

    @Override
    public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
        return AsyncTriggerTestHarness.completeStateFuture(this.asyncAdvanceProcessingTime(time, window));
    }

    @Override
    public TriggerResult advanceWatermark(long time, W window) throws Exception {
        return AsyncTriggerTestHarness.completeStateFuture(this.asyncAdvanceWatermark(time, window));
    }

    @Override
    public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long time) throws Exception {
        return this.asyncAdvanceProcessingTime(time).stream().map(f -> Tuple2.of((Object)((Window)f.f0), (Object)((TriggerResult)AsyncTriggerTestHarness.completeStateFuture((StateFuture)f.f1)))).collect(Collectors.toList());
    }

    @Override
    public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long time) throws Exception {
        return this.asyncAdvanceWatermark(time).stream().map(f -> Tuple2.of((Object)((Window)f.f0), (Object)((TriggerResult)AsyncTriggerTestHarness.completeStateFuture((StateFuture)f.f1)))).collect(Collectors.toList());
    }

    @Override
    public TriggerResult invokeOnEventTime(long timestamp, W window) throws Exception {
        return AsyncTriggerTestHarness.completeStateFuture(this.asyncInvokeOnEventTime(timestamp, window));
    }

    @Override
    public void mergeWindows(W targetWindow, Collection<W> mergedWindows) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    public void clearTriggerState(W window) throws Exception {
        AsyncTriggerTestHarness.completeStateFuture(this.asyncClearTriggerState(window));
    }

    StateFuture<TriggerResult> asyncProcessElement(StreamRecord<T> element, W window) throws Exception {
        TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<Integer, W>(KEY, window, this.internalTimerService, this.asyncStateBackend, this.windowSerializer);
        return this.trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
    }

    StateFuture<TriggerResult> asyncAdvanceProcessingTime(long time, W window) throws Exception {
        Collection<Tuple2<W, StateFuture<TriggerResult>>> firings = this.asyncAdvanceProcessingTime(time);
        if (firings.size() != 1) {
            throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + String.valueOf(firings));
        }
        Tuple2<W, StateFuture<TriggerResult>> firing = firings.iterator().next();
        if (!((Window)firing.f0).equals(window)) {
            throw new IllegalStateException("Trigger fired for another window.");
        }
        return (StateFuture)firing.f1;
    }

    StateFuture<TriggerResult> asyncAdvanceWatermark(long time, W window) throws Exception {
        Collection<Tuple2<W, StateFuture<TriggerResult>>> firings = this.asyncAdvanceWatermark(time);
        if (firings.size() != 1) {
            throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + String.valueOf(firings));
        }
        Tuple2<W, StateFuture<TriggerResult>> firing = firings.iterator().next();
        if (!((Window)firing.f0).equals(window)) {
            throw new IllegalStateException("Trigger fired for another window.");
        }
        return (StateFuture)firing.f1;
    }

    Collection<Tuple2<W, StateFuture<TriggerResult>>> asyncAdvanceProcessingTime(long time) throws Exception {
        Collection firedTimers = this.internalTimerService.advanceProcessingTime(time);
        ArrayList<Tuple2<W, StateFuture<TriggerResult>>> result = new ArrayList<Tuple2<W, StateFuture<TriggerResult>>>();
        for (TestInternalTimerService.Timer timer : firedTimers) {
            TestTriggerContext<Integer, Window> triggerContext = new TestTriggerContext<Integer, Window>(KEY, (Window)timer.getNamespace(), this.internalTimerService, this.asyncStateBackend, this.windowSerializer);
            StateFuture triggerResult = this.trigger.onProcessingTime(timer.getTimestamp(), (Window)timer.getNamespace(), triggerContext);
            result.add(new Tuple2((Object)((Window)timer.getNamespace()), (Object)triggerResult));
        }
        return result;
    }

    Collection<Tuple2<W, StateFuture<TriggerResult>>> asyncAdvanceWatermark(long time) throws Exception {
        Collection firedTimers = this.internalTimerService.advanceWatermark(time);
        ArrayList<Tuple2<W, StateFuture<TriggerResult>>> result = new ArrayList<Tuple2<W, StateFuture<TriggerResult>>>();
        for (TestInternalTimerService.Timer timer : firedTimers) {
            StateFuture<TriggerResult> triggerResult = this.asyncInvokeOnEventTime(timer);
            result.add(new Tuple2((Object)((Window)timer.getNamespace()), triggerResult));
        }
        return result;
    }

    private StateFuture<TriggerResult> asyncInvokeOnEventTime(TestInternalTimerService.Timer<Integer, W> timer) throws Exception {
        TestTriggerContext<Integer, Window> triggerContext = new TestTriggerContext<Integer, Window>(KEY, (Window)timer.getNamespace(), this.internalTimerService, this.asyncStateBackend, this.windowSerializer);
        return this.trigger.onEventTime(timer.getTimestamp(), (Window)timer.getNamespace(), triggerContext);
    }

    StateFuture<TriggerResult> asyncInvokeOnEventTime(long timestamp, W window) throws Exception {
        TestInternalTimerService.Timer<Integer, W> timer = new TestInternalTimerService.Timer<Integer, W>(timestamp, KEY, window);
        return this.asyncInvokeOnEventTime(timer);
    }

    StateFuture<Void> asyncClearTriggerState(W window) throws Exception {
        TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<Integer, W>(KEY, window, this.internalTimerService, this.asyncStateBackend, this.windowSerializer);
        return this.trigger.clear(window, triggerContext);
    }

    static <T> T completeStateFuture(StateFuture<T> future) {
        InternalAsyncFuture internalAsyncFuture = (InternalAsyncFuture)future;
        Preconditions.checkArgument((boolean)internalAsyncFuture.isDone());
        return (T)internalAsyncFuture.get();
    }

    private static class TestTriggerContext<K, W extends Window>
    implements AsyncTrigger.TriggerContext {
        protected final InternalTimerService<W> timerService;
        protected final AsyncKeyedStateBackend<Integer> stateBackend;
        protected final K key;
        protected final W window;
        protected final TypeSerializer<W> windowSerializer;

        TestTriggerContext(K key, W window, InternalTimerService<W> timerService, AsyncKeyedStateBackend<Integer> stateBackend, TypeSerializer<W> windowSerializer) {
            this.key = key;
            this.window = window;
            this.timerService = timerService;
            this.stateBackend = stateBackend;
            this.windowSerializer = windowSerializer;
        }

        public long getCurrentProcessingTime() {
            return this.timerService.currentProcessingTime();
        }

        public MetricGroup getMetricGroup() {
            return null;
        }

        public long getCurrentWatermark() {
            return this.timerService.currentWatermark();
        }

        public void registerProcessingTimeTimer(long time) {
            this.timerService.registerProcessingTimeTimer(this.window, time);
        }

        public void registerEventTimeTimer(long time) {
            this.timerService.registerEventTimeTimer(this.window, time);
        }

        public void deleteProcessingTimeTimer(long time) {
            this.timerService.deleteProcessingTimeTimer(this.window, time);
        }

        public void deleteEventTimeTimer(long time) {
            this.timerService.deleteEventTimeTimer(this.window, time);
        }

        public <T, S extends State> S getPartitionedState(StateDescriptor<T> stateDescriptor) {
            try {
                return (S)this.stateBackend.getOrCreateKeyedState(this.window, this.windowSerializer, stateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }
    }
}

