/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link InternalTimerServiceAsyncImpl}.
 *
 * <p>Each test registers timers on the service, starts it with a counting {@link Triggerable}
 * and then checks how many timers have been delivered after processing time or the watermark
 * has been advanced.
 */
class InternalTimerServiceAsyncImplTest {
    // Deliberately left as a raw type: it is handed to the (raw-typed) factory helper below and
    // its type arguments are not visible from this file.
    private AsyncExecutionController asyncExecutionController;
    private TestKeyContext keyContext;
    private TestProcessingTimeService processingTimeService;

    // NOTE(review): the service is parameterised with Integer keys (and IntSerializer), while the
    // tests set String keys through the untyped KeyContext. Left unchanged here; confirm whether
    // this mismatch is intentional.
    private InternalTimerServiceAsyncImpl<Integer, String> service;

    /** Builds a fresh controller, key context, clock (at time 0) and timer service per test. */
    @BeforeEach
    void setup() throws Exception {
        MailboxExecutor mailboxExecutor = new SyncMailboxExecutor();
        StateExecutor stateExecutor = new TestStateExecutor();
        asyncExecutionController =
                new AsyncExecutionController(mailboxExecutor, stateExecutor, 128, 2, 1000L, 10);

        // The local key group range spans all key groups, so any key belongs to this service.
        int totalKeyGroups = 128;
        KeyGroupRange testKeyGroupList = new KeyGroupRange(0, totalKeyGroups - 1);

        keyContext = new TestKeyContext();

        processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(0L);

        PriorityQueueSetFactory timerQueueFactory =
                new HeapPriorityQueueSetFactory(testKeyGroupList, totalKeyGroups, 128);

        service =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
                                .getIOMetricGroup(),
                        testKeyGroupList,
                        keyContext,
                        processingTimeService,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        timerQueueFactory,
                        asyncExecutionController);
    }

    /**
     * Two processing-time timers with different namespaces but the same key and timestamp must
     * both fire once processing time reaches that timestamp.
     */
    @Test
    void testTimerWithSameKey() throws Exception {
        keyContext.setCurrentKey("key-1");
        service.registerProcessingTimeTimer("processing-timer-1", 1L);
        service.registerProcessingTimeTimer("processing-timer-2", 1L);

        TestTriggerable testTriggerable = new TestTriggerable();
        startTimerService(testTriggerable);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(0);

        processingTimeService.advance(1L);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(2);
    }

    /**
     * A processing-time timer whose key has an unreleased record context must not fire when its
     * timestamp is reached; it is expected to fire after the context has been released and time
     * has advanced again.
     */
    @Test
    void testProcessingTimerFireOrder() throws Exception {
        keyContext.setCurrentKey("key-1");
        service.registerProcessingTimeTimer("processing-timer-1", 1L);

        TestTriggerable testTriggerable = new TestTriggerable();
        startTimerService(testTriggerable);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(0);

        // No record is pending for key-1, so its timer fires as soon as time reaches 1.
        processingTimeService.advance(1L);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(1);

        keyContext.setCurrentKey("key-2");
        service.registerProcessingTimeTimer("processing-timer-2", 2L);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(1);

        RecordContext recordContext = occupyKeyWithPendingRequest("record2", "key-2");

        // Time reaches 2, but the record context for key-2 has not been released yet.
        processingTimeService.advance(1L);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(1);

        recordContext.release();
        processingTimeService.advance(1L);
        Assertions.assertThat(testTriggerable.processingTriggerCount).isEqualTo(2);
    }

    /**
     * Event-time counterpart of {@link #testProcessingTimerFireOrder()}: an event-time timer
     * whose key has an unreleased record context must not fire when the watermark passes it; it
     * is expected to fire after the context has been released and the watermark advances again.
     */
    @Test
    void testEventTimerFireOrder() throws Exception {
        keyContext.setCurrentKey("key-1");
        service.registerEventTimeTimer("event-timer-1", 1L);

        TestTriggerable testTriggerable = new TestTriggerable();
        startTimerService(testTriggerable);
        Assertions.assertThat(testTriggerable.eventTriggerCount).isEqualTo(0);

        // No record is pending for key-1, so its timer fires with the watermark at 1.
        service.advanceWatermark(1L);
        Assertions.assertThat(testTriggerable.eventTriggerCount).isEqualTo(1);

        keyContext.setCurrentKey("key-2");
        service.registerEventTimeTimer("event-timer-2", 2L);
        Assertions.assertThat(testTriggerable.eventTriggerCount).isEqualTo(1);

        RecordContext recordContext = occupyKeyWithPendingRequest("record2", "key-2");

        // The watermark reaches 2, but the record context for key-2 has not been released yet.
        service.advanceWatermark(2L);
        Assertions.assertThat(testTriggerable.eventTriggerCount).isEqualTo(1);

        recordContext.release();
        service.advanceWatermark(3L);
        Assertions.assertThat(testTriggerable.eventTriggerCount).isEqualTo(2);
    }

    /** Starts the timer service under test with the given trigger target. */
    private void startTimerService(TestTriggerable triggerTarget) throws Exception {
        service.startTimerService(
                IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerTarget);
    }

    /**
     * Builds a record context for {@code key}, makes it the controller's current context and
     * submits a {@code VALUE_GET} request under it.
     *
     * @param record the record object the context is built for
     * @param key the key the context is built for
     * @return the created context; the caller is responsible for calling {@code release()}
     */
    private RecordContext occupyKeyWithPendingRequest(Object record, Object key)
            throws Exception {
        RecordContext recordContext = asyncExecutionController.buildContext(record, key);
        asyncExecutionController.setCurrentContext(recordContext);
        asyncExecutionController.handleRequest(null, StateRequestType.VALUE_GET, null);
        return recordContext;
    }

    /**
     * Creates an {@link InternalTimerServiceAsyncImpl} whose processing-time and event-time timer
     * queues are obtained from the given queue factory, using one shared {@link TimerSerializer}.
     *
     * @param taskIOMetricGroup metric group passed to the service
     * @param keyGroupsList local key group range passed to the service
     * @param keyContext key context passed to the service
     * @param processingTimeService clock passed to the service
     * @param keySerializer serializer for timer keys
     * @param namespaceSerializer serializer for timer namespaces
     * @param priorityQueueSetFactory factory that creates the two timer queues
     * @param asyncExecutionController controller passed to the service
     * @return the newly created timer service (not yet started)
     */
    private static <K, N> InternalTimerServiceAsyncImpl<K, N> createInternalTimerService(
            TaskIOMetricGroup taskIOMetricGroup,
            KeyGroupRange keyGroupsList,
            KeyContext keyContext,
            ProcessingTimeService processingTimeService,
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            PriorityQueueSetFactory priorityQueueSetFactory,
            AsyncExecutionController asyncExecutionController) {
        TimerSerializer<K, N> timerSerializer =
                new TimerSerializer<>(keySerializer, namespaceSerializer);

        return new InternalTimerServiceAsyncImpl<>(
                taskIOMetricGroup,
                keyGroupsList,
                keyContext,
                processingTimeService,
                priorityQueueSetFactory.create("__async_processing_timers", timerSerializer),
                priorityQueueSetFactory.create("__async_event_timers", timerSerializer),
                StreamTaskCancellationContext.alwaysRunning(),
                asyncExecutionController);
    }

    /** {@link StateExecutor} stub that ignores the requests it is given. */
    private static class TestStateExecutor implements StateExecutor {
        /** Returns an already-completed future holding {@code true} for any batch. */
        @Override
        public CompletableFuture<Boolean> executeBatchRequests(
                Iterable<StateRequest<?, ?, ?>> processingRequests) {
            return CompletableFuture.completedFuture(true);
        }
    }

    /** {@link KeyContext} that simply stores the most recently set key. */
    private static class TestKeyContext implements KeyContext {
        private Object key;

        @Override
        public void setCurrentKey(Object key) {
            this.key = key;
        }

        @Override
        public Object getCurrentKey() {
            return this.key;
        }
    }

    /**
     * {@link Triggerable} that counts how many event-time and processing-time timers it receives.
     *
     * <p>The counters are per instance, so every test observes only the timers delivered to the
     * triggerable it created itself.
     */
    private static class TestTriggerable implements Triggerable<Integer, String> {
        private int eventTriggerCount;
        private int processingTriggerCount;

        @Override
        public void onEventTime(InternalTimer<Integer, String> timer) throws Exception {
            ++eventTriggerCount;
        }

        @Override
        public void onProcessingTime(InternalTimer<Integer, String> timer) throws Exception {
            ++processingTriggerCount;
        }
    }
}

