/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for the snapshot-compression setting of {@link HeapKeyedStateBackend}.
 *
 * <p>The tests check that the {@code useSnapshotCompression} flag of {@link ExecutionConfig}
 * selects the expected key-group compression decorator, and that keyed value state written with
 * or without compression can be snapshotted and read back from a newly built backend.
 */
class StateSnapshotCompressionTest {

    /** Key/value pairs written before the snapshot and expected again after the restore. */
    private static final String[][] KEYED_VALUES = {
        {"A", "42"},
        {"B", "43"},
        {"C", "44"},
        {"D", "45"},
    };

    /**
     * Verifies that enabling snapshot compression yields the Snappy decorator and that disabling it
     * yields the uncompressed decorator.
     *
     * @throws BackendBuildingException if a backend cannot be built
     */
    @Test
    void testCompressionConfiguration() throws BackendBuildingException {
        assertCompressionDecorator(true, SnappyStreamCompressionDecorator.INSTANCE);
        assertCompressionDecorator(false, UncompressedStreamCompressionDecorator.INSTANCE);
    }

    /** Snapshot/restore round trip with snapshot compression enabled on the writing backend. */
    @Test
    void snapshotRestoreRoundtripWithCompression() throws Exception {
        snapshotRestoreRoundtrip(true);
    }

    /** Snapshot/restore round trip with snapshot compression disabled on the writing backend. */
    @Test
    void snapshotRestoreRoundtripUncompressed() throws Exception {
        snapshotRestoreRoundtrip(false);
    }

    /**
     * Builds a backend from a config with the given compression flag and asserts which decorator
     * the backend reports. The backend is always closed and disposed afterwards.
     *
     * @param useCompression value passed to {@link ExecutionConfig#setUseSnapshotCompression}
     * @param expectedDecorator decorator the backend is expected to expose
     * @throws BackendBuildingException if the backend cannot be built
     */
    private void assertCompressionDecorator(
            boolean useCompression, StreamCompressionDecorator expectedDecorator)
            throws BackendBuildingException {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setUseSnapshotCompression(useCompression);

        HeapKeyedStateBackend<String> stateBackend = getStringHeapKeyedStateBackend(executionConfig);
        try {
            // Comparing the objects directly (instead of asserting on a boolean) makes a failure
            // report the actual and the expected decorator.
            assertThatDecoratorEquals(stateBackend, expectedDecorator);
        } finally {
            closeAndDispose(stateBackend);
        }
    }

    /** Asserts that the backend's key-group compression decorator equals the expected one. */
    private static void assertThatDecoratorEquals(
            HeapKeyedStateBackend<String> stateBackend,
            StreamCompressionDecorator expectedDecorator) {
        Assertions.assertThat(stateBackend.getKeyGroupCompressionDecorator())
                .isEqualTo(expectedDecorator);
    }

    /**
     * Builds a String-keyed heap backend without any state to restore.
     *
     * @param executionConfig config handed to the builder and used to pick the compression
     *     decorator
     * @return the newly built backend; the caller must close and dispose it
     * @throws BackendBuildingException if the builder fails
     */
    private HeapKeyedStateBackend<String> getStringHeapKeyedStateBackend(ExecutionConfig executionConfig) throws BackendBuildingException {
        return getStringHeapKeyedStateBackend(executionConfig, Collections.emptyList());
    }

    /**
     * Builds a String-keyed heap backend over key groups 0 to 15 (16 key groups in total),
     * passing the given state handles to the builder.
     *
     * <p>The KV-state registry and the priority-queue factory are Mockito mocks; latency tracking
     * and local recovery are disabled.
     *
     * @param executionConfig config handed to the builder and used to pick the compression
     *     decorator
     * @param stateHandles state handles passed to the builder (may be empty)
     * @return the newly built backend; the caller must close and dispose it
     * @throws BackendBuildingException if the builder fails
     */
    private HeapKeyedStateBackend<String> getStringHeapKeyedStateBackend(ExecutionConfig executionConfig, Collection<KeyedStateHandle> stateHandles) throws BackendBuildingException {
        return new HeapKeyedStateBackendBuilder<>(
                        Mockito.mock(TaskKvStateRegistry.class),
                        StringSerializer.INSTANCE,
                        StateSnapshotCompressionTest.class.getClassLoader(),
                        16,
                        new KeyGroupRange(0, 15),
                        executionConfig,
                        TtlTimeProvider.DEFAULT,
                        LatencyTrackingStateConfig.disabled(),
                        stateHandles,
                        AbstractStateBackend.getCompressionDecorator(executionConfig),
                        TestLocalRecoveryConfig.disabled(),
                        Mockito.mock(HeapPriorityQueueSetFactory.class),
                        true,
                        new CloseableRegistry())
                .build();
    }

    /**
     * Writes {@link #KEYED_VALUES} into a backend, snapshots it, then builds a second backend from
     * the snapshot and checks that every key maps to its original value.
     *
     * @param useCompression whether the writing backend is configured with snapshot compression
     * @throws Exception if building a backend, taking the snapshot, or accessing state fails
     */
    private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
        ExecutionConfig snapshotConfig = new ExecutionConfig();
        snapshotConfig.setUseSnapshotCompression(useCompression);

        ValueStateDescriptor<String> stateDescriptor =
                new ValueStateDescriptor<>("test", String.class);
        stateDescriptor.initializeSerializerUnlessSet(snapshotConfig);

        // Phase 1: populate the state and take a snapshot.
        KeyedStateHandle stateHandle;
        HeapKeyedStateBackend<String> writingBackend = getStringHeapKeyedStateBackend(snapshotConfig);
        try {
            InternalValueState<String, VoidNamespace, String> state =
                    writingBackend.createOrUpdateInternalState(
                            new VoidNamespaceSerializer(), stateDescriptor);
            for (String[] entry : KEYED_VALUES) {
                writingBackend.setCurrentKey(entry[0]);
                state.setCurrentNamespace(VoidNamespace.INSTANCE);
                state.update(entry[1]);
            }

            // 4 MiB in-memory checkpoint stream.
            MemCheckpointStreamFactory streamFactory =
                    new MemCheckpointStreamFactory(4 * 1024 * 1024);
            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
                    writingBackend.snapshot(
                            0L,
                            0L,
                            streamFactory,
                            CheckpointOptions.forCheckpointWithDefaultLocation());
            // Run the snapshot on the calling thread so that get() returns immediately.
            snapshot.run();
            stateHandle = snapshot.get().getJobManagerOwnedSnapshot();
        } finally {
            closeAndDispose(writingBackend);
        }

        // Phase 2: restore with a fresh default config. The compression flag is deliberately not
        // set here, so the restore has to work without being told how the snapshot was written.
        ExecutionConfig restoreConfig = new ExecutionConfig();
        HeapKeyedStateBackend<String> restoredBackend =
                getStringHeapKeyedStateBackend(
                        restoreConfig, StateObjectCollection.singleton(stateHandle));
        try {
            InternalValueState<String, VoidNamespace, String> state =
                    restoredBackend.createOrUpdateInternalState(
                            new VoidNamespaceSerializer(), stateDescriptor);
            for (String[] entry : KEYED_VALUES) {
                restoredBackend.setCurrentKey(entry[0]);
                state.setCurrentNamespace(VoidNamespace.INSTANCE);
                Assertions.assertThat(state.value()).isEqualTo(entry[1]);
            }
        } finally {
            closeAndDispose(restoredBackend);
        }
    }

    /** Closes the backend, ignoring close failures, and then disposes it. */
    private static void closeAndDispose(HeapKeyedStateBackend<String> stateBackend) {
        IOUtils.closeQuietly(stateBackend);
        stateBackend.dispose();
    }
}

