/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Test for processing-time timers that fire while a chained operator is yielding to its
 * {@link MailboxExecutor} from inside {@code processElement}.
 */
class StreamTaskOperatorTimerTest {
    /** Prefix marking an input record that makes a {@link TestOperator} register a timer. */
    private static final String TRIGGER_PREFIX = "trigger:";

    /** Prefix of the records emitted by a {@link TestOperator} when its timer fires. */
    private static final String RESULT_PREFIX = "timer:";

    /**
     * Runs a chain of two {@link TestOperator}s, feeds it a single trigger record and checks the
     * exact order of the task output.
     *
     * <p>Timer records have the form {@code timer:<chainIndex>:<index>}, so the expected sequence
     * states that the trigger record is forwarded first, then the timer of the operator with
     * chain index 1 fires, and only afterwards the timer of the operator with chain index 0.
     */
    @Test
    void testOperatorYieldExecutesSelectedTimers() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        testHarness
                .setupOperatorChain(new OperatorID(), new TestOperatorFactory())
                .chain(new OperatorID(), new TestOperatorFactory(), StringSerializer.INSTANCE)
                .finish();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        final String trigger = TRIGGER_PREFIX + 42;
        testHarness.processElement(new StreamRecord<>(trigger));

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Every output element is treated as a StreamRecord carrying a String value; anything
        // else fails the test with a ClassCastException.
        final ArrayList<String> events = new ArrayList<>();
        for (Object element : testHarness.getOutput()) {
            events.add((String) ((StreamRecord<?>) element).getValue());
        }

        Assertions.assertThat(events)
                .containsExactly(trigger, RESULT_PREFIX + "1:0", RESULT_PREFIX + "0:0");
    }

    /**
     * Operator that forwards every record. For a trigger record it additionally registers a
     * processing-time timer one second in the future and does not return from
     * {@link #processElement(StreamRecord)} until that timer has fired.
     */
    private static class TestOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {
        private final transient MailboxExecutor mailboxExecutor;
        private final int chainIndex;

        /** Number of timers registered by this operator whose callback has not run yet. */
        private transient int count;

        /**
         * @param parameters operator parameters handed to the base class
         * @param chainIndex index that is embedded into the emitted timer records
         * @param mailboxExecutor executor that is yielded to while waiting for the timer
         */
        TestOperator(
                StreamOperatorParameters<String> parameters,
                int chainIndex,
                MailboxExecutor mailboxExecutor) {
            super(parameters);
            this.chainIndex = chainIndex;
            this.mailboxExecutor = mailboxExecutor;
        }

        /**
         * Forwards {@code element}; trigger records additionally register a timer and block
         * (by yielding) until that timer's callback has been executed.
         *
         * @param element the incoming record
         * @throws Exception if registering the timer, emitting or yielding fails
         */
        public void processElement(StreamRecord<String> element) throws Exception {
            if (!isTriggerEvent(element)) {
                // Not a trigger (e.g. a timer record produced earlier in the chain): pass through.
                output.collect(element);
                return;
            }

            // Remember the pre-increment counter value; it becomes part of the timer record.
            final int index = count;
            count++;

            final ProcessingTimeService processingTimeService = getProcessingTimeService();
            final long fireAt = processingTimeService.getCurrentProcessingTime() + 1000L;
            processingTimeService.registerTimer(
                    fireAt,
                    timestamp -> {
                        output.collect(
                                new StreamRecord<>(RESULT_PREFIX + chainIndex + ":" + index));
                        --count;
                    });

            output.collect(element);

            // Keep yielding to the mailbox executor until the timer callback above has
            // decremented the counter back to zero.
            while (count > 0) {
                mailboxExecutor.yield();
            }
        }

        /** Returns whether {@code element} is a record whose value starts with the trigger prefix. */
        private static boolean isTriggerEvent(StreamRecord<String> element) {
            return element.isRecord() && element.getValue().startsWith(TRIGGER_PREFIX);
        }
    }

    /**
     * Factory for {@link TestOperator} that passes the injected {@link MailboxExecutor} and the
     * chain index from the stream config to the operator it creates.
     */
    private static class TestOperatorFactory extends AbstractStreamOperatorFactory<String>
            implements OneInputStreamOperatorFactory<String, String>,
                    YieldingOperatorFactory<String> {
        private MailboxExecutor mailboxExecutor;

        /** Stores the executor that is later handed to the created operator. */
        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        /**
         * Creates a {@link TestOperator} using the chain index found in the stream config of
         * {@code parameters} and the previously stored mailbox executor.
         */
        @SuppressWarnings("unchecked") // The factory only ever produces TestOperator instances.
        public <Operator extends StreamOperator<String>> Operator createStreamOperator(
                StreamOperatorParameters<String> parameters) {
            final TestOperator operator =
                    new TestOperator(
                            parameters,
                            parameters.getStreamConfig().getChainIndex(),
                            mailboxExecutor);
            return (Operator) operator;
        }

        /** Intentionally a no-op: the given strategy is ignored. */
        public void setChainingStrategy(ChainingStrategy strategy) {
        }

        /** Returns {@code TestOperator.class}; the class loader argument is not used. */
        @SuppressWarnings("rawtypes") // Raw StreamOperator is part of the implemented signature.
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return TestOperator.class;
        }
    }
}

