package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TestInternalTimerService;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.class */
public class TriggerTestHarness<T, W extends Window> {
    private static final Integer KEY = 1;
    private final Trigger<T, W> trigger;
    private final TypeSerializer<W> windowSerializer;
    private final HeapKeyedStateBackend<Integer> stateBackend;
    private final TestInternalTimerService<Integer, W> internalTimerService;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness$TestOnMergeContext.class */
    private static class TestOnMergeContext<K, W extends Window> extends TestTriggerContext<K, W> implements Trigger.OnMergeContext {
        private final Collection<W> mergedWindows;

        public TestOnMergeContext(K k, W w, Collection<W> collection, InternalTimerService<W> internalTimerService, KeyedStateBackend<Integer> keyedStateBackend, TypeSerializer<W> typeSerializer) {
            super(k, w, internalTimerService, keyedStateBackend, typeSerializer);
            this.mergedWindows = collection;
        }

        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                InternalMergingState internalMergingState = (MergingState) this.stateBackend.getOrCreateKeyedState(this.windowSerializer, stateDescriptor);
                if (!(internalMergingState instanceof InternalMergingState)) {
                    throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                }
                internalMergingState.mergeNamespaces(this.window, this.mergedWindows);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness$TestTriggerContext.class */
    public static class TestTriggerContext<K, W extends Window> implements Trigger.TriggerContext {
        protected final InternalTimerService<W> timerService;
        protected final KeyedStateBackend<Integer> stateBackend;
        protected final K key;
        protected final W window;
        protected final TypeSerializer<W> windowSerializer;

        TestTriggerContext(K k, W w, InternalTimerService<W> internalTimerService, KeyedStateBackend<Integer> keyedStateBackend, TypeSerializer<W> typeSerializer) {
            this.key = k;
            this.window = w;
            this.timerService = internalTimerService;
            this.stateBackend = keyedStateBackend;
            this.windowSerializer = typeSerializer;
        }

        public long getCurrentProcessingTime() {
            return this.timerService.currentProcessingTime();
        }

        public MetricGroup getMetricGroup() {
            return null;
        }

        public long getCurrentWatermark() {
            return this.timerService.currentWatermark();
        }

        public void registerProcessingTimeTimer(long j) {
            this.timerService.registerProcessingTimeTimer(this.window, j);
        }

        public void registerEventTimeTimer(long j) {
            this.timerService.registerEventTimeTimer(this.window, j);
        }

        public void deleteProcessingTimeTimer(long j) {
            this.timerService.deleteProcessingTimeTimer(this.window, j);
        }

        public void deleteEventTimeTimer(long j) {
            this.timerService.deleteEventTimeTimer(this.window, j);
        }

        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) this.stateBackend.getPartitionedState(this.window, this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Error getting state", e);
            }
        }

        public <S extends Serializable> ValueState<S> getKeyValueState(String str, Class<S> cls, S s) {
            return getPartitionedState(new ValueStateDescriptor(str, cls, s));
        }

        public <S extends Serializable> ValueState<S> getKeyValueState(String str, TypeInformation<S> typeInformation, S s) {
            return getPartitionedState(new ValueStateDescriptor(str, typeInformation, s));
        }
    }

    public TriggerTestHarness(Trigger<T, W> trigger, TypeSerializer<W> typeSerializer) throws Exception {
        this.trigger = trigger;
        this.windowSerializer = typeSerializer;
        DummyEnvironment dummyEnvironment = new DummyEnvironment("test", 1, 0);
        this.stateBackend = new MemoryStateBackend().createKeyedStateBackend(new KeyedStateBackendParametersImpl(dummyEnvironment, new JobID(), "test_op", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry()));
        this.stateBackend.setCurrentKey(KEY);
        this.internalTimerService = new TestInternalTimerService<>(new KeyContext() { // from class: org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness.1
            public void setCurrentKey(Object obj) {
            }

            public Object getCurrentKey() {
                return TriggerTestHarness.KEY;
            }
        });
    }

    public int numProcessingTimeTimers() {
        return this.internalTimerService.numProcessingTimeTimers();
    }

    public int numProcessingTimeTimers(W w) {
        return this.internalTimerService.numProcessingTimeTimers(w);
    }

    public int numEventTimeTimers() {
        return this.internalTimerService.numEventTimeTimers();
    }

    public int numEventTimeTimers(W w) {
        return this.internalTimerService.numEventTimeTimers(w);
    }

    public int numStateEntries() {
        return this.stateBackend.numKeyValueStateEntries();
    }

    public int numStateEntries(W w) {
        return this.stateBackend.numKeyValueStateEntries(w);
    }

    public TriggerResult processElement(StreamRecord<T> streamRecord, W w) throws Exception {
        return this.trigger.onElement(streamRecord.getValue(), streamRecord.getTimestamp(), w, new TestTriggerContext(KEY, w, this.internalTimerService, this.stateBackend, this.windowSerializer));
    }

    public TriggerResult advanceProcessingTime(long j, W w) throws Exception {
        Collection<Tuple2<W, TriggerResult>> advanceProcessingTime = advanceProcessingTime(j);
        if (advanceProcessingTime.size() != 1) {
            throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + advanceProcessingTime);
        }
        Tuple2<W, TriggerResult> next = advanceProcessingTime.iterator().next();
        if (((Window) next.f0).equals(w)) {
            return (TriggerResult) next.f1;
        }
        throw new IllegalStateException("Trigger fired for another window.");
    }

    public TriggerResult advanceWatermark(long j, W w) throws Exception {
        Collection<Tuple2<W, TriggerResult>> advanceWatermark = advanceWatermark(j);
        if (advanceWatermark.size() != 1) {
            throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + advanceWatermark);
        }
        Tuple2<W, TriggerResult> next = advanceWatermark.iterator().next();
        if (((Window) next.f0).equals(w)) {
            return (TriggerResult) next.f1;
        }
        throw new IllegalStateException("Trigger fired for another window.");
    }

    public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long j) throws Exception {
        Collection<TestInternalTimerService.Timer<Integer, W>> advanceProcessingTime = this.internalTimerService.advanceProcessingTime(j);
        ArrayList arrayList = new ArrayList();
        for (TestInternalTimerService.Timer<Integer, W> timer : advanceProcessingTime) {
            arrayList.add(new Tuple2(timer.getNamespace(), this.trigger.onProcessingTime(timer.getTimestamp(), timer.getNamespace(), new TestTriggerContext(KEY, timer.getNamespace(), this.internalTimerService, this.stateBackend, this.windowSerializer))));
        }
        return arrayList;
    }

    public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long j) throws Exception {
        Collection<TestInternalTimerService.Timer<Integer, W>> advanceWatermark = this.internalTimerService.advanceWatermark(j);
        ArrayList arrayList = new ArrayList();
        for (TestInternalTimerService.Timer<Integer, W> timer : advanceWatermark) {
            arrayList.add(new Tuple2(timer.getNamespace(), invokeOnEventTime(timer)));
        }
        return arrayList;
    }

    private TriggerResult invokeOnEventTime(TestInternalTimerService.Timer<Integer, W> timer) throws Exception {
        return this.trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), new TestTriggerContext(KEY, timer.getNamespace(), this.internalTimerService, this.stateBackend, this.windowSerializer));
    }

    public TriggerResult invokeOnEventTime(long j, W w) throws Exception {
        return invokeOnEventTime(new TestInternalTimerService.Timer<>(j, KEY, w));
    }

    public void mergeWindows(W w, Collection<W> collection) throws Exception {
        this.trigger.onMerge(w, new TestOnMergeContext(KEY, w, collection, this.internalTimerService, this.stateBackend, this.windowSerializer));
        Iterator<W> it = collection.iterator();
        while (it.hasNext()) {
            clearTriggerState(it.next());
        }
    }

    public void clearTriggerState(W w) throws Exception {
        this.trigger.clear(w, new TestTriggerContext(KEY, w, this.internalTimerService, this.stateBackend, this.windowSerializer));
    }
}
