/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.asyncprocessing.AsyncFutureImpl;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutor;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> {
    protected MockEnvironment env;
    protected JobID jobID;
    private CheckpointStreamFactory checkpointStreamFactory;

    /** Prepares a fresh job id and a mock environment bound to it before every test. */
    @BeforeEach
    void before() throws Exception {
        // Order matters: buildMockEnv() reads the job id assigned here.
        jobID = new JobID();
        env = buildMockEnv();
    }

    /**
     * Builds the mock environment used by the tests.
     *
     * <p>The environment is wired with the task state manager from
     * {@link #getTestTaskStateManager()}, the current job id and the checkpoint storage access
     * from {@link #getCheckpointStorageAccess()}.
     *
     * @return the configured mock environment
     * @throws Exception if the task state manager or the checkpoint storage access cannot be
     *     created
     */
    protected MockEnvironment buildMockEnv() throws Exception {
        MockEnvironment environment =
                MockEnvironment.builder()
                        .setTaskStateManager(getTestTaskStateManager())
                        .setJobID(jobID)
                        .build();
        environment.setCheckpointStorageAccess(getCheckpointStorageAccess());
        return environment;
    }

    /**
     * Creates the task state manager installed into the mock environment.
     *
     * @return a task state manager built with the builder's defaults
     * @throws IOException declared for overriding implementations
     */
    protected TestTaskStateManager getTestTaskStateManager() throws IOException {
        TestTaskStateManager taskStateManager = TestTaskStateManager.builder().build();
        return taskStateManager;
    }

    /** Closes the mock environment after every test, ignoring any failure while closing. */
    @AfterEach
    void after() {
        AutoCloseable environment = env;
        IOUtils.closeQuietly(environment);
    }

    /**
     * Supplies the state backend under test.
     *
     * <p>Called by {@link #getCheckpointStorage()} and every time this class creates or restores
     * an async keyed state backend.
     *
     * @return the state backend under test
     * @throws Exception if the backend cannot be provided
     */
    protected abstract ConfigurableStateBackend getStateBackend() throws Exception;

    /**
     * Hook invoked by the snapshot test after the first backend has been disposed and before the
     * backend is restored from the snapshot.
     *
     * <p>NOTE(review): presumably lets implementations reset per-job resources before a restore;
     * confirm against the concrete subclasses.
     *
     * @throws Exception if the implementation fails
     */
    protected abstract void restoreJob() throws Exception;

    /**
     * Returns the checkpoint storage used by the tests.
     *
     * <p>By default the state backend under test is used when it also implements
     * {@link CheckpointStorage}. Subclasses whose backend does not must override this method.
     *
     * @return the checkpoint storage to snapshot into
     * @throws IllegalStateException if the state backend under test is not a
     *     {@link CheckpointStorage} and this method has not been overridden
     * @throws Exception if the state backend cannot be obtained
     */
    protected CheckpointStorage getCheckpointStorage() throws Exception {
        ConfigurableStateBackend stateBackend = getStateBackend();
        if (stateBackend instanceof CheckpointStorage) {
            return (CheckpointStorage) stateBackend;
        }
        // The message names the hook that actually exists on this class.
        throw new IllegalStateException(
                "The state backend under test does not implement CheckpointStorage. "
                        + "Please override 'getCheckpointStorage' and provide an appropriate "
                        + "checkpoint storage instance.");
    }

    /**
     * Creates a checkpoint storage access for the current job from {@link #getCheckpointStorage()}.
     *
     * @return a checkpoint storage access created for the current job id
     * @throws Exception if the checkpoint storage or its access cannot be created
     */
    protected CheckpointStorageAccess getCheckpointStorageAccess() throws Exception {
        CheckpointStorage checkpointStorage = getCheckpointStorage();
        return checkpointStorage.createCheckpointStorage(jobID);
    }

    /**
     * Returns the checkpoint stream factory, creating and caching it on first use.
     *
     * <p>The factory is resolved for checkpoint id {@code 1} at the default storage location of a
     * checkpoint storage access created for the current job.
     *
     * @return the cached checkpoint stream factory
     * @throws Exception if the checkpoint storage location cannot be resolved
     */
    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        if (checkpointStreamFactory != null) {
            return checkpointStreamFactory;
        }
        CheckpointStorageAccess storageAccess = getCheckpointStorage().createCheckpointStorage(jobID);
        checkpointStreamFactory =
                storageAccess.resolveCheckpointStorageLocation(
                        1L, CheckpointStorageLocationReference.getDefault());
        return checkpointStreamFactory;
    }

    /**
     * Creates a fresh (state-less) async keyed state backend from the state backend under test.
     *
     * <p>A checkpoint storage access is installed on {@code env} before the backend is created.
     * The operator identifier is {@code test_op_<subtaskId>_<parallelism>}.
     *
     * @param subtaskId subtask index, only used in the operator identifier
     * @param parallelism parallelism, only used in the operator identifier
     * @param keySerializer serializer for the keys
     * @param keyGroupRange key groups owned by the backend; its size is also passed as the number
     *     of key groups
     * @param env environment the backend is created in
     * @return the newly created backend
     * @throws Exception if the backend cannot be created
     */
    protected <K> AsyncKeyedStateBackend<K> createAsyncKeyedBackend(int subtaskId, int parallelism, TypeSerializer<K> keySerializer, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        env.setCheckpointStorageAccess(getCheckpointStorageAccess());
        // Obtain the backend first so the evaluation order matches the single-expression form.
        ConfigurableStateBackend stateBackend = getStateBackend();
        String operatorIdentifier = String.format("test_op_%d_%d", subtaskId, parallelism);
        StateBackend.KeyedStateBackendParameters<K> parameters =
                new KeyedStateBackendParametersImpl<>(
                        env,
                        jobID,
                        operatorIdentifier,
                        keySerializer,
                        keyGroupRange.getNumberOfKeyGroups(),
                        keyGroupRange,
                        env.getTaskKvStateRegistry(),
                        TtlTimeProvider.DEFAULT,
                        getMetricGroup(),
                        getCustomInitializationMetrics(),
                        Collections.emptyList(),
                        new CloseableRegistry(),
                        1.0);
        return stateBackend.createAsyncKeyedStateBackend(parameters);
    }

    /**
     * Provides the initialization-metrics callback handed to created backends.
     *
     * @return a callback that ignores every reported metric
     */
    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
        StateBackend.CustomInitializationMetrics ignoreAll = (metricName, metricValue) -> {};
        return ignoreAll;
    }

    /**
     * Creates an async keyed state backend that is restored from the given state handles.
     *
     * <p>Unlike {@link #createAsyncKeyedBackend}, this method does not install a checkpoint
     * storage access on {@code env}. The operator identifier is
     * {@code test_op_<subtaskId>_<parallelism>}.
     *
     * @param subtaskId subtask index, only used in the operator identifier
     * @param parallelism parallelism, only used in the operator identifier
     * @param keySerializer serializer for the keys
     * @param keyGroupRange key groups owned by the backend; its size is also passed as the number
     *     of key groups
     * @param state state handles to restore from
     * @param env environment the backend is created in
     * @return the restored backend
     * @throws Exception if the backend cannot be created or restored
     */
    protected <K> AsyncKeyedStateBackend<K> restoreAsyncKeyedBackend(int subtaskId, int parallelism, TypeSerializer<K> keySerializer, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        // Obtain the backend first so the evaluation order matches the single-expression form.
        ConfigurableStateBackend stateBackend = getStateBackend();
        String operatorIdentifier = String.format("test_op_%d_%d", subtaskId, parallelism);
        StateBackend.KeyedStateBackendParameters<K> parameters =
                new KeyedStateBackendParametersImpl<>(
                        env,
                        jobID,
                        operatorIdentifier,
                        keySerializer,
                        keyGroupRange.getNumberOfKeyGroups(),
                        keyGroupRange,
                        env.getTaskKvStateRegistry(),
                        TtlTimeProvider.DEFAULT,
                        getMetricGroup(),
                        getCustomInitializationMetrics(),
                        state,
                        new CloseableRegistry(),
                        1.0);
        return stateBackend.createAsyncKeyedStateBackend(parameters);
    }

    /**
     * Provides the metric group handed to created backends.
     *
     * @return a new unregistered metrics group
     */
    protected MetricGroup getMetricGroup() {
        MetricGroup metricGroup = new UnregisteredMetricsGroup();
        return metricGroup;
    }

    /**
     * Round-trips value state through a snapshot of the async keyed state backend.
     *
     * <p>The test writes the value {@code i} for each of 20 keys, takes a snapshot, overwrites
     * every value with {@code i + 1} and disposes the backend. It then restores a new backend from
     * the snapshot and checks that the values written before the snapshot ({@code i}) are read
     * back. Finally it asserts that no exception was reported to the exception handler.
     */
    @TestTemplate
    void testAsyncKeyedStateBackendSnapshot() throws Exception {
        KeyedStateHandle stateHandle;
        ValueState valueState;
        ValueStateDescriptor stateDescriptor;
        StateExecutionController aec;
        int mockRecordCount = 20;
        int jobMaxParallelism = 128;
        int aecBatchSize = 1;
        long aecBufferTimeout = 1L;
        int aecMaxInFlightRecords = 1000;
        ArrayList<RecordContext> recordContexts = new ArrayList<RecordContext>(mockRecordCount);
        AsyncKeyedStateBackend backend = null;
        TestAsyncFrameworkExceptionHandler testExceptionHandler = new TestAsyncFrameworkExceptionHandler();
        try {
            int i;
            int i2;
            // Phase 1: a fresh backend owning all key groups [0, jobMaxParallelism - 1].
            backend = this.createAsyncKeyedBackend(0, 1, (TypeSerializer)IntSerializer.INSTANCE, new KeyGroupRange(0, jobMaxParallelism - 1), this.env);
            aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (AsyncFutureImpl.AsyncFrameworkExceptionHandler)testExceptionHandler, (AsyncExecutor)backend.createStateExecutor(), new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, jobMaxParallelism, aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, null, null);
            backend.setup((StateRequestHandler)aec);
            stateDescriptor = new ValueStateDescriptor("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            valueState = (ValueState)backend.getOrCreateKeyedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptor);
            // One record context per key; each is retained here and released after its last read.
            for (i2 = 0; i2 < mockRecordCount; ++i2) {
                recordContexts.add(aec.buildContext((Object)i2, (Object)i2));
                ((RecordContext)recordContexts.get(i2)).retain();
            }
            // Write value i under key i.
            for (i2 = 0; i2 < mockRecordCount; ++i2) {
                aec.setCurrentContext((RecordContext)recordContexts.get(i2));
                valueState.update((Object)i2);
            }
            // Read the values back before snapshotting.
            for (i2 = 0; i2 < mockRecordCount; ++i2) {
                aec.setCurrentContext((RecordContext)recordContexts.get(i2));
                Assertions.assertThat((Integer)((Integer)valueState.value())).isEqualTo(i2);
                ((RecordContext)recordContexts.get(i2)).release();
            }
            aec.drainInflightRecords(0);
            // Take the snapshot; the future is run here if it has not completed yet.
            RunnableFuture snapshot = backend.snapshot(1L, System.currentTimeMillis(), this.createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation());
            if (!snapshot.isDone()) {
                snapshot.run();
            }
            SnapshotResult snapshotResult = (SnapshotResult)snapshot.get();
            stateHandle = (KeyedStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            // Overwrite every value after the snapshot; the restore below must not see these.
            recordContexts.clear();
            for (i = 0; i < mockRecordCount; ++i) {
                recordContexts.add(aec.buildContext((Object)i, (Object)i));
                ((RecordContext)recordContexts.get(i)).retain();
            }
            for (i = 0; i < mockRecordCount; ++i) {
                aec.setCurrentContext((RecordContext)recordContexts.get(i));
                valueState.update((Object)(i + 1));
            }
            for (i = 0; i < mockRecordCount; ++i) {
                aec.setCurrentContext((RecordContext)recordContexts.get(i));
                Assertions.assertThat((Integer)((Integer)valueState.value())).isEqualTo(i + 1);
                ((RecordContext)recordContexts.get(i)).release();
            }
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        Assertions.assertThat((Object)stateHandle).isNotNull();
        backend = null;
        this.restoreJob();
        try {
            int i;
            // Phase 2: restore a new backend from the snapshot handle.
            backend = this.restoreAsyncKeyedBackend(0, 1, (TypeSerializer)IntSerializer.INSTANCE, new KeyGroupRange(0, jobMaxParallelism - 1), Collections.singletonList(stateHandle), this.env);
            aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (AsyncFutureImpl.AsyncFrameworkExceptionHandler)testExceptionHandler, (AsyncExecutor)backend.createStateExecutor(), new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, jobMaxParallelism, aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, null, null);
            backend.setup((StateRequestHandler)aec);
            stateDescriptor = new ValueStateDescriptor("test", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            valueState = (ValueState)backend.getOrCreateKeyedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptor);
            recordContexts.clear();
            for (i = 0; i < mockRecordCount; ++i) {
                recordContexts.add(aec.buildContext((Object)i, (Object)i));
                ((RecordContext)recordContexts.get(i)).retain();
            }
            // The restored state must hold the values written before the snapshot (i, not i + 1).
            for (i = 0; i < mockRecordCount; ++i) {
                aec.setCurrentContext((RecordContext)recordContexts.get(i));
                Assertions.assertThat((Integer)((Integer)valueState.value())).isEqualTo(i);
                ((RecordContext)recordContexts.get(i)).release();
            }
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        // The handler only records what is reported to it, so check nothing was reported.
        Assertions.assertThat((Throwable)testExceptionHandler.exception).isNull();
    }

    /** Snapshots with parallelism 2 and restores with parallelism 5 over 10 key groups. */
    @TestTemplate
    void testAsyncStateBackendScaleUp() throws Exception {
        int sourceParallelism = 2;
        int targetParallelism = 5;
        int maxParallelism = 10;
        testKeyGroupSnapshotRestore(sourceParallelism, targetParallelism, maxParallelism);
    }

    /** Snapshots with parallelism 4 and restores with parallelism 3 over 10 key groups. */
    @TestTemplate
    void testAsyncStateBackendScaleDown() throws Exception {
        int sourceParallelism = 4;
        int targetParallelism = 3;
        int maxParallelism = 10;
        testKeyGroupSnapshotRestore(sourceParallelism, targetParallelism, maxParallelism);
    }

    /**
     * Snapshots state with one parallelism and restores it with another.
     *
     * <p>For every key group a dedicated value state ({@code "state<keyGroup>"}) is written with a
     * random key that maps to that key group. Each source backend is snapshotted, the snapshots
     * are redistributed over the target key-group ranges, and every restored backend must read
     * back the value written for each of its key groups.
     *
     * @param sourceParallelism number of backends that write and snapshot the state
     * @param targetParallelism number of backends the snapshots are restored into
     * @param maxParallelism number of key groups (and of registered state descriptors)
     * @throws Exception if creating, snapshotting or restoring a backend fails
     */
    private void testKeyGroupSnapshotRestore(int sourceParallelism, int targetParallelism, int maxParallelism) throws Exception {
        int aecBatchSize = 1;
        long aecBufferTimeout = 1L;
        int aecMaxInFlightRecords = 1000;
        Random random = new Random();
        List<ValueStateDescriptor> stateDescriptors = new ArrayList<>(maxParallelism);
        // Both lists are filled in ascending key-group order, so index j belongs to key group j.
        List<Integer> keyInKeyGroups = new ArrayList<>(maxParallelism);
        List<Integer> expectedValue = new ArrayList<>(maxParallelism);
        for (int keyGroup = 0; keyGroup < maxParallelism; ++keyGroup) {
            stateDescriptors.add(new ValueStateDescriptor("state" + keyGroup, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO));
        }
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        TestAsyncFrameworkExceptionHandler testExceptionHandler = new TestAsyncFrameworkExceptionHandler();

        // Phase 1: write one value per key group and snapshot every source backend.
        List<KeyedStateHandle> snapshots = new ArrayList<>(sourceParallelism);
        for (int i = 0; i < sourceParallelism; ++i) {
            KeyGroupRange range = KeyGroupRange.of(maxParallelism * i / sourceParallelism, maxParallelism * (i + 1) / sourceParallelism - 1);
            AsyncKeyedStateBackend backend = this.createAsyncKeyedBackend(i, sourceParallelism, (TypeSerializer)IntSerializer.INSTANCE, range, this.env);
            try {
                // Created inside the try so the backend is disposed even if the set-up fails.
                StateExecutionController aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (AsyncFutureImpl.AsyncFrameworkExceptionHandler)testExceptionHandler, (AsyncExecutor)backend.createStateExecutor(), new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, maxParallelism, aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, null, null);
                backend.setup((StateRequestHandler)aec);
                for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) {
                    ValueState state = (ValueState)backend.getOrCreateKeyedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptors.get(j));
                    int keyInKeyGroup = this.getKeyInKeyGroup(random, maxParallelism, KeyGroupRange.of(j, j));
                    RecordContext recordContext = aec.buildContext((Object)keyInKeyGroup, (Object)keyInKeyGroup);
                    recordContext.retain();
                    aec.setCurrentContext(recordContext);
                    keyInKeyGroups.add(keyInKeyGroup);
                    state.update((Object)keyInKeyGroup);
                    expectedValue.add(keyInKeyGroup);
                    recordContext.release();
                }
                snapshots.add(StateBackendTestV2Base.runSnapshot(backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry));
            }
            finally {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }

        // Phase 2: compute the target ranges and hand each one the snapshots intersecting it.
        List<KeyGroupRange> keyGroupRangesRestore = new ArrayList<>(targetParallelism);
        List<List<KeyedStateHandle>> keyGroupStatesAfterDistribute = new ArrayList<>(targetParallelism);
        for (int i = 0; i < targetParallelism; ++i) {
            KeyGroupRange restoreRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, targetParallelism, i);
            List<KeyedStateHandle> keyedStateHandles = new ArrayList<>();
            StateAssignmentOperation.extractIntersectingState(snapshots, restoreRange, keyedStateHandles);
            keyGroupRangesRestore.add(restoreRange);
            keyGroupStatesAfterDistribute.add(keyedStateHandles);
        }

        // Phase 3: restore every target backend and verify the values of its key groups.
        for (int i = 0; i < targetParallelism; ++i) {
            KeyGroupRange range = keyGroupRangesRestore.get(i);
            AsyncKeyedStateBackend backend = this.restoreAsyncKeyedBackend(i, targetParallelism, (TypeSerializer)IntSerializer.INSTANCE, range, keyGroupStatesAfterDistribute.get(i), this.env);
            try {
                StateExecutionController aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (AsyncFutureImpl.AsyncFrameworkExceptionHandler)testExceptionHandler, (AsyncExecutor)backend.createStateExecutor(), new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, maxParallelism, aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, null, null);
                backend.setup((StateRequestHandler)aec);
                for (int j = range.getStartKeyGroup(); j <= range.getEndKeyGroup(); ++j) {
                    ValueState state = (ValueState)backend.getOrCreateKeyedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescriptors.get(j));
                    Integer key = keyInKeyGroups.get(j);
                    RecordContext recordContext = aec.buildContext((Object)key, (Object)key);
                    recordContext.retain();
                    aec.setCurrentContext(recordContext);
                    Assertions.assertThat((Integer)((Integer)state.value())).isEqualTo(expectedValue.get(j));
                    recordContext.release();
                }
            }
            finally {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }

        // The handler only records what is reported to it; make such a report fail the test,
        // as testAsyncKeyedStateBackendSnapshot does.
        Assertions.assertThat((Throwable)testExceptionHandler.exception).isNull();
    }

    /** Runs the priority-queue scenario inserting the elements one at a time. */
    @TestTemplate
    void testKeyGroupedInternalPriorityQueue() throws Exception {
        boolean addAll = false;
        testKeyGroupedInternalPriorityQueue(addAll);
    }

    /** Runs the priority-queue scenario inserting all elements with a single bulk call. */
    @TestTemplate
    void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
        boolean addAll = true;
        testKeyGroupedInternalPriorityQueue(addAll);
    }

    /**
     * Exercises a key-grouped priority queue created by the async keyed state backend.
     *
     * <p>Five elements are offered (one of them twice), after which the test checks the per
     * key-group subsets, the head of the queue, polling, iteration, size and removal.
     *
     * @param addAll {@code true} to insert the elements with one {@code addAll} call,
     *     {@code false} to insert them one by one and check every {@code add} result
     * @throws Exception if the backend or the queue cannot be created
     */
    void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
        AsyncKeyedStateBackend backend = this.createAsyncKeyedBackend(0, 1, (TypeSerializer)IntSerializer.INSTANCE, new KeyGroupRange(0, 127), this.env);
        try {
            KeyGroupedInternalPriorityQueue queue = backend.create("key-grouped-priority-queue", (TypeSerializer)new TestType.V1TestTypeSerializer());
            TestType a42 = new TestType("a", 42);
            TestType a44 = new TestType("a", 44);
            TestType b1 = new TestType("b", 1);
            TestType b3 = new TestType("b", 3);

            // Insertion order is a44, b1, b1 (duplicate), b3, a42 in both modes.
            if (!addAll) {
                // NOTE(review): add() presumably reports whether the head changed - confirm.
                Assertions.assertThat((boolean)queue.add((Object)a44)).isTrue();
                Assertions.assertThat((boolean)queue.add((Object)b1)).isTrue();
                Assertions.assertThat((boolean)queue.add((Object)b1)).isFalse();
                Assertions.assertThat((boolean)queue.add((Object)b3)).isFalse();
                Assertions.assertThat((boolean)queue.add((Object)a42)).isFalse();
            } else {
                queue.addAll(Arrays.asList(a44, b1, b1, b3, a42));
            }

            Assertions.assertThat((boolean)queue.isEmpty()).isFalse();
            // The "a" elements are expected in key group 81 and the "b" elements in key group 22.
            Assertions.assertThat((Collection)queue.getSubsetForKeyGroup(81)).containsExactlyInAnyOrder((Object[])new TestType[]{a42, a44});
            Assertions.assertThat((Collection)queue.getSubsetForKeyGroup(22)).containsExactlyInAnyOrder((Object[])new TestType[]{b1, b3});

            // b1 is the head; polling it promotes b3.
            Assertions.assertThat((Object)((TestType)queue.peek())).isEqualTo((Object)b1);
            Assertions.assertThat((Object)((TestType)queue.poll())).isEqualTo((Object)b1);
            Assertions.assertThat((Object)((TestType)queue.peek())).isEqualTo((Object)b3);

            // Iteration must yield exactly the three remaining elements.
            ArrayList remaining = new ArrayList();
            try (CloseableIterator remainingIterator = queue.iterator();){
                remainingIterator.forEachRemaining(remaining::add);
            }
            Assertions.assertThat(remaining).containsExactlyInAnyOrder((Object[])new TestType[]{b3, a42, a44});
            Assertions.assertThat((int)queue.size()).isEqualTo(3);

            // b1 was already polled, so removing it reports false; removing b3 reports true.
            Assertions.assertThat((boolean)queue.remove((Object)b1)).isFalse();
            Assertions.assertThat((boolean)queue.remove((Object)b3)).isTrue();
            Assertions.assertThat((Object)((TestType)queue.peek())).isEqualTo((Object)a42);
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /**
     * Verifies that a value state with a one-second TTL stops returning its value once the TTL
     * has elapsed, through both the synchronous and the asynchronous state API.
     *
     * <p>The assertions of the asynchronous part run inside state-future callbacks. The test
     * therefore also checks the exception handler at the end, so that anything reported to it
     * fails the test instead of being lost.
     *
     * @throws Exception if the backend cannot be created or a state access fails
     */
    @TestTemplate
    void testValueStateWorkWithTtl() throws Exception {
        TestAsyncFrameworkExceptionHandler testExceptionHandler = new TestAsyncFrameworkExceptionHandler();
        AsyncKeyedStateBackend backend = this.createAsyncKeyedBackend(0, 1, (TypeSerializer)LongSerializer.INSTANCE, new KeyGroupRange(0, 127), this.env);
        try {
            // Created inside the try so the backend is disposed even if the set-up fails.
            StateExecutionController aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (AsyncFutureImpl.AsyncFrameworkExceptionHandler)testExceptionHandler, (AsyncExecutor)backend.createStateExecutor(), new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, 128, 1, -1L, 1, null, null);
            backend.setup((StateRequestHandler)aec);
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", TypeInformation.of(Long.class));
            kvId.enableTimeToLive(StateTtlConfig.newBuilder((Duration)Duration.ofSeconds(1L)).build());
            ValueState state = (ValueState)backend.getOrCreateKeyedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);

            // Synchronous API: the value is visible right after the update and gone after the TTL.
            RecordContext recordContext = aec.buildContext((Object)"record-1", (Object)1L);
            recordContext.retain();
            aec.setCurrentContext(recordContext);
            state.update((Object)1L);
            Assertions.assertThat((Long)((Long)state.value())).isEqualTo(1L);
            Thread.sleep(1000L);
            Assertions.assertThat((Long)((Long)state.value())).isNull();
            recordContext.release();

            // Asynchronous API: same scenario, with the checks chained as callbacks.
            RecordContext recordContext1 = aec.buildContext((Object)"record-2", (Object)2L);
            aec.setCurrentContext(recordContext1);
            state.asyncUpdate((Object)2L).thenAccept(val -> state.asyncValue().thenAccept(val1 -> {
                Assertions.assertThat((Long)val1).isEqualTo(2L);
                Thread.sleep(1000L);
                state.asyncValue().thenAccept(val2 -> Assertions.assertThat((Long)val2).isNull());
            }));
            // Give the callback chain (which itself sleeps for one second) time to finish.
            Thread.sleep(3000L);
            recordContext1.release();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
        // The handler only records what is reported to it; make such a report fail the test,
        // as testAsyncKeyedStateBackendSnapshot does.
        Assertions.assertThat((Throwable)testExceptionHandler.exception).isNull();
    }

    /**
     * Draws random integers until one is assigned to a key group inside {@code keyGroupRange}.
     *
     * @param random source of candidate keys
     * @param maxParallelism maximum parallelism used for the key-group assignment
     * @param keyGroupRange range the key's key group must fall into
     * @return a key whose assigned key group is contained in {@code keyGroupRange}
     */
    private int getKeyInKeyGroup(Random random, int maxParallelism, KeyGroupRange keyGroupRange) {
        int candidateKey;
        int assignedKeyGroup;
        do {
            candidateKey = random.nextInt();
            assignedKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup((Object)candidateKey, maxParallelism);
        } while (!keyGroupRange.contains(assignedKeyGroup));
        return candidateKey;
    }

    /**
     * Completes the given snapshot future and registers its shared state.
     *
     * <p>The future is run on the calling thread unless it is already done. A non-null
     * job-manager-owned handle is registered with {@code sharedStateRegistry} under checkpoint id
     * {@code 0}.
     *
     * @param snapshotRunnableFuture the snapshot to complete
     * @param sharedStateRegistry registry the resulting handle registers its shared state with
     * @return the job-manager-owned snapshot handle, or {@code null} if the snapshot has none
     * @throws Exception if the snapshot fails
     */
    private static KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        SnapshotResult<KeyedStateHandle> result = snapshotRunnableFuture.get();
        KeyedStateHandle handle = result.getJobManagerOwnedSnapshot();
        if (handle == null) {
            return null;
        }
        handle.registerSharedStates(sharedStateRegistry, 0L);
        return handle;
    }

    static class TestAsyncFrameworkExceptionHandler
    implements AsyncFutureImpl.AsyncFrameworkExceptionHandler {
        String message = null;
        Throwable exception = null;

        TestAsyncFrameworkExceptionHandler() {
        }

        public void handleException(String message, Throwable exception) {
            this.message = message;
            this.exception = exception;
        }
    }
}

