package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest.class */
class OperatorStateBackendTest {

    // Static JUnit 5 extension that supplies the ExecutorService (a cached thread pool) which the
    // asynchronous snapshot tests obtain via getExecutor() and submit snapshot futures to.
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension<>(Executors::newCachedThreadPool);
    // Class loader that loaded this test class; handed to DefaultOperatorStateBackendBuilder in the tests.
    private final ClassLoader classLoader = getClass().getClassLoader();
    // Empty collection of state handles, passed wherever a backend is created without restored state.
    private final Collection<OperatorStateHandle> emptyStateHandles = Collections.emptyList();

    /* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest$MutableType.class */
    /**
     * Serializable holder of a single mutable {@code int}. Equality and hash code are derived
     * from the current value, so they change whenever {@link #setValue(int)} is called.
     */
    static final class MutableType implements Serializable {
        private static final long serialVersionUID = 1;
        private int value;

        /** Creates an instance holding {@code 0}. */
        public MutableType() {
            this(0);
        }

        /** Creates an instance holding the given value. */
        public MutableType(int i) {
            value = i;
        }

        /** Static factory equivalent to {@code new MutableType(i)}. */
        static MutableType of(int i) {
            return new MutableType(i);
        }

        /** Returns the currently stored value. */
        public int getValue() {
            return value;
        }

        /** Overwrites the stored value. */
        public void setValue(int i) {
            value = i;
        }

        /** Two instances are equal when they have the same runtime class and the same value. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            MutableType other = (MutableType) obj;
            return other.value == value;
        }

        /** The hash code is the stored value itself. */
        @Override
        public int hashCode() {
            return value;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest$VerifyingIntSerializer.class */
    public static final class VerifyingIntSerializer extends TypeSerializer<Integer> {
        private static final long serialVersionUID = -5344563614550163898L;
        private transient ClassLoader classLoader;
        private transient AtomicInteger atomicInteger;

        private VerifyingIntSerializer(ClassLoader classLoader, AtomicInteger atomicInteger) {
            this.classLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
            this.atomicInteger = (AtomicInteger) Preconditions.checkNotNull(atomicInteger);
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return this;
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        public Integer m593createInstance() {
            return 0;
        }

        public Integer copy(Integer num) {
            Assertions.assertThat(this.classLoader).isEqualTo(Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(num);
        }

        public Integer copy(Integer num, Integer num2) {
            Assertions.assertThat(this.classLoader).isEqualTo(Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(num, num2);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer num, DataOutputView dataOutputView) throws IOException {
            IntSerializer.INSTANCE.serialize(num, dataOutputView);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m592deserialize(DataInputView dataInputView) throws IOException {
            return IntSerializer.INSTANCE.deserialize(dataInputView);
        }

        public Integer deserialize(Integer num, DataInputView dataInputView) throws IOException {
            return IntSerializer.INSTANCE.deserialize(num, dataInputView);
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            Assertions.assertThat(this.classLoader).isEqualTo(Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            IntSerializer.INSTANCE.copy(dataInputView, dataOutputView);
        }

        public boolean equals(Object obj) {
            return obj instanceof VerifyingIntSerializer;
        }

        public int hashCode() {
            return getClass().hashCode();
        }

        public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
            return new VerifyingIntSerializerSnapshot();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/OperatorStateBackendTest$VerifyingIntSerializerSnapshot.class */
    /**
     * Snapshot for {@link VerifyingIntSerializer}. The supplier handed to the superclass builds a
     * serializer bound to the calling thread's context class loader and a fresh counter.
     */
    public static class VerifyingIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
        public VerifyingIntSerializerSnapshot() {
            super(() -> new VerifyingIntSerializer(Thread.currentThread().getContextClassLoader(), new AtomicInteger()));
        }
    }

    /** Package-private no-argument constructor; performs no initialization of its own. */
    OperatorStateBackendTest() {
    }

    /**
     * Creates an operator state backend through {@link MemoryStateBackend} without restored state
     * and checks that it is non-null and has no registered list or broadcast state names.
     */
    @Test
    void testCreateOnAbstractStateBackend() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend();
        OperatorStateBackendParametersImpl parameters =
                new OperatorStateBackendParametersImpl(createMockEnvironment(), "test-operator", this.emptyStateHandles, new CloseableRegistry());
        OperatorStateBackend backend = stateBackend.createOperatorStateBackend(parameters);

        Assertions.assertThat(backend).isNotNull();
        Assertions.assertThat(backend.getRegisteredStateNames()).isEmpty();
        Assertions.assertThat(backend.getRegisteredBroadcastStateNames()).isEmpty();
    }

    /**
     * Registers list states from class-based descriptors (no explicit serializer) and checks that
     * the serializer of the {@code File} state is a {@link KryoSerializer} which uses the Kryo
     * {@code JavaSerializer} registered for {@code FutureTask} in the execution config. Also
     * checks that a {@code String} list state starts empty and returns added elements in order.
     */
    @Test
    void testRegisterStatesWithoutTypeSerializer() throws Exception {
        // Precondition: without explicit registration, Kryo's default for FutureTask is not JavaSerializer.
        boolean javaSerializerIsDefault =
                new KryoSerializer(File.class, new SerializerConfigImpl()).getKryo().getDefaultSerializer(FutureTask.class) instanceof JavaSerializer;
        Assertions.assertThat(javaSerializerIsDefault).isFalse();

        ExecutionConfig config = new ExecutionConfig();
        config.getSerializerConfig().registerTypeWithKryoSerializer(FutureTask.class, JavaSerializer.class);
        DefaultOperatorStateBackend backend =
                new DefaultOperatorStateBackendBuilder(this.classLoader, config, false, this.emptyStateHandles, new CloseableRegistry()).build();

        ListStateDescriptor fileDescriptor = new ListStateDescriptor("test", File.class);
        ListStateDescriptor stringDescriptor = new ListStateDescriptor("test2", String.class);

        PartitionableListState fileState = backend.getListState(fileDescriptor);
        Assertions.assertThat(fileState).isNotNull();
        ListState stringState = backend.getListState(stringDescriptor);
        Assertions.assertThat(stringState).isNotNull();
        Assertions.assertThat(backend.getRegisteredStateNames()).hasSize(2);

        // The registration made on the execution config must be visible in the state's serializer.
        KryoSerializer fileSerializer = fileState.getStateMetaInfo().getPartitionStateSerializer();
        Assertions.assertThat(fileSerializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(fileSerializer.getKryo().getSerializer(FutureTask.class)).isInstanceOf(JavaSerializer.class);

        Assertions.assertThat(((Iterable) stringState.get()).iterator()).isExhausted();
        stringState.add("kevin");
        stringState.add("sunny");

        Iterator names = ((Iterable) stringState.get()).iterator();
        Assertions.assertThat((String) names.next()).isEqualTo("kevin");
        Assertions.assertThat((String) names.next()).isEqualTo("sunny");
        Assertions.assertThat(names).isExhausted();
    }

    /**
     * Registers two list states and one union list state on a fresh backend and checks that:
     * each newly registered state starts empty, elements come back in insertion order, fetching
     * an already registered descriptor again yields a state that shares contents with the first
     * one, and requesting a registered name with the other list-state accessor
     * ({@code getUnionListState} vs. {@code getListState}) throws {@link IllegalStateException}.
     */
    @Test
    void testRegisterStates() throws Exception {
        DefaultOperatorStateBackend backend =
                new DefaultOperatorStateBackendBuilder(this.classLoader, new ExecutionConfig(), false, this.emptyStateHandles, new CloseableRegistry()).build();
        // NOTE(review): the simple name JavaSerializer resolves to the imported Kryo class in this
        // file; the descriptor presumably expects a Flink TypeSerializer - confirm the intended class.
        ListStateDescriptor descriptor1 = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor descriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor descriptor3 = new ListStateDescriptor("test3", new JavaSerializer());

        // First list state: starts empty, then returns elements in insertion order.
        ListState listState = backend.getListState(descriptor1);
        Assertions.assertThat(listState).isNotNull();
        Assertions.assertThat(backend.getRegisteredStateNames()).hasSize(1);
        Assertions.assertThat(((Iterable) listState.get()).iterator()).isExhausted();
        listState.add(42);
        listState.add(4711);
        Iterator it = ((Iterable) listState.get()).iterator();
        Assertions.assertThat(it.next()).isEqualTo(42);
        Assertions.assertThat(it.next()).isEqualTo(4711);
        Assertions.assertThat(it).isExhausted();

        // Second list state: check that the NEW state is empty (previously this re-checked the
        // already exhausted iterator of the first state, which could never fail).
        ListState listState2 = backend.getListState(descriptor2);
        Assertions.assertThat(listState2).isNotNull();
        Assertions.assertThat(backend.getRegisteredStateNames()).hasSize(2);
        Assertions.assertThat(((Iterable) listState2.get()).iterator()).isExhausted();
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        Iterator it2 = ((Iterable) listState2.get()).iterator();
        Assertions.assertThat(it2.next()).isEqualTo(7);
        Assertions.assertThat(it2.next()).isEqualTo(13);
        Assertions.assertThat(it2.next()).isEqualTo(23);
        Assertions.assertThat(it2).isExhausted();

        // Union list state: same correction, assert emptiness of the newly registered state.
        ListState unionListState = backend.getUnionListState(descriptor3);
        Assertions.assertThat(unionListState).isNotNull();
        Assertions.assertThat(backend.getRegisteredStateNames()).hasSize(3);
        Assertions.assertThat(((Iterable) unionListState.get()).iterator()).isExhausted();
        unionListState.add(17);
        unionListState.add(3);
        unionListState.add(123);
        Iterator it3 = ((Iterable) unionListState.get()).iterator();
        Assertions.assertThat(it3.next()).isEqualTo(17);
        Assertions.assertThat(it3.next()).isEqualTo(3);
        Assertions.assertThat(it3.next()).isEqualTo(123);
        Assertions.assertThat(it3).isExhausted();

        // Fetching the first descriptor again must expose the existing contents; an element added
        // through the second reference must be visible through both references.
        ListState listState3 = backend.getListState(descriptor1);
        Assertions.assertThat(listState3).isNotNull();
        listState3.add(123);
        Iterator it4 = ((Iterable) listState3.get()).iterator();
        Assertions.assertThat(it4.next()).isEqualTo(42);
        Assertions.assertThat(it4.next()).isEqualTo(4711);
        Assertions.assertThat(it4.next()).isEqualTo(123);
        Assertions.assertThat(it4).isExhausted();
        Iterator it5 = ((Iterable) listState.get()).iterator();
        Assertions.assertThat(it5.next()).isEqualTo(42);
        Assertions.assertThat(it5.next()).isEqualTo(4711);
        Assertions.assertThat(it5.next()).isEqualTo(123);
        Assertions.assertThat(it5).isExhausted();
        Iterator it6 = ((Iterable) listState3.get()).iterator();
        Assertions.assertThat(it6.next()).isEqualTo(42);
        Assertions.assertThat(it6.next()).isEqualTo(4711);
        Assertions.assertThat(it6.next()).isEqualTo(123);
        Assertions.assertThat(it6).isExhausted();

        // A name registered through getListState cannot be re-requested as union state, and vice versa.
        Assertions.assertThatThrownBy(() -> {
            backend.getUnionListState(descriptor2);
        }).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> {
            backend.getListState(descriptor3);
        }).isInstanceOf(IllegalStateException.class);
    }

    /**
     * Registers a list state and a broadcast state whose serializers are
     * {@link VerifyingIntSerializer}s bound to the environment's user-code class loader, takes a
     * snapshot, and checks that each serializer's copy counter was incremented at least once.
     * The class-loader check itself happens inside the serializers' copy methods.
     */
    @Test
    void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend(4096);
        Environment env = createMockEnvironment();
        OperatorStateBackend backend = stateBackend.createOperatorStateBackend(
                new OperatorStateBackendParametersImpl(env, "test-op-name", this.emptyStateHandles, new CloseableRegistry()));

        // One counter per serializer so that each one's participation is verified separately.
        AtomicInteger listCopies = new AtomicInteger(0);
        ListStateDescriptor listDescriptor =
                new ListStateDescriptor("test", new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), listCopies));
        backend.getListState(listDescriptor).add(42);

        AtomicInteger keyCopies = new AtomicInteger(0);
        AtomicInteger valueCopies = new AtomicInteger(0);
        MapStateDescriptor broadcastDescriptor = new MapStateDescriptor(
                "test-broadcast",
                new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), keyCopies),
                new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), valueCopies));
        BroadcastState broadcastState = backend.getBroadcastState(broadcastDescriptor);
        broadcastState.put(1, 2);
        broadcastState.put(3, 4);
        broadcastState.put(5, 6);

        FutureUtils.runIfNotDoneAndGet(
                backend.snapshot(1L, 1L, new MemCheckpointStreamFactory(4096), CheckpointOptions.forCheckpointWithDefaultLocation()));

        Assertions.assertThat(listCopies.get()).isGreaterThan(0);
        Assertions.assertThat(keyCopies.get()).isGreaterThan(0);
        Assertions.assertThat(valueCopies.get()).isGreaterThan(0);
    }

    /**
     * Snapshots a backend on which no state was registered and checks that the resulting
     * job-manager-owned snapshot is {@code null}.
     */
    @Test
    void testSnapshotEmpty() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend backend = stateBackend.createOperatorStateBackend(
                new OperatorStateBackendParametersImpl(createMockEnvironment(), "testOperator", this.emptyStateHandles, new CloseableRegistry()));
        RunnableFuture snapshotFuture =
                backend.snapshot(0L, 0L, new MemCheckpointStreamFactory(4096), CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult result = (SnapshotResult) FutureUtils.runIfNotDoneAndGet(snapshotFuture);
        Assertions.assertThat(result.getJobManagerOwnedSnapshot()).isNull();
    }

    /**
     * Round-trips a broadcast state through three snapshot/restore cycles on a backend that has
     * no list state: first with three entries, then after removing one key, then after clearing
     * the state. After each restore the entries read back must equal the expected map.
     *
     * <p>Exactly one backend reference and one state handle are tracked at any time, so the
     * {@code finally} block always closes the most recently created backend and discards the
     * handle that is still outstanding, also when an assertion fails half-way through.
     */
    @Test
    void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception {
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend backend = memoryStateBackend.createOperatorStateBackend(
                new OperatorStateBackendParametersImpl(createMockEnvironment(), "testOperator", this.emptyStateHandles, new CloseableRegistry()));
        MapStateDescriptor<Integer, Integer> broadcastDescriptor =
                new MapStateDescriptor<>("test-broadcast", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

        Map<Integer, Integer> expected = new HashMap<>(3);
        expected.put(1, 2);
        expected.put(3, 4);
        expected.put(5, 6);
        backend.getBroadcastState(broadcastDescriptor).putAll(expected);

        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        OperatorStateHandle stateHandle = null;
        try {
            // Cycle 1: snapshot the three entries and restore them into a recreated backend.
            SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(
                    backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
            Assertions.assertThat(stateHandle).isNotNull();

            Map<Integer, Integer> retrieved = new HashMap<>();
            backend = recreateOperatorStateBackend(backend, memoryStateBackend, StateObjectCollection.singleton(stateHandle));
            BroadcastState<Integer, Integer> broadcastState = backend.getBroadcastState(broadcastDescriptor);
            for (Map.Entry<Integer, Integer> entry : broadcastState.entries()) {
                retrieved.put(entry.getKey(), entry.getValue());
            }
            Assertions.assertThat(retrieved).isEqualTo(expected);

            // Cycle 2: remove one key, snapshot, release the previous handle, restore again.
            broadcastState.remove(1);
            expected.remove(1);
            snapshotResult = FutureUtils.runIfNotDoneAndGet(
                    backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            stateHandle.discardState();
            stateHandle = snapshotResult.getJobManagerOwnedSnapshot();

            retrieved.clear();
            backend = recreateOperatorStateBackend(backend, memoryStateBackend, StateObjectCollection.singleton(stateHandle));
            broadcastState = backend.getBroadcastState(broadcastDescriptor);
            for (Map.Entry<Integer, Integer> entry : broadcastState.immutableEntries()) {
                retrieved.put(entry.getKey(), entry.getValue());
            }
            Assertions.assertThat(retrieved).isEqualTo(expected);

            // Cycle 3: clear the state completely, snapshot, restore, expect nothing back.
            broadcastState.clear();
            expected.clear();
            snapshotResult = FutureUtils.runIfNotDoneAndGet(
                    backend.snapshot(2L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            if (stateHandle != null) {
                stateHandle.discardState();
            }
            stateHandle = snapshotResult.getJobManagerOwnedSnapshot();

            retrieved.clear();
            backend = recreateOperatorStateBackend(backend, memoryStateBackend, StateObjectCollection.singleton(stateHandle));
            broadcastState = backend.getBroadcastState(broadcastDescriptor);
            for (Map.Entry<Integer, Integer> entry : broadcastState.immutableEntries()) {
                retrieved.put(entry.getKey(), entry.getValue());
            }
            Assertions.assertThat(expected).isEmpty();
            Assertions.assertThat(retrieved).isEqualTo(expected);

            // Release the last handle here and null it so that the finally block does not discard it twice.
            if (stateHandle != null) {
                stateHandle.discardState();
                stateHandle = null;
            }
        } finally {
            backend.close();
            backend.dispose();
            if (stateHandle != null) {
                stateHandle.discardState();
            }
        }
    }

    /**
     * Fills two list states, one union list state and two of three broadcast states, takes a
     * snapshot, closes the backend and restores a new backend from the snapshot handle. The
     * restored backend must report three list-state names and three broadcast-state names and
     * return exactly the elements written before the snapshot, in the same order; the third
     * broadcast state must be empty.
     *
     * <p>The snapshot handle is discarded exactly once in a {@code finally} block, whether the
     * assertions pass or fail.
     */
    @Test
    void testSnapshotRestoreSync() throws Exception {
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend(8192);
        OperatorStateBackend backend = memoryStateBackend.createOperatorStateBackend(
                new OperatorStateBackendParametersImpl(createMockEnvironment(), "test-op-name", this.emptyStateHandles, new CloseableRegistry()));
        ListStateDescriptor listDescriptor1 = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor listDescriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor unionDescriptor = new ListStateDescriptor("test3", new JavaSerializer());
        MapStateDescriptor broadcastDescriptor1 = new MapStateDescriptor("test4", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor broadcastDescriptor2 = new MapStateDescriptor("test5", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor broadcastDescriptor3 = new MapStateDescriptor("test6", new JavaSerializer(), new JavaSerializer());

        ListState listState1 = backend.getListState(listDescriptor1);
        ListState listState2 = backend.getListState(listDescriptor2);
        ListState unionState = backend.getUnionListState(unionDescriptor);
        BroadcastState broadcastState1 = backend.getBroadcastState(broadcastDescriptor1);
        BroadcastState broadcastState2 = backend.getBroadcastState(broadcastDescriptor2);
        // Registered but intentionally left empty.
        backend.getBroadcastState(broadcastDescriptor3);

        listState1.add(42);
        listState1.add(4711);
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        unionState.add(17);
        unionState.add(18);
        unionState.add(19);
        unionState.add(20);
        broadcastState1.put(1, 2);
        broadcastState1.put(2, 5);
        broadcastState2.put(2, 5);

        OperatorStateHandle stateHandle = (OperatorStateHandle) ((SnapshotResult) FutureUtils.runIfNotDoneAndGet(
                backend.snapshot(1L, 1L, new MemCheckpointStreamFactory(8192), CheckpointOptions.forCheckpointWithDefaultLocation()))).getJobManagerOwnedSnapshot();
        try {
            backend.close();
            backend.dispose();

            OperatorStateBackend restored = memoryStateBackend.createOperatorStateBackend(
                    new OperatorStateBackendParametersImpl(createMockEnvironment(), "testOperator", StateObjectCollection.singleton(stateHandle), new CloseableRegistry()));
            // All names are already known after restore, before any state is requested...
            Assertions.assertThat(restored.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat(restored.getRegisteredBroadcastStateNames()).hasSize(3);

            ListState restoredList1 = restored.getListState(listDescriptor1);
            ListState restoredList2 = restored.getListState(listDescriptor2);
            ListState restoredUnion = restored.getUnionListState(unionDescriptor);
            BroadcastState restoredBroadcast1 = restored.getBroadcastState(broadcastDescriptor1);
            BroadcastState restoredBroadcast2 = restored.getBroadcastState(broadcastDescriptor2);
            BroadcastState restoredBroadcast3 = restored.getBroadcastState(broadcastDescriptor3);
            // ...and requesting the states must not register additional names.
            Assertions.assertThat(restored.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat(restored.getRegisteredBroadcastStateNames()).hasSize(3);

            Iterator it = ((Iterable) restoredList1.get()).iterator();
            Assertions.assertThat(it.next()).isEqualTo(42);
            Assertions.assertThat(it.next()).isEqualTo(4711);
            Assertions.assertThat(it).isExhausted();

            Iterator it2 = ((Iterable) restoredList2.get()).iterator();
            Assertions.assertThat(it2.next()).isEqualTo(7);
            Assertions.assertThat(it2.next()).isEqualTo(13);
            Assertions.assertThat(it2.next()).isEqualTo(23);
            Assertions.assertThat(it2).isExhausted();

            Iterator it3 = ((Iterable) restoredUnion.get()).iterator();
            Assertions.assertThat(it3.next()).isEqualTo(17);
            Assertions.assertThat(it3.next()).isEqualTo(18);
            Assertions.assertThat(it3.next()).isEqualTo(19);
            Assertions.assertThat(it3.next()).isEqualTo(20);
            Assertions.assertThat(it3).isExhausted();

            Iterator it4 = restoredBroadcast1.iterator();
            Assertions.assertThat(it4).hasNext();
            Map.Entry entry = (Map.Entry) it4.next();
            Assertions.assertThat(entry.getKey()).isEqualTo(1);
            Assertions.assertThat(entry.getValue()).isEqualTo(2);
            Assertions.assertThat(it4).hasNext();
            Map.Entry entry2 = (Map.Entry) it4.next();
            Assertions.assertThat(entry2.getKey()).isEqualTo(2);
            Assertions.assertThat(entry2.getValue()).isEqualTo(5);
            Assertions.assertThat(it4).isExhausted();

            Iterator it5 = restoredBroadcast2.iterator();
            Assertions.assertThat(it5).hasNext();
            Map.Entry entry3 = (Map.Entry) it5.next();
            Assertions.assertThat(entry3.getKey()).isEqualTo(2);
            Assertions.assertThat(entry3.getValue()).isEqualTo(5);
            Assertions.assertThat(it5).isExhausted();

            Assertions.assertThat(restoredBroadcast3.iterator()).isExhausted();

            restored.close();
            restored.dispose();
        } finally {
            stateHandle.discardState();
        }
    }

    /**
     * Checks that a snapshot running on another thread is isolated from concurrent changes.
     * While the snapshot is in flight the test adds elements, mutates stored objects in place,
     * clears states and registers a new list state; the backend restored from the snapshot must
     * still contain exactly the pre-snapshot contents and three list / three broadcast names.
     *
     * <p>The executor comes from the static {@code EXECUTOR_EXTENSION} and is also used by other
     * tests in this class, so it is deliberately not shut down here.
     */
    @Test
    void testSnapshotRestoreAsync() throws Exception {
        DefaultOperatorStateBackend backend =
                new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor listDescriptor1 = new ListStateDescriptor("test1", new JavaSerializer());
        ListStateDescriptor listDescriptor2 = new ListStateDescriptor("test2", new JavaSerializer());
        ListStateDescriptor unionDescriptor = new ListStateDescriptor("test3", new JavaSerializer());
        MapStateDescriptor broadcastDescriptor1 = new MapStateDescriptor("test4", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor broadcastDescriptor2 = new MapStateDescriptor("test5", new JavaSerializer(), new JavaSerializer());
        MapStateDescriptor broadcastDescriptor3 = new MapStateDescriptor("test6", new JavaSerializer(), new JavaSerializer());

        ListState listState1 = backend.getListState(listDescriptor1);
        ListState listState2 = backend.getListState(listDescriptor2);
        ListState unionState = backend.getUnionListState(unionDescriptor);
        BroadcastState broadcastState1 = backend.getBroadcastState(broadcastDescriptor1);
        BroadcastState broadcastState2 = backend.getBroadcastState(broadcastDescriptor2);
        // Registered but intentionally left empty.
        backend.getBroadcastState(broadcastDescriptor3);

        listState1.add(MutableType.of(42));
        listState1.add(MutableType.of(4711));
        listState2.add(MutableType.of(7));
        listState2.add(MutableType.of(13));
        listState2.add(MutableType.of(23));
        unionState.add(MutableType.of(17));
        unionState.add(MutableType.of(18));
        unionState.add(MutableType.of(19));
        unionState.add(MutableType.of(20));
        broadcastState1.put(MutableType.of(1), MutableType.of(2));
        broadcastState1.put(MutableType.of(2), MutableType.of(5));
        broadcastState2.put(MutableType.of(2), MutableType.of(5));

        // NOTE(review): presumably the factory signals the waiter latch once the snapshot has
        // reached the stream and then blocks until the blocker latch is triggered - confirm in
        // BlockerCheckpointStreamFactory.
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1048576);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture snapshotFuture = backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        ExecutorService executor = EXECUTOR_EXTENSION.getExecutor();
        executor.submit(snapshotFuture);
        waiterLatch.await();

        // Concurrent modifications that must NOT show up in the snapshot.
        listState1.add(MutableType.of(77));
        broadcastState1.put(MutableType.of(32), MutableType.of(97));
        int visited = 0;
        for (MutableType element : (Iterable<MutableType>) listState2.get()) {
            visited++;
            if (visited == 2) {
                // Let the snapshot continue while the in-place mutation is still going on.
                blockerLatch.trigger();
            }
            element.setValue(element.getValue() + 10);
        }
        unionState.clear();
        broadcastState2.clear();
        backend.getListState(new ListStateDescriptor("test4", new JavaSerializer()));

        OperatorStateHandle stateHandle = (OperatorStateHandle) ((SnapshotResult) snapshotFuture.get()).getJobManagerOwnedSnapshot();
        try {
            backend.close();
            backend.dispose();

            OperatorStateBackend restored = new MemoryStateBackend(4096).createOperatorStateBackend(
                    new OperatorStateBackendParametersImpl(createMockEnvironment(), "testOperator", StateObjectCollection.singleton(stateHandle), new CloseableRegistry()));
            Assertions.assertThat(restored.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat(restored.getRegisteredBroadcastStateNames()).hasSize(3);

            ListState restoredList1 = restored.getListState(listDescriptor1);
            ListState restoredList2 = restored.getListState(listDescriptor2);
            ListState restoredUnion = restored.getUnionListState(unionDescriptor);
            BroadcastState restoredBroadcast1 = restored.getBroadcastState(broadcastDescriptor1);
            BroadcastState restoredBroadcast2 = restored.getBroadcastState(broadcastDescriptor2);
            BroadcastState restoredBroadcast3 = restored.getBroadcastState(broadcastDescriptor3);
            Assertions.assertThat(restored.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat(restored.getRegisteredBroadcastStateNames()).hasSize(3);

            // The element 77 added after the snapshot started must be absent.
            Iterator it = ((Iterable) restoredList1.get()).iterator();
            Assertions.assertThat(((MutableType) it.next()).value).isEqualTo(42);
            Assertions.assertThat(((MutableType) it.next()).value).isEqualTo(4711);
            Assertions.assertThat(it).isExhausted();

            // The in-place "+10" mutations must be absent.
            Iterator it2 = ((Iterable) restoredList2.get()).iterator();
            Assertions.assertThat(((MutableType) it2.next()).value).isEqualTo(7);
            Assertions.assertThat(((MutableType) it2.next()).value).isEqualTo(13);
            Assertions.assertThat(((MutableType) it2.next()).value).isEqualTo(23);
            Assertions.assertThat(it2).isExhausted();

            // The clear() of the union state must be absent.
            Iterator it3 = ((Iterable) restoredUnion.get()).iterator();
            Assertions.assertThat(((MutableType) it3.next()).value).isEqualTo(17);
            Assertions.assertThat(((MutableType) it3.next()).value).isEqualTo(18);
            Assertions.assertThat(((MutableType) it3.next()).value).isEqualTo(19);
            Assertions.assertThat(((MutableType) it3.next()).value).isEqualTo(20);
            Assertions.assertThat(it3).isExhausted();

            Iterator it4 = restoredBroadcast1.iterator();
            Assertions.assertThat(it4).hasNext();
            Map.Entry entry = (Map.Entry) it4.next();
            Assertions.assertThat(((MutableType) entry.getKey()).value).isOne();
            Assertions.assertThat(((MutableType) entry.getValue()).value).isEqualTo(2);
            Assertions.assertThat(it4).hasNext();
            Map.Entry entry2 = (Map.Entry) it4.next();
            Assertions.assertThat(((MutableType) entry2.getKey()).value).isEqualTo(2);
            Assertions.assertThat(((MutableType) entry2.getValue()).value).isEqualTo(5);
            Assertions.assertThat(it4).isExhausted();

            Iterator it5 = restoredBroadcast2.iterator();
            Assertions.assertThat(it5).hasNext();
            Map.Entry entry3 = (Map.Entry) it5.next();
            Assertions.assertThat(((MutableType) entry3.getKey()).value).isEqualTo(2);
            Assertions.assertThat(((MutableType) entry3.getValue()).value).isEqualTo(5);
            Assertions.assertThat(it5).isExhausted();

            Assertions.assertThat(restoredBroadcast3.iterator()).isExhausted();

            restored.close();
            restored.dispose();
        } finally {
            stateHandle.discardState();
        }
    }

    /**
     * Checks that closing the backend while an asynchronous snapshot is still in flight makes the
     * snapshot future fail with a {@link CancellationException}.
     *
     * <p>The snapshot is submitted to the shared test executor and the test waits on the stream
     * factory's waiter latch before closing the backend; the blocker latch is triggered afterwards
     * (presumably releasing the blocked write in {@code BlockerCheckpointStreamFactory} — confirm
     * against that class).
     */
    @Test
    void testSnapshotAsyncClose() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend =
                new DefaultOperatorStateBackendBuilder(
                                OperatorStateBackendTest.class.getClassLoader(),
                                new ExecutionConfig(),
                                true,
                                this.emptyStateHandles,
                                new CloseableRegistry())
                        .build();

        ListState listState =
                operatorStateBackend.getListState(
                        new ListStateDescriptor("test1", new JavaSerializer()));
        listState.add(MutableType.of(42));
        listState.add(MutableType.of(4711));

        BroadcastState broadcastState =
                operatorStateBackend.getBroadcastState(
                        new MapStateDescriptor(
                                "test4", new JavaSerializer(), new JavaSerializer()));
        broadcastState.put(MutableType.of(1), MutableType.of(2));
        broadcastState.put(MutableType.of(2), MutableType.of(5));

        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1048576);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture snapshotFuture =
                operatorStateBackend.snapshot(
                        1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        EXECUTOR_EXTENSION.getExecutor().submit(snapshotFuture);

        // Do not close the backend before the asynchronous part has signalled the waiter latch.
        waiterLatch.await();
        operatorStateBackend.close();
        blockerLatch.trigger();

        // The future must actually be awaited inside the callable: an empty callable never throws,
        // so the assertion could not pass. The timeout turns a hang into a test failure.
        Assertions.assertThatThrownBy(
                        () -> snapshotFuture.get(60L, java.util.concurrent.TimeUnit.SECONDS))
                .isInstanceOf(CancellationException.class);
    }

    /**
     * Checks that cancelling an in-flight asynchronous snapshot closes every stream the factory
     * created and makes the snapshot future fail with a {@link CancellationException}.
     *
     * <p>The snapshot is submitted to the shared test executor and the test waits on the stream
     * factory's waiter latch before cancelling; the blocker latch is triggered afterwards
     * (presumably releasing the blocked write in {@code BlockerCheckpointStreamFactory} — confirm
     * against that class).
     */
    @Test
    void testSnapshotAsyncCancel() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend =
                new DefaultOperatorStateBackendBuilder(
                                OperatorStateBackendTest.class.getClassLoader(),
                                new ExecutionConfig(),
                                true,
                                this.emptyStateHandles,
                                new CloseableRegistry())
                        .build();

        ListState listState =
                operatorStateBackend.getListState(
                        new ListStateDescriptor("test1", new JavaSerializer()));
        listState.add(MutableType.of(42));
        listState.add(MutableType.of(4711));

        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1048576);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture snapshotFuture =
                operatorStateBackend.snapshot(
                        1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        EXECUTOR_EXTENSION.getExecutor().submit(snapshotFuture);

        // Do not cancel before the asynchronous part has signalled the waiter latch.
        waiterLatch.await();
        snapshotFuture.cancel(true);

        // Every stream handed out by the factory has to be closed once the snapshot is cancelled.
        for (BlockingCheckpointOutputStream createdStream : streamFactory.getAllCreatedStreams()) {
            Assertions.assertThat(createdStream.isClosed()).isTrue();
        }

        blockerLatch.trigger();

        // The future must actually be awaited inside the callable: an empty callable never throws,
        // so the assertion could not pass. The timeout turns a hang into a test failure.
        Assertions.assertThatThrownBy(
                        () -> snapshotFuture.get(60L, java.util.concurrent.TimeUnit.SECONDS))
                .isInstanceOf(CancellationException.class);
    }

    /**
     * Builds a Mockito-backed {@link Environment} for creating operator state backends in tests.
     *
     * <p>Only {@code getExecutionConfig()} (a fresh {@link ExecutionConfig}) and {@code
     * getUserCodeClassLoader()} (a default-built {@code TestingUserCodeClassLoader}) are stubbed
     * here.
     *
     * @return the stubbed environment mock
     */
    private static Environment createMockEnvironment() {
        final ExecutionConfig executionConfig = new ExecutionConfig();
        final Environment mockedEnvironment = Mockito.mock(Environment.class);

        Mockito.when(mockedEnvironment.getExecutionConfig()).thenReturn(executionConfig);
        Mockito.when(mockedEnvironment.getUserCodeClassLoader())
                .thenReturn(TestingUserCodeClassLoader.newBuilder().build());

        return mockedEnvironment;
    }

    /**
     * Closes and disposes the given backend, then asks {@code abstractStateBackend} for a new
     * operator state backend.
     *
     * <p>The new backend is created with a mock environment, the operator identifier {@code
     * "testOperator"}, the supplied state handles and a fresh {@link CloseableRegistry}.
     *
     * @param operatorStateBackend the backend to shut down before the replacement is created
     * @param abstractStateBackend the state backend used to create the replacement
     * @param collection the operator state handles passed to the new backend
     * @return the newly created operator state backend
     * @throws Exception if shutting down the old backend or creating the new one fails
     */
    private static OperatorStateBackend recreateOperatorStateBackend(
            OperatorStateBackend operatorStateBackend,
            AbstractStateBackend abstractStateBackend,
            Collection<OperatorStateHandle> collection)
            throws Exception {
        // Release the previous backend first; only then build its replacement.
        operatorStateBackend.close();
        operatorStateBackend.dispose();

        final OperatorStateBackendParametersImpl creationParameters =
                new OperatorStateBackendParametersImpl(
                        createMockEnvironment(),
                        "testOperator",
                        collection,
                        new CloseableRegistry());
        return abstractStateBackend.createOperatorStateBackend(creationParameters);
    }
}
