/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.v2.InternalValueState;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class AsyncExecutionControllerTest {
    AsyncExecutionController aec;
    TestUnderlyingState underlyingState;
    AtomicInteger output;
    TestValueState valueState;
    final Runnable userCode = () -> this.valueState.asyncValue().thenCompose(val -> {
        int updated = val == null ? 1 : val + 1;
        return this.valueState.asyncUpdate(updated).thenCompose(o -> StateFutureUtils.completedFuture((Object)updated));
    }).thenAccept(val -> this.output.set((int)val));

    AsyncExecutionControllerTest() {
    }

    @BeforeEach
    void setup() {
        this.aec = new AsyncExecutionController((MailboxExecutor)new SyncMailboxExecutor(), this.createStateExecutor());
        this.underlyingState = new TestUnderlyingState();
        this.valueState = new TestValueState((AsyncExecutionController<String>)this.aec, this.underlyingState);
        this.output = new AtomicInteger();
    }

    @Test
    void testBasicRun() {
        String record1 = "key1-r1";
        String key1 = "key1";
        RecordContext recordContext1 = this.aec.buildContext((Object)record1, (Object)key1);
        this.aec.setCurrentContext(recordContext1);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext1.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        String record2 = "key1-r2";
        String key2 = "key1";
        RecordContext recordContext2 = this.aec.buildContext((Object)record2, (Object)key2);
        this.aec.setCurrentContext(recordContext2);
        this.userCode.run();
        String record3 = "key1-r3";
        String key3 = "key1";
        RecordContext recordContext3 = this.aec.buildContext((Object)record3, (Object)key3);
        this.aec.setCurrentContext(recordContext3);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(2);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(2);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)recordContext3.getReferenceCount()).isEqualTo(0);
        String record4 = "key3-r3";
        String key4 = "key3";
        RecordContext recordContext4 = this.aec.buildContext((Object)record4, (Object)key4);
        this.aec.setCurrentContext(recordContext4);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext4.getReferenceCount()).isEqualTo(0);
    }

    @Test
    void testRecordsRunInOrder() {
        String record1 = "key1-r1";
        String key1 = "key1";
        RecordContext recordContext1 = this.aec.buildContext((Object)record1, (Object)key1);
        this.aec.setCurrentContext(recordContext1);
        this.userCode.run();
        String record2 = "key2-r1";
        String key2 = "key2";
        RecordContext recordContext2 = this.aec.buildContext((Object)record2, (Object)key2);
        this.aec.setCurrentContext(recordContext2);
        this.userCode.run();
        String record3 = "key1-r2";
        String key3 = "key1";
        RecordContext recordContext3 = this.aec.buildContext((Object)record3, (Object)key3);
        this.aec.setCurrentContext(recordContext3);
        this.userCode.run();
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext1.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((int)recordContext3.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
    }

    @Test
    void testInFlightRecordControl() {
        RecordContext recordContext;
        String key;
        int i;
        int batchSize = 5;
        int maxInFlight = 10;
        this.aec = new AsyncExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (StateExecutor)new TestStateExecutor(), 5, 10);
        this.valueState = new TestValueState((AsyncExecutionController<String>)this.aec, this.underlyingState);
        AtomicInteger output = new AtomicInteger();
        Runnable userCode = () -> this.valueState.asyncValue().thenCompose(val -> {
            int updated = val == null ? 1 : val + 1;
            return this.valueState.asyncUpdate(updated).thenCompose(o -> StateFutureUtils.completedFuture((Object)updated));
        }).thenAccept(val -> output.set((int)val));
        for (int round = 0; round < 10; ++round) {
            for (int i2 = 0; i2 < 5; ++i2) {
                String record = String.format("key%d-r%d", round * 5 + i2, round * 5 + i2);
                String key2 = String.format("key%d", round * 5 + i2);
                RecordContext recordContext2 = this.aec.buildContext((Object)record, (Object)key2);
                this.aec.setCurrentContext(recordContext2);
                userCode.run();
            }
            AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(0);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        }
        for (i = 0; i < 10; ++i) {
            String record = String.format("sameKey-r%d", i, i);
            key = "sameKey";
            recordContext = this.aec.buildContext((Object)record, (Object)key);
            this.aec.setCurrentContext(recordContext);
            userCode.run();
        }
        AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(10);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(9);
        for (i = 10; i < 100; ++i) {
            String record = String.format("sameKey-r%d", i, i);
            key = "sameKey";
            recordContext = this.aec.buildContext((Object)record, (Object)key);
            this.aec.setCurrentContext(recordContext);
            userCode.run();
            AssertionsForClassTypes.assertThat((int)this.aec.inFlightRecordNum.get()).isEqualTo(11);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
            AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(10);
        }
    }

    @Test
    public void testSyncPoint() {
        AtomicInteger counter = new AtomicInteger(0);
        RecordContext recordContext = this.aec.buildContext((Object)"record", (Object)"key");
        this.aec.setCurrentContext(recordContext);
        recordContext.retain();
        this.aec.syncPointRequestWithCallback(counter::incrementAndGet);
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        recordContext.release();
        AssertionsForClassTypes.assertThat((int)this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        counter.set(0);
        RecordContext recordContext1 = this.aec.buildContext((Object)"record1", (Object)"occupied");
        this.aec.setCurrentContext(recordContext1);
        this.userCode.run();
        RecordContext recordContext2 = this.aec.buildContext((Object)"record2", (Object)"occupied");
        this.aec.setCurrentContext(recordContext2);
        this.aec.syncPointRequestWithCallback(counter::incrementAndGet);
        recordContext2.retain();
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat((int)counter.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)recordContext2.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat((int)this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        recordContext2.release();
    }

    private StateExecutor createStateExecutor() {
        TestAsyncStateBackend testAsyncStateBackend = new TestAsyncStateBackend();
        AssertionsForClassTypes.assertThat((boolean)testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        return testAsyncStateBackend.createAsyncKeyedStateBackend(null).createStateExecutor();
    }

    static class TestStateExecutor
    implements StateExecutor {
        public CompletableFuture<Boolean> executeBatchRequests(Iterable<StateRequest<?, ?, ?>> processingRequests) {
            CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
            for (StateRequest<?, ?, ?> request : processingRequests) {
                TestValueState state;
                if (request.getRequestType() == StateRequestType.VALUE_GET) {
                    Preconditions.checkState((request.getState() != null ? 1 : 0) != 0);
                    state = (TestValueState)request.getState();
                    Integer val = state.underlyingState.get((String)request.getRecordContext().getKey());
                    request.getFuture().complete((Object)val);
                    continue;
                }
                if (request.getRequestType() == StateRequestType.VALUE_UPDATE) {
                    Preconditions.checkState((request.getState() != null ? 1 : 0) != 0);
                    state = (TestValueState)request.getState();
                    state.underlyingState.update((String)request.getRecordContext().getKey(), (Integer)request.getPayload());
                    request.getFuture().complete(null);
                    continue;
                }
                throw new UnsupportedOperationException("Unsupported request type");
            }
            future.complete(true);
            return future;
        }
    }

    static class TestAsyncStateBackend
    implements StateBackend {
        TestAsyncStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet");
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createOperatorStateBackend yet");
        }

        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) {
            return new AsyncKeyedStateBackend(){

                public StateExecutor createStateExecutor() {
                    return new TestStateExecutor();
                }

                public void dispose() {
                }
            };
        }
    }

    static class TestValueState
    extends InternalValueState<String, Integer> {
        private final TestUnderlyingState underlyingState;

        public TestValueState(AsyncExecutionController<String> aec, TestUnderlyingState underlyingState) {
            super(aec, new ValueStateDescriptor("test-value-state", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO));
            this.underlyingState = underlyingState;
            AssertionsForClassTypes.assertThat((Object)this.getValueSerializer()).isEqualTo((Object)IntSerializer.INSTANCE);
        }
    }

    static class TestUnderlyingState {
        private final HashMap<String, Integer> hashMap = new HashMap();

        public Integer get(String key) {
            return this.hashMap.get(key);
        }

        public void update(String key, Integer val) {
            this.hashMap.put(key, val);
        }
    }
}

