/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.time.Duration;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

public class WindowOperatorBuilder<T, K, W extends Window> {
    private static final String WINDOW_STATE_NAME = "window-contents";
    private final ExecutionConfig config;
    private final WindowAssigner<? super T, W> windowAssigner;
    private final TypeInformation<T> inputType;
    private final KeySelector<T, K> keySelector;
    private final TypeInformation<K> keyType;
    private Trigger<? super T, ? super W> trigger;
    private AsyncTrigger<? super T, ? super W> asyncTrigger;
    @Nullable
    private Evictor<? super T, ? super W> evictor;
    private long allowedLateness = 0L;
    @Nullable
    private OutputTag<T> lateDataOutputTag;

    /**
     * Creates a builder for a window operator.
     *
     * <p>All arguments are stored as given; this constructor performs no validation.
     *
     * @param windowAssigner assigner handed to the operator that is eventually built
     * @param trigger initial trigger; may be replaced later through {@link #trigger(Trigger)}
     * @param config execution config used when creating serializers
     * @param inputType type information of the input elements
     * @param keySelector key selector handed to the operator that is eventually built
     * @param keyType type information of the key
     */
    public WindowOperatorBuilder(
            WindowAssigner<? super T, W> windowAssigner,
            Trigger<? super T, ? super W> trigger,
            ExecutionConfig config,
            TypeInformation<T> inputType,
            KeySelector<T, K> keySelector,
            TypeInformation<K> keyType) {
        // Environment and type information.
        this.config = config;
        this.inputType = inputType;
        this.keyType = keyType;
        this.keySelector = keySelector;
        // Windowing strategy.
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
    }

    /**
     * Replaces the trigger used by operators built afterwards.
     *
     * @param trigger the new trigger; checked with {@code Preconditions.checkNotNull}
     * @throws UnsupportedOperationException if the window assigner is a {@link
     *     MergingWindowAssigner} and the trigger reports that it cannot merge
     */
    public void trigger(Trigger<? super T, ? super W> trigger) {
        Preconditions.checkNotNull(trigger, "Window triggers cannot be null");
        final boolean assignerMerges = this.windowAssigner instanceof MergingWindowAssigner;
        // canMerge() is only consulted when the assigner actually merges windows.
        if (assignerMerges && !trigger.canMerge()) {
            throw new UnsupportedOperationException(
                    "A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        this.trigger = trigger;
    }

    /**
     * Sets an explicit asynchronous trigger. When one is set, the async operator builders use it
     * instead of converting the synchronous trigger.
     *
     * @param asyncTrigger the async trigger; checked with {@code Preconditions.checkNotNull}
     * @return this builder
     * @throws UnsupportedOperationException if the window assigner is a {@link
     *     MergingWindowAssigner} and the trigger reports that it cannot merge
     */
    public WindowOperatorBuilder<T, K, W> asyncTrigger(AsyncTrigger<? super T, ? super W> asyncTrigger) {
        Preconditions.checkNotNull(asyncTrigger, "AsyncTrigger cannot be null");
        final boolean assignerMerges = this.windowAssigner instanceof MergingWindowAssigner;
        // canMerge() is only consulted when the assigner actually merges windows.
        if (assignerMerges && !asyncTrigger.canMerge()) {
            throw new UnsupportedOperationException(
                    "A merging window assigner cannot be used with a trigger that does not support merging.");
        }
        this.asyncTrigger = asyncTrigger;
        return this;
    }

    /**
     * Sets the allowed lateness, stored internally in milliseconds.
     *
     * @param lateness the allowed lateness; must be non-null and must not convert to a negative
     *     number of milliseconds
     */
    public void allowedLateness(Duration lateness) {
        Preconditions.checkNotNull(lateness, "Allowed lateness cannot be null");
        final long latenessMillis = lateness.toMillis();
        final boolean nonNegative = latenessMillis >= 0L;
        Preconditions.checkArgument(nonNegative, "The allowed lateness cannot be negative.");
        this.allowedLateness = latenessMillis;
    }

    /**
     * Sets the output tag that is handed to the built operator as its late-data output tag.
     *
     * @param outputTag the tag; checked with {@code Preconditions.checkNotNull}
     */
    public void sideOutputLateData(OutputTag<T> outputTag) {
        final String nullMessage = "Side output tag must not be null.";
        Preconditions.checkNotNull(outputTag, nullMessage);
        this.lateDataOutputTag = outputTag;
    }

    /**
     * Sets the evictor. Once an evictor is set, the build methods of this class create evicting
     * operators instead of the plain window operators.
     *
     * @param evictor the evictor; checked with {@code Preconditions.checkNotNull}
     */
    public void evictor(Evictor<? super T, ? super W> evictor) {
        final String nullMessage = "Evictor cannot be null";
        Preconditions.checkNotNull(evictor, nullMessage);
        this.evictor = evictor;
    }

    /**
     * Builds a window operator that combines a {@link ReduceFunction} with a {@link
     * WindowFunction}.
     *
     * <p>If an evictor is configured, the two functions are wrapped into a {@link
     * ReduceApplyWindowFunction} and an evicting operator is built. Otherwise a reducing state
     * descriptor named {@code "window-contents"} is created from the reduce function and the
     * window function is applied to the single reduced value.
     *
     * @param reduceFunction the reduce function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param function the window function; must be non-null
     * @param <R> output type of the window function
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public <R> OneInputStreamOperator<T, R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        if (this.evictor != null) {
            ReduceApplyWindowFunction<K, W, T, R> reduceThenApply = new ReduceApplyWindowFunction<K, W, T, R>(reduceFunction, function);
            // Type arguments follow the <IN, OUT, KEY, W> order used by apply(WindowFunction);
            // the previous <K, W, T, R> order did not match buildEvictingWindowOperator's parameter.
            return this.buildEvictingWindowOperator(new InternalIterableWindowFunction<T, R, K, W>(reduceThenApply));
        }
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<T>(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig()));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction<T, R, K, W>(function));
    }

    /**
     * Async-state variant of {@link #reduce(ReduceFunction, WindowFunction)}.
     *
     * <p>If an evictor is configured, the two functions are wrapped into a {@link
     * ReduceApplyWindowFunction} and an async evicting operator is built. Otherwise a state V2
     * reducing state descriptor named {@code "window-contents"} is created from the reduce
     * function and an async window operator is built.
     *
     * @param reduceFunction the reduce function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param function the window function; must be non-null
     * @param <R> output type of the window function
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public <R> OneInputStreamOperator<T, R> asyncReduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        if (this.evictor != null) {
            ReduceApplyWindowFunction<K, W, T, R> reduceThenApply = new ReduceApplyWindowFunction<K, W, T, R>(reduceFunction, function);
            // Type arguments follow the <IN, OUT, KEY, W> order used by asyncApply(WindowFunction);
            // the previous <K, W, T, R> order did not match buildAsyncEvictingWindowOperator's parameter.
            return this.buildAsyncEvictingWindowOperator(new InternalIterableAsyncWindowFunction<T, R, K, W>(reduceThenApply));
        }
        org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateDesc = new org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T>(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig()));
        return this.buildAsyncWindowOperator(stateDesc, new InternalSingleValueAsyncWindowFunction<T, R, K, W>(function));
    }

    /**
     * Builds a window operator that combines a {@link ReduceFunction} with a {@link
     * ProcessWindowFunction}.
     *
     * <p>If an evictor is configured, the two functions are wrapped into a {@link
     * ReduceApplyProcessWindowFunction} and an evicting operator is built. Otherwise a reducing
     * state descriptor named {@code "window-contents"} is created from the reduce function and
     * the process window function is applied to the single reduced value.
     *
     * @param reduceFunction the reduce function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param function the process window function; must be non-null
     * @param <R> output type of the process window function
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public <R> OneInputStreamOperator<T, R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        if (this.evictor != null) {
            ReduceApplyProcessWindowFunction<K, W, T, R> reduceThenProcess = new ReduceApplyProcessWindowFunction<K, W, T, R>(reduceFunction, function);
            // Type arguments follow the <IN, OUT, KEY, W> order used by process(ProcessWindowFunction);
            // the previous <K, W, T, R> order did not match buildEvictingWindowOperator's parameter.
            return this.buildEvictingWindowOperator(new InternalIterableProcessWindowFunction<T, R, K, W>(reduceThenProcess));
        }
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<T>(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig()));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueProcessWindowFunction<T, R, K, W>(function));
    }

    /**
     * Async-state variant of {@link #reduce(ReduceFunction, ProcessWindowFunction)}.
     *
     * <p>If an evictor is configured, the two functions are wrapped into a {@link
     * ReduceApplyProcessWindowFunction} and an async evicting operator is built. Otherwise a
     * state V2 reducing state descriptor named {@code "window-contents"} is created from the
     * reduce function and an async window operator is built.
     *
     * @param reduceFunction the reduce function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param function the process window function; must be non-null
     * @param <R> output type of the process window function
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code reduceFunction} is a {@link RichFunction}
     */
    public <R> OneInputStreamOperator<T, R> asyncReduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
        }
        if (this.evictor != null) {
            ReduceApplyProcessWindowFunction<K, W, T, R> reduceThenProcess = new ReduceApplyProcessWindowFunction<K, W, T, R>(reduceFunction, function);
            // Type arguments follow the <IN, OUT, KEY, W> order used by asyncProcess(ProcessWindowFunction);
            // the previous <K, W, T, R> order did not match buildAsyncEvictingWindowOperator's parameter.
            return this.buildAsyncEvictingWindowOperator(new InternalIterableProcessAsyncWindowFunction<T, R, K, W>(reduceThenProcess));
        }
        org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateDesc = new org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T>(WINDOW_STATE_NAME, reduceFunction, this.inputType.createSerializer(this.config.getSerializerConfig()));
        return this.buildAsyncWindowOperator(stateDesc, new InternalSingleValueProcessAsyncWindowFunction<T, R, K, W>(function));
    }

    /**
     * Builds a window operator that combines an {@link AggregateFunction} with a {@link
     * WindowFunction}.
     *
     * <p>If an evictor is configured, the two functions are wrapped into an {@link
     * AggregateApplyWindowFunction} and an evicting operator is built. Otherwise an aggregating
     * state descriptor named {@code "window-contents"} is created from the aggregate function and
     * the accumulator serializer, and the window function is applied to the aggregated value.
     *
     * @param aggregateFunction the aggregate function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param windowFunction the window function; must be non-null
     * @param accumulatorType type information used to create the accumulator serializer (only
     *     read on the non-evicting path)
     * @param <ACC> accumulator type
     * @param <V> result type of the aggregate function
     * @param <R> output type of the window function
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code aggregateFunction} is a {@link RichFunction}
     */
    public <ACC, V, R> OneInputStreamOperator<T, R> aggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        if (this.evictor != null) {
            AggregateApplyWindowFunction<K, W, T, ACC, V, R> aggregateThenApply = new AggregateApplyWindowFunction<K, W, T, ACC, V, R>(aggregateFunction, windowFunction);
            // Parameterized instead of raw so the call is type-checked against
            // buildEvictingWindowOperator's InternalWindowFunction<Iterable<T>, R, K, W> parameter.
            return this.buildEvictingWindowOperator(new InternalIterableWindowFunction<T, R, K, W>(aggregateThenApply));
        }
        org.apache.flink.api.common.state.AggregatingStateDescriptor<T, ACC, V> stateDesc = new org.apache.flink.api.common.state.AggregatingStateDescriptor<T, ACC, V>(WINDOW_STATE_NAME, aggregateFunction, accumulatorType.createSerializer(this.config.getSerializerConfig()));
        return this.buildWindowOperator(stateDesc, new InternalSingleValueWindowFunction<V, R, K, W>(windowFunction));
    }

    /**
     * Async-state variant of {@link #aggregate(AggregateFunction, WindowFunction,
     * TypeInformation)}.
     *
     * <p>If an evictor is configured, the two functions are wrapped into an {@link
     * AggregateApplyWindowFunction} and an async evicting operator is built. Otherwise a state V2
     * aggregating state descriptor named {@code "window-contents"} is created and an async window
     * operator is built.
     *
     * @param aggregateFunction the aggregate function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param windowFunction the window function; must be non-null
     * @param accumulatorType type information used to create the accumulator serializer (only
     *     read on the non-evicting path)
     * @param <ACC> accumulator type
     * @param <V> result type of the aggregate function
     * @param <R> output type of the window function
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code aggregateFunction} is a {@link RichFunction}
     */
    public <ACC, V, R> OneInputStreamOperator<T, R> asyncAggregate(AggregateFunction<T, ACC, V> aggregateFunction, WindowFunction<V, R, K, W> windowFunction, TypeInformation<ACC> accumulatorType) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "WindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        if (this.evictor != null) {
            AggregateApplyWindowFunction<K, W, T, ACC, V, R> aggregateThenApply = new AggregateApplyWindowFunction<K, W, T, ACC, V, R>(aggregateFunction, windowFunction);
            // Parameterized instead of raw so the call is type-checked against
            // buildAsyncEvictingWindowOperator's InternalAsyncWindowFunction<StateIterator<T>, R, K, W> parameter.
            return this.buildAsyncEvictingWindowOperator(new InternalIterableAsyncWindowFunction<T, R, K, W>(aggregateThenApply));
        }
        AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<T, ACC, V>(WINDOW_STATE_NAME, aggregateFunction, accumulatorType.createSerializer(this.config.getSerializerConfig()));
        return this.buildAsyncWindowOperator(stateDesc, new InternalSingleValueAsyncWindowFunction<V, R, K, W>(windowFunction));
    }

    /**
     * Builds a window operator that combines an {@link AggregateFunction} with a {@link
     * ProcessWindowFunction}.
     *
     * <p>Without an evictor, an aggregating state descriptor named {@code "window-contents"} is
     * created and the process window function receives the aggregated value. With an evictor, both
     * functions are wrapped into an {@link InternalAggregateProcessWindowFunction} and an evicting
     * operator is built.
     *
     * @param aggregateFunction the aggregate function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param windowFunction the process window function; must be non-null
     * @param accumulatorType type information used to create the accumulator serializer (only
     *     read on the non-evicting path)
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code aggregateFunction} is a {@link RichFunction}
     */
    public <ACC, V, R> OneInputStreamOperator<T, R> aggregate(
            AggregateFunction<T, ACC, V> aggregateFunction,
            ProcessWindowFunction<V, R, K, W> windowFunction,
            TypeInformation<ACC> accumulatorType) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "ProcessWindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        if (this.evictor == null) {
            org.apache.flink.api.common.state.AggregatingStateDescriptor<T, ACC, V> aggregatingState =
                    new org.apache.flink.api.common.state.AggregatingStateDescriptor<T, ACC, V>(
                            WINDOW_STATE_NAME,
                            aggregateFunction,
                            accumulatorType.createSerializer(this.config.getSerializerConfig()));
            InternalSingleValueProcessWindowFunction<V, R, K, W> singleValueFunction =
                    new InternalSingleValueProcessWindowFunction<V, R, K, W>(windowFunction);
            return this.buildWindowOperator(aggregatingState, singleValueFunction);
        }
        InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W> aggregatingFunction =
                new InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W>(aggregateFunction, windowFunction);
        return this.buildEvictingWindowOperator(aggregatingFunction);
    }

    /**
     * Async-state variant of {@link #aggregate(AggregateFunction, ProcessWindowFunction,
     * TypeInformation)}.
     *
     * <p>Without an evictor, a state V2 aggregating state descriptor named {@code
     * "window-contents"} is created and an async window operator is built. With an evictor, both
     * functions are wrapped into an {@link InternalAggregateProcessAsyncWindowFunction} and an
     * async evicting operator is built.
     *
     * @param aggregateFunction the aggregate function; must be non-null and must not be a {@link
     *     RichFunction}
     * @param windowFunction the process window function; must be non-null
     * @param accumulatorType type information used to create the accumulator serializer (only
     *     read on the non-evicting path)
     * @return the configured operator
     * @throws UnsupportedOperationException if {@code aggregateFunction} is a {@link RichFunction}
     */
    public <ACC, V, R> OneInputStreamOperator<T, R> asyncAggregate(
            AggregateFunction<T, ACC, V> aggregateFunction,
            ProcessWindowFunction<V, R, K, W> windowFunction,
            TypeInformation<ACC> accumulatorType) {
        Preconditions.checkNotNull(aggregateFunction, "AggregateFunction cannot be null");
        Preconditions.checkNotNull(windowFunction, "ProcessWindowFunction cannot be null");
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        if (this.evictor == null) {
            AggregatingStateDescriptor<T, ACC, V> aggregatingState =
                    new AggregatingStateDescriptor<T, ACC, V>(
                            WINDOW_STATE_NAME,
                            aggregateFunction,
                            accumulatorType.createSerializer(this.config.getSerializerConfig()));
            InternalSingleValueProcessAsyncWindowFunction<V, R, K, W> singleValueFunction =
                    new InternalSingleValueProcessAsyncWindowFunction<V, R, K, W>(windowFunction);
            return this.buildAsyncWindowOperator(aggregatingState, singleValueFunction);
        }
        InternalAggregateProcessAsyncWindowFunction<T, ACC, V, R, K, W> aggregatingFunction =
                new InternalAggregateProcessAsyncWindowFunction<T, ACC, V, R, K, W>(aggregateFunction, windowFunction);
        return this.buildAsyncEvictingWindowOperator(aggregatingFunction);
    }

    /**
     * Builds a window operator that passes the window's elements to the given {@link
     * WindowFunction}.
     *
     * @param function the window function; must be non-null
     * @return the configured operator
     */
    public <R> OneInputStreamOperator<T, R> apply(WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");
        InternalIterableWindowFunction<T, R, K, W> iterableFunction =
                new InternalIterableWindowFunction<T, R, K, W>(function);
        return this.apply(iterableFunction);
    }

    /**
     * Builds a window operator that passes the window's elements to the given {@link
     * ProcessWindowFunction}.
     *
     * @param function the process window function; must be non-null
     * @return the configured operator
     */
    public <R> OneInputStreamOperator<T, R> process(ProcessWindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
        InternalIterableProcessWindowFunction<T, R, K, W> iterableFunction =
                new InternalIterableProcessWindowFunction<T, R, K, W>(function);
        return this.apply(iterableFunction);
    }

    /**
     * Shared tail of {@link #apply(WindowFunction)} and {@link #process(ProcessWindowFunction)}.
     * Without an evictor it creates a list state descriptor named {@code "window-contents"} and
     * builds a plain window operator; with an evictor it builds an evicting operator.
     */
    private <R> OneInputStreamOperator<T, R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function) {
        if (this.evictor == null) {
            org.apache.flink.api.common.state.ListStateDescriptor<T> listState =
                    new org.apache.flink.api.common.state.ListStateDescriptor<T>(
                            WINDOW_STATE_NAME,
                            this.inputType.createSerializer(this.config.getSerializerConfig()));
            return this.buildWindowOperator(listState, function);
        }
        return this.buildEvictingWindowOperator(function);
    }

    /**
     * Async-state variant of {@link #apply(WindowFunction)}.
     *
     * @param function the window function; must be non-null
     * @return the configured operator
     */
    public <R> OneInputStreamOperator<T, R> asyncApply(WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");
        InternalIterableAsyncWindowFunction<T, R, K, W> iterableFunction =
                new InternalIterableAsyncWindowFunction<T, R, K, W>(function);
        return this.asyncApply(iterableFunction);
    }

    /**
     * Async-state variant of {@link #process(ProcessWindowFunction)}.
     *
     * @param function the process window function; must be non-null
     * @return the configured operator
     */
    public <R> OneInputStreamOperator<T, R> asyncProcess(ProcessWindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(function, "ProcessWindowFunction cannot be null");
        InternalIterableProcessAsyncWindowFunction<T, R, K, W> iterableFunction =
                new InternalIterableProcessAsyncWindowFunction<T, R, K, W>(function);
        return this.asyncApply(iterableFunction);
    }

    /**
     * Shared tail of {@link #asyncApply(WindowFunction)} and {@link
     * #asyncProcess(ProcessWindowFunction)}. Without an evictor it creates a state V2 list state
     * descriptor named {@code "window-contents"} and builds an async window operator; with an
     * evictor it builds an async evicting operator.
     */
    private <R> OneInputStreamOperator<T, R> asyncApply(InternalAsyncWindowFunction<StateIterator<T>, R, K, W> function) {
        if (this.evictor == null) {
            ListStateDescriptor<T> listState =
                    new ListStateDescriptor<T>(
                            WINDOW_STATE_NAME,
                            this.inputType.createSerializer(this.config.getSerializerConfig()));
            return this.buildAsyncWindowOperator(listState, function);
        }
        return this.buildAsyncEvictingWindowOperator(function);
    }

    /**
     * Creates a {@link WindowOperator} from the builder's current settings, the given window
     * state descriptor and the given internal window function.
     */
    private <ACC, R> WindowOperator<K, T, ACC, R, W> buildWindowOperator(
            StateDescriptor<? extends AppendingState<T, ACC>, ?> stateDesc,
            InternalWindowFunction<ACC, R, K, W> function) {
        return new WindowOperator<K, T, ACC, R, W>(
                this.windowAssigner,
                this.windowAssigner.getWindowSerializer(this.config),
                this.keySelector,
                this.keyType.createSerializer(this.config.getSerializerConfig()),
                stateDesc,
                function,
                this.trigger,
                this.allowedLateness,
                this.lateDataOutputTag);
    }

    /**
     * Creates an {@link EvictingWindowOperator} from the builder's current settings. The window
     * state is a list state named {@code "window-contents"} whose entries are {@link
     * StreamElement}s, serialized with a {@link StreamElementSerializer} wrapping the input
     * serializer.
     */
    private <R> WindowOperator<K, T, Iterable<T>, R, W> buildEvictingWindowOperator(
            InternalWindowFunction<Iterable<T>, R, K, W> function) {
        StreamElementSerializer<T> elementSerializer =
                new StreamElementSerializer<T>(
                        this.inputType.createSerializer(this.config.getSerializerConfig()));
        org.apache.flink.api.common.state.ListStateDescriptor<StreamElement> elementListState =
                new org.apache.flink.api.common.state.ListStateDescriptor<StreamElement>(
                        WINDOW_STATE_NAME, elementSerializer);
        return new EvictingWindowOperator<K, T, R, W>(
                this.windowAssigner,
                this.windowAssigner.getWindowSerializer(this.config),
                this.keySelector,
                this.keyType.createSerializer(this.config.getSerializerConfig()),
                elementListState,
                function,
                this.trigger,
                this.evictor,
                this.allowedLateness,
                this.lateDataOutputTag);
    }

    /**
     * Creates an {@link AsyncWindowOperator} from the builder's current settings. If no async
     * trigger was set explicitly, the synchronous trigger is converted through {@code
     * AsyncTriggerConverter.convertToAsync}.
     */
    private <ACC, R> AsyncWindowOperator<K, T, ACC, R, W> buildAsyncWindowOperator(
            org.apache.flink.api.common.state.v2.StateDescriptor<?> stateDesc,
            InternalAsyncWindowFunction<ACC, R, K, W> function) {
        AsyncTrigger<? super T, ? super W> effectiveTrigger = this.asyncTrigger;
        if (effectiveTrigger == null) {
            // Fall back to an async view of the synchronous trigger.
            effectiveTrigger = AsyncTriggerConverter.convertToAsync(this.trigger);
        }
        return new AsyncWindowOperator<K, T, ACC, R, W>(
                this.windowAssigner,
                this.windowAssigner.getWindowSerializer(this.config),
                this.keySelector,
                this.keyType.createSerializer(this.config.getSerializerConfig()),
                stateDesc,
                function,
                effectiveTrigger,
                this.allowedLateness,
                this.lateDataOutputTag);
    }

    /**
     * Creates an {@link AsyncEvictingWindowOperator} from the builder's current settings. The
     * window state is a state V2 list state named {@code "window-contents"} whose entries are
     * {@link StreamElement}s. If no async trigger was set explicitly, the synchronous trigger is
     * converted through {@code AsyncTriggerConverter.convertToAsync}.
     */
    private <R> AsyncWindowOperator<K, T, StateIterator<T>, R, W> buildAsyncEvictingWindowOperator(
            InternalAsyncWindowFunction<StateIterator<T>, R, K, W> function) {
        StreamElementSerializer<T> elementSerializer =
                new StreamElementSerializer<T>(
                        this.inputType.createSerializer(this.config.getSerializerConfig()));
        ListStateDescriptor<StreamElement> elementListState =
                new ListStateDescriptor<StreamElement>(WINDOW_STATE_NAME, elementSerializer);
        AsyncTrigger<? super T, ? super W> effectiveTrigger = this.asyncTrigger;
        if (effectiveTrigger == null) {
            // Fall back to an async view of the synchronous trigger.
            effectiveTrigger = AsyncTriggerConverter.convertToAsync(this.trigger);
        }
        return new AsyncEvictingWindowOperator<K, T, R, W>(
                this.windowAssigner,
                this.windowAssigner.getWindowSerializer(this.config),
                this.keySelector,
                this.keyType.createSerializer(this.config.getSerializerConfig()),
                elementListState,
                function,
                effectiveTrigger,
                this.evictor,
                this.allowedLateness,
                this.lateDataOutputTag);
    }

    /**
     * Derives a display name for a function.
     *
     * <p>For a non-anonymous class this is the class's simple name. For an anonymous class it is
     * the simple name of its first implemented interface (or of its superclass when it implements
     * no interface), followed by the part of the class's binary name that comes after the
     * enclosing class's name.
     *
     * @param function the function to name
     * @return the derived name
     */
    protected static String generateFunctionName(Function function) {
        final Class<?> type = function.getClass();
        if (!type.isAnonymousClass()) {
            return type.getSimpleName();
        }
        // Remainder of the binary name once the enclosing class's name is stripped off.
        final String anonymousSuffix =
                type.getName().substring(type.getEnclosingClass().getName().length());
        final Class<?>[] implemented = type.getInterfaces();
        final Class<?> namedBase = implemented.length == 0 ? type.getSuperclass() : implemented[0];
        return namedBase.getSimpleName() + anonymousSuffix;
    }

    /**
     * Returns the operator name, which is the simple class name of the window assigner.
     *
     * @return the window assigner's simple class name
     */
    public String generateOperatorName() {
        final Class<?> assignerClass = this.windowAssigner.getClass();
        return assignerClass.getSimpleName();
    }

    /**
     * Builds a description of the form {@code Window(assigner, Trigger, [Evictor, ]function1[,
     * function2])}.
     *
     * <p>The assigner is rendered through its string representation, the trigger and evictor
     * through their simple class names, and the functions through {@link
     * #generateFunctionName(Function)}. The synchronous trigger is always the one described.
     *
     * @param function1 the first function; always included
     * @param function2 optional second function; omitted when {@code null}
     * @return the description string
     */
    public String generateOperatorDescription(Function function1, @Nullable Function function2) {
        final StringBuilder description = new StringBuilder("Window(");
        description.append(String.valueOf(this.windowAssigner)).append(", ");
        description.append(this.trigger.getClass().getSimpleName()).append(", ");
        if (this.evictor != null) {
            description.append(this.evictor.getClass().getSimpleName()).append(", ");
        }
        description.append(WindowOperatorBuilder.generateFunctionName(function1));
        if (function2 != null) {
            description.append(", ").append(WindowOperatorBuilder.generateFunctionName(function2));
        }
        return description.append(")").toString();
    }

    /**
     * Returns the currently configured allowed lateness in milliseconds.
     *
     * @return the allowed lateness in milliseconds ({@code 0} unless changed)
     */
    @VisibleForTesting
    public long getAllowedLateness() {
        final long latenessMillis = this.allowedLateness;
        return latenessMillis;
    }

    /**
     * Adapts an {@link AsyncTrigger.TriggerContext} to the synchronous {@link
     * Trigger.TriggerContext} interface.
     *
     * <p>Time, metric and timer calls are forwarded to the wrapped async context. Partitioned
     * state access is not supported and always throws.
     */
    private static class AsyncTriggerContextConvertor
    implements Trigger.TriggerContext {
        /** The async context every supported call is forwarded to. */
        private final AsyncTrigger.TriggerContext delegate;

        /** Wraps the given async context in a synchronous view. */
        public static Trigger.TriggerContext of(AsyncTrigger.TriggerContext asyncTriggerContext) {
            return new AsyncTriggerContextConvertor(asyncTriggerContext);
        }

        private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext asyncTriggerContext) {
            this.delegate = asyncTriggerContext;
        }

        // ---- clock and metrics ----

        @Override
        public long getCurrentProcessingTime() {
            return delegate.getCurrentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return delegate.getCurrentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return delegate.getMetricGroup();
        }

        // ---- timers ----

        @Override
        public void registerProcessingTimeTimer(long time) {
            delegate.registerProcessingTimeTimer(time);
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            delegate.deleteProcessingTimeTimer(time);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            delegate.registerEventTimeTimer(time);
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            delegate.deleteEventTimeTimer(time);
        }

        // ---- state ----

        /** Always throws: state V1 descriptors cannot be served through the async context. */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            throw new UnsupportedOperationException("Trigger is for state V1 APIs, window operator with async state enabled only accept state V2 APIs.");
        }
    }

    /**
     * Static helper that maps a synchronous {@link Trigger} to an {@link AsyncTrigger}.
     *
     * <p>{@link CountTrigger}, {@link EventTimeTrigger}, {@link ProcessingTimeTrigger} and {@link
     * PurgingTrigger} are mapped to their async counterparts; any other trigger is wrapped in a
     * {@link UserDefinedAsyncTrigger}.
     */
    private static class AsyncTriggerConverter {
        /** Not instantiable; only the static conversion method is used. */
        private AsyncTriggerConverter() {
        }

        /**
         * Converts the given trigger.
         *
         * @param trigger the synchronous trigger to convert
         * @return the matching async trigger, or a wrapper around {@code trigger} when no
         *     dedicated async counterpart is known here
         */
        public static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(Trigger<T, W> trigger) {
            if (trigger instanceof CountTrigger) {
                return AsyncCountTrigger.of(((CountTrigger)trigger).getMaxCount());
            } else if (trigger instanceof EventTimeTrigger) {
                return AsyncEventTimeTrigger.create();
            } else if (trigger instanceof ProcessingTimeTrigger) {
                return AsyncProcessingTimeTrigger.create();
            } else if (trigger instanceof PurgingTrigger) {
                // The nested trigger is converted recursively before being re-wrapped.
                return AsyncPurgingTrigger.of(AsyncTriggerConverter.convertToAsync(((PurgingTrigger)trigger).getNestedTrigger()));
            } else {
                return UserDefinedAsyncTrigger.of(trigger);
            }
        }
    }

    private static class UserDefinedAsyncTrigger<T, W extends Window>
    extends AsyncTrigger<T, W> {
        private final Trigger<T, W> userDefinedTrigger;

        private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
            this.userDefinedTrigger = userDefinedTrigger;
        }

        @Override
        public StateFuture<TriggerResult> onElement(T element, long timestamp, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
            return StateFutureUtils.completedFuture(this.userDefinedTrigger.onElement(element, timestamp, window, AsyncTriggerContextConvertor.of(ctx)));
        }

        @Override
        public StateFuture<TriggerResult> onProcessingTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
            return StateFutureUtils.completedFuture(this.userDefinedTrigger.onProcessingTime(time, window, AsyncTriggerContextConvertor.of(ctx)));
        }

        @Override
        public StateFuture<TriggerResult> onEventTime(long time, W window, AsyncTrigger.TriggerContext ctx) throws Exception {
            return StateFutureUtils.completedFuture(this.userDefinedTrigger.onEventTime(time, window, AsyncTriggerContextConvertor.of(ctx)));
        }

        @Override
        public StateFuture<Void> clear(W window, AsyncTrigger.TriggerContext ctx) throws Exception {
            this.userDefinedTrigger.clear(window, AsyncTriggerContextConvertor.of(ctx));
            return StateFutureUtils.completedVoidFuture();
        }

        @Override
        public boolean isEndOfStreamTrigger() {
            return this.userDefinedTrigger instanceof GlobalWindows.EndOfStreamTrigger;
        }

        public static <T, W extends Window> AsyncTrigger<T, W> of(Trigger<T, W> userDefinedTrigger) {
            return new UserDefinedAsyncTrigger<T, W>(userDefinedTrigger);
        }
    }
}

