/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AbstractAsyncStateStreamOperatorTest {
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception {
        TestOperator testOperator = new TestOperator(elementOrder);
        return new KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>((OneInputStreamOperator<Tuple2<Integer, String>, String>)testOperator, (KeySelector<Tuple2<Integer, String>, Integer>)new TestKeySelector(), (TypeInformation<Integer>)BasicTypeInfo.INT_TYPE_INFO, maxParalelism, numSubtasks, subtaskIndex);
    }

    @Test
    public void testCreateAsyncExecutionController() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            Assertions.assertThat(testHarness.getOperator()).isInstanceOf(AbstractAsyncStateStreamOperator.class);
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperator)testHarness.getOperator()).getAsyncExecutionController()).isNotNull();
        }
    }

    @Test
    public void testRecordProcessorWithFirstStateOrder() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER);){
            testHarness.open();
            TestOperator testOperator = (TestOperator)testHarness.getOperator();
            ThrowingConsumer processor = RecordProcessorUtils.getRecordProcessor((Input)testOperator);
            ExecutorService anotherThread = Executors.newSingleThreadExecutor();
            anotherThread.execute(() -> {
                try {
                    processor.accept((Object)new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);
            testOperator.proceed();
            anotherThread.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    @Test
    public void testRecordProcessorWithRecordOrder() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            TestOperator testOperator = (TestOperator)testHarness.getOperator();
            ThrowingConsumer processor = RecordProcessorUtils.getRecordProcessor((Input)testOperator);
            ExecutorService anotherThread = Executors.newSingleThreadExecutor();
            anotherThread.execute(() -> {
                try {
                    processor.accept((Object)new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            testOperator.proceed();
            anotherThread.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    static class TestKeySelector
    implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1L;

        TestKeySelector() {
        }

        public Integer getKey(Tuple2<Integer, String> value) {
            return (Integer)value.f0;
        }
    }

    private static class TestOperator
    extends AbstractAsyncStateStreamOperator<String>
    implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        private final ElementOrder elementOrder;
        final AtomicInteger processed = new AtomicInteger(0);
        final Object objectToWait = new Object();

        TestOperator(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public void open() throws Exception {
            super.open();
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            this.processed.incrementAndGet();
            Object object = this.objectToWait;
            synchronized (object) {
                this.objectToWait.wait();
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
        }

        public int getProcessed() {
            return this.processed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void proceed() {
            Object object = this.objectToWait;
            synchronized (object) {
                this.objectToWait.notify();
            }
        }
    }
}

