/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.operators.StreamTaskTimerITCase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that a {@link TestProcessingTimeService} handed to a {@link OneInputStreamTask} is the
 * time service seen by the task's head operator, and that manually set times and registered
 * timers are reflected through it.
 */
public class TestProcessingTimeServiceTest {

    /**
     * Runs a single-operator task with a manually driven time service and checks that:
     * <ul>
     *   <li>the operator initially reports {@code Long.MIN_VALUE} as the current processing time,</li>
     *   <li>times set on the test service are reported by the operator's time service,</li>
     *   <li>the number of active timers goes from 2 to 1 to 0 as the time is moved to 35 and
     *       then to 40, with timers registered for 30 and 40.</li>
     * </ul>
     *
     * @throws Throwable if setting up or running the task harness fails
     */
    @Test
    public void testCustomTimeServiceProvider() throws Throwable {
        final TestProcessingTimeService tp = new TestProcessingTimeService();
        try {
            final OneInputStreamTaskTestHarness<String, String> testHarness =
                    new OneInputStreamTaskTestHarness<>(
                            env -> new OneInputStreamTask<>(env, tp),
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO);
            testHarness.setupOutputForSingletonOperatorChain();

            final StreamConfig streamConfig = testHarness.getStreamConfig();
            final StreamMap<String, String> mapOperator =
                    new StreamMap<>(new StreamTaskTimerITCase.DummyMapFunction<>());
            streamConfig.setStreamOperator(mapOperator);
            streamConfig.setOperatorID(new OperatorID());

            testHarness.invoke();
            testHarness.waitForTaskRunning();

            final ProcessingTimeService processingTimeService =
                    ((StreamMap<?, ?>) testHarness.getHeadOperator()).getProcessingTimeService();

            // Before any time has been set, the service reports Long.MIN_VALUE.
            Assert.assertEquals(Long.MIN_VALUE, processingTimeService.getCurrentProcessingTime());

            // JUnit's contract is assertEquals(expected, actual); keep that order so that
            // failure messages report the right values.
            tp.setCurrentTime(11L);
            Assert.assertEquals(11L, processingTimeService.getCurrentProcessingTime());

            tp.setCurrentTime(15L);
            tp.setCurrentTime(16L);
            Assert.assertEquals(16L, processingTimeService.getCurrentProcessingTime());

            // Register two no-op timers, then move the time past each of them in turn.
            processingTimeService.registerTimer(30L, timestamp -> {});
            processingTimeService.registerTimer(40L, timestamp -> {});
            Assert.assertEquals(2L, (long) tp.getNumActiveTimers());

            tp.setCurrentTime(35L);
            Assert.assertEquals(1L, (long) tp.getNumActiveTimers());

            tp.setCurrentTime(40L);
            Assert.assertEquals(0L, (long) tp.getNumActiveTimers());
        } finally {
            // Shut the time service down even when an assertion above fails.
            tp.shutdownService();
        }
    }
}

