/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@Internal
public class AsyncContinuousEventTimeTrigger<W extends Window>
extends AsyncTrigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor("fire-time", (ReduceFunction)new Min(), (TypeSerializer)LongSerializer.INSTANCE);

    private AsyncContinuousEventTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public StateFuture<TriggerResult> onElement(Object element, long timestamp, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        if (((Window)window).maxTimestamp() <= ctx.getCurrentWatermark()) {
            return StateFutureUtils.completedFuture((Object)((Object)TriggerResult.FIRE));
        }
        ctx.registerEventTimeTimer(((Window)window).maxTimestamp());
        ReducingState fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        return fireTimestampState.asyncGet().thenCompose(ts -> {
            if (ts == null) {
                this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, (ReducingState<Long>)fireTimestampState);
            }
            return StateFutureUtils.completedFuture((Object)((Object)TriggerResult.CONTINUE));
        });
    }

    @Override
    public StateFuture<TriggerResult> onEventTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        if (time == ((Window)window).maxTimestamp()) {
            return StateFutureUtils.completedFuture((Object)((Object)TriggerResult.FIRE));
        }
        ReducingState fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        return fireTimestampState.asyncGet().thenCompose(fireTimestamp -> {
            if (fireTimestamp != null && fireTimestamp == time) {
                return fireTimestampState.asyncClear().thenCompose(ignore -> this.registerNextFireTimestamp(time, window, ctx, (ReducingState<Long>)fireTimestampState)).thenApply(ignore -> TriggerResult.FIRE);
            }
            return StateFutureUtils.completedFuture((Object)((Object)TriggerResult.CONTINUE));
        });
    }

    @Override
    public StateFuture<TriggerResult> onProcessingTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        return StateFutureUtils.completedFuture((Object)((Object)TriggerResult.CONTINUE));
    }

    @Override
    public StateFuture<Void> clear(W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        ReducingState fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        return fireTimestamp.asyncGet().thenCompose(ts -> {
            if (ts != null) {
                ctx.deleteEventTimeTimer((long)ts);
                return fireTimestamp.asyncClear();
            }
            return StateFutureUtils.completedVoidFuture();
        });
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, AsyncTrigger.OnMergeContext ctx) throws Exception {
        throw new RuntimeException("Merge window not support");
    }

    public String toString() {
        return "ContinuousEventTimeTrigger(" + this.interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public static <W extends Window> AsyncContinuousEventTimeTrigger<W> of(Duration interval) {
        return new AsyncContinuousEventTimeTrigger<W>(interval.toMillis());
    }

    private StateFuture<Void> registerNextFireTimestamp(long time, W window, AsyncTrigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, ((Window)window).maxTimestamp());
        return fireTimestampState.asyncAdd((Object)nextFireTimestamp).thenAccept(ignore -> ctx.registerEventTimeTimer(nextFireTimestamp));
    }

    private static class Min
    implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

