/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@Experimental
public class AsyncCountTrigger<W extends Window>
extends AsyncTrigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<Long>("count", new Sum(), LongSerializer.INSTANCE);

    private AsyncCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public StateFuture<TriggerResult> onElement(Object element, long timestamp, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        ReducingState count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        return count.asyncAdd(1L).thenCompose(ignore -> count.asyncGet()).thenCompose(cnt -> cnt >= this.maxCount ? count.asyncClear().thenCompose(ignore -> StateFutureUtils.completedFuture(TriggerResult.FIRE)) : StateFutureUtils.completedFuture(TriggerResult.CONTINUE));
    }

    @Override
    public StateFuture<TriggerResult> onEventTime(long time, W window, AsyncTrigger.TriggerContext ctx) {
        return StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
    }

    @Override
    public StateFuture<TriggerResult> onProcessingTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        return StateFutureUtils.completedFuture(TriggerResult.CONTINUE);
    }

    @Override
    public StateFuture<Void> clear(W window, AsyncTrigger.TriggerContext ctx) throws Exception {
        return ctx.getPartitionedState(this.stateDesc).asyncClear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, AsyncTrigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
    }

    public String toString() {
        return "AsyncCountTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> AsyncCountTrigger<W> of(long maxCount) {
        return new AsyncCountTrigger<W>(maxCount);
    }

    private static class Sum
    implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

