/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.ChangelogTestUtils;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestStreamStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class SharedStateRegistryTest {
    private static final String RESTORED_STATE_ID = "restored-state";

    SharedStateRegistryTest() {
    }

    @Test
    void testRegistryNormal() {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        TestSharedState firstState = new TestSharedState("first");
        StreamStateHandle result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), (StreamStateHandle)firstState, 0L);
        Assertions.assertThat((Object)result).isSameAs((Object)firstState);
        Assertions.assertThat((boolean)firstState.isDiscarded()).isFalse();
        TestSharedState secondState = new TestSharedState("second");
        result = sharedStateRegistry.registerReference(secondState.getRegistrationKey(), (StreamStateHandle)secondState, 0L);
        Assertions.assertThat((Object)result).isSameAs((Object)secondState);
        Assertions.assertThat((boolean)firstState.isDiscarded()).isFalse();
        Assertions.assertThat((boolean)secondState.isDiscarded()).isFalse();
        sharedStateRegistry.unregisterUnusedState(1L);
        Assertions.assertThat((boolean)secondState.isDiscarded()).isTrue();
        Assertions.assertThat((boolean)firstState.isDiscarded()).isTrue();
    }

    @Test
    void testUnregisterWithUnexistedKey() {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        sharedStateRegistry.unregisterUnusedState(-1L);
        sharedStateRegistry.unregisterUnusedState(Long.MAX_VALUE);
    }

    @Test
    void testRegisterChangelogStateBackendHandles() throws InterruptedException {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        long materializationId1 = 1L;
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedStateBase1 = ChangelogTestUtils.createDummyIncrementalStateHandle(materializationId1);
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedState1 = materializedStateBase1.deserialize();
        ChangelogTestUtils.ChangelogStateHandleWrapper nonMaterializedState1 = ChangelogTestUtils.createDummyChangelogStateHandle(1L, 2L);
        long materializationId = 1L;
        long checkpointId1 = 41L;
        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle1 = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(materializedState1), Collections.singletonList(nonMaterializedState1), materializedStateBase1.getKeyGroupRange(), checkpointId1, materializationId, nonMaterializedState1.getStateSize());
        changelogStateBackendHandle1.registerSharedStates((SharedStateRegistry)sharedStateRegistry, checkpointId1);
        sharedStateRegistry.checkpointCompleted(checkpointId1);
        sharedStateRegistry.unregisterUnusedState(checkpointId1);
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedState2 = materializedStateBase1.deserialize();
        ChangelogTestUtils.ChangelogStateHandleWrapper nonMaterializedState2 = ChangelogTestUtils.createDummyChangelogStateHandle(2L, 3L);
        long checkpointId2 = 42L;
        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle2 = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(materializedState2), Collections.singletonList(nonMaterializedState2), materializedStateBase1.getKeyGroupRange(), checkpointId2, materializationId, nonMaterializedState2.getStateSize());
        changelogStateBackendHandle2.registerSharedStates((SharedStateRegistry)sharedStateRegistry, checkpointId2);
        sharedStateRegistry.checkpointCompleted(checkpointId2);
        sharedStateRegistry.unregisterUnusedState(checkpointId2);
        Assertions.assertThat((boolean)materializedState1.isDiscarded()).isFalse();
        Assertions.assertThat((boolean)materializedState2.isDiscarded()).isFalse();
        Assertions.assertThat((boolean)nonMaterializedState1.isDiscarded()).isTrue();
        long materializationId2 = 2L;
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedStateBase2 = ChangelogTestUtils.createDummyIncrementalStateHandle(materializationId2);
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedState3 = materializedStateBase2.deserialize();
        long checkpointId3 = 43L;
        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle3 = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(materializedState3), Collections.singletonList(nonMaterializedState2), materializedState3.getKeyGroupRange(), checkpointId3, materializationId2, 0L);
        changelogStateBackendHandle3.registerSharedStates((SharedStateRegistry)sharedStateRegistry, checkpointId3);
        sharedStateRegistry.checkpointCompleted(checkpointId3);
        sharedStateRegistry.unregisterUnusedState(checkpointId3);
        Assertions.assertThat((boolean)materializedState1.isDiscarded()).isTrue();
        Assertions.assertThat((boolean)nonMaterializedState2.isDiscarded()).isFalse();
    }

    @Test
    void testUnregisterUnusedSavepointState() {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        TestingStreamStateHandle handle = new TestingStreamStateHandle();
        this.registerInitialCheckpoint((SharedStateRegistry)sharedStateRegistry, RESTORED_STATE_ID, CheckpointProperties.forSavepoint((boolean)false, (SavepointFormatType)SavepointFormatType.NATIVE));
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), (StreamStateHandle)handle, 2L);
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), (StreamStateHandle)handle, 3L);
        sharedStateRegistry.registerReference(new SharedStateRegistryKey("new-state"), (StreamStateHandle)new TestingStreamStateHandle(), 4L);
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)sharedStateRegistry.unregisterUnusedState(3L)).withFailMessage("Only the initial checkpoint should be retained because its state is in use", new Object[0])).containsExactly((Object[])new Long[]{1L});
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)sharedStateRegistry.unregisterUnusedState(4L)).withFailMessage("The initial checkpoint state is unused so it could be discarded", new Object[0])).isEmpty();
    }

    @Test
    void testUnregisterNonInitialCheckpoint() {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        String stateId = "stateId";
        byte[] stateContent = stateId.getBytes(StandardCharsets.UTF_8);
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(stateId), (StreamStateHandle)new TestingStreamStateHandle(stateId, stateContent), 1L);
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(stateId), (StreamStateHandle)new TestingStreamStateHandle(stateId, stateContent), 2L);
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)sharedStateRegistry.unregisterUnusedState(2L)).withFailMessage("First (non-initial) checkpoint could be discarded", new Object[0])).isEmpty();
    }

    @Test
    void testUnregisterInitialCheckpoint() {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        TestingStreamStateHandle handle = new TestingStreamStateHandle();
        this.registerInitialCheckpoint((SharedStateRegistry)sharedStateRegistry, RESTORED_STATE_ID, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION));
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), (StreamStateHandle)handle, 2L);
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)sharedStateRegistry.unregisterUnusedState(2L)).withFailMessage("(retained) checkpoint - should NOT be considered in use even if its state is in use", new Object[0])).isEmpty();
    }

    @Test
    void testUnregisterInitialCheckpointUsedInChangelog() {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        TestingStreamStateHandle handle = new TestingStreamStateHandle();
        this.registerInitialCheckpoint((SharedStateRegistry)sharedStateRegistry, RESTORED_STATE_ID, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION));
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), (StreamStateHandle)handle, 2L, true);
        sharedStateRegistry.registerReference(new SharedStateRegistryKey(RESTORED_STATE_ID), (StreamStateHandle)handle, 3L, false);
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)sharedStateRegistry.unregisterUnusedState(3L)).withFailMessage("(retained) checkpoint - should be considered in use as long as its state is in use by changelog", new Object[0])).containsExactly((Object[])new Long[]{1L});
    }

    @Test
    void testFireMergingOperatorStateRegister(@TempDir File tmpFolder) throws IOException {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        Path checkpointBaseDir = new Path(tmpFolder.toString());
        Path sharedStateDir = new Path(checkpointBaseDir, "shared");
        Path taskOwnedStateDir = new Path(checkpointBaseDir, "taskowned");
        FileMergingSnapshotManager.SubtaskKey subtaskKey = new FileMergingSnapshotManager.SubtaskKey("opId", 1, 2);
        FileMergingSnapshotManager snapshotManager = this.createFileMergingSnapshotManager(checkpointBaseDir, sharedStateDir, taskOwnedStateDir);
        snapshotManager.registerSubtaskForSharedStates(subtaskKey);
        EmptyFileMergingOperatorStreamStateHandle handle1 = EmptyFileMergingOperatorStreamStateHandle.create((DirectoryStreamStateHandle)snapshotManager.getManagedDirStateHandle(subtaskKey, CheckpointedStateScope.EXCLUSIVE), (DirectoryStreamStateHandle)snapshotManager.getManagedDirStateHandle(subtaskKey, CheckpointedStateScope.SHARED));
        handle1.registerSharedStates((SharedStateRegistry)sharedStateRegistry, 1L);
        LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(subtaskKey, CheckpointedStateScope.EXCLUSIVE))).isTrue();
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(subtaskKey, CheckpointedStateScope.SHARED))).isTrue();
        sharedStateRegistry.checkpointCompleted(1L);
        sharedStateRegistry.unregisterUnusedState(1L);
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(subtaskKey, CheckpointedStateScope.EXCLUSIVE))).isTrue();
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(subtaskKey, CheckpointedStateScope.SHARED))).isTrue();
        sharedStateRegistry.unregisterUnusedState(2L);
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(subtaskKey, CheckpointedStateScope.EXCLUSIVE))).isFalse();
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(subtaskKey, CheckpointedStateScope.SHARED))).isFalse();
    }

    private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir) {
        FileMergingSnapshotManager mgr = new FileMergingSnapshotManagerBuilder("test-1", FileMergingType.MERGE_WITHIN_CHECKPOINT).build();
        mgr.initFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, 1024);
        return mgr;
    }

    private void registerInitialCheckpoint(SharedStateRegistry sharedStateRegistry, String stateId, CheckpointProperties properties) {
        IncrementalRemoteKeyedStateHandle initialHandle = IncrementalRemoteKeyedStateHandle.restore((UUID)UUID.randomUUID(), (KeyGroupRange)KeyGroupRange.EMPTY_KEY_GROUP_RANGE, (long)1L, Collections.emptyList(), Collections.emptyList(), (StreamStateHandle)new ByteStreamStateHandle("meta", new byte[1]), (long)1024L, (StateHandleID)new StateHandleID(stateId));
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 1, 1);
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)initialHandle).build());
        sharedStateRegistry.registerAllAfterRestored(new CompletedCheckpoint(new JobID(), 1L, 1L, 1L, Collections.singletonMap(operatorID, operatorState), Collections.emptyList(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(), null, properties), RestoreMode.DEFAULT);
    }

    private static class TestSharedState
    implements TestStreamStateHandle {
        private static final long serialVersionUID = 4468635881465159780L;
        private SharedStateRegistryKey key;
        private boolean discarded;

        TestSharedState(String key) {
            this.key = new SharedStateRegistryKey(key);
            this.discarded = false;
        }

        public SharedStateRegistryKey getRegistrationKey() {
            return this.key;
        }

        public void discardState() throws Exception {
            this.discarded = true;
        }

        public long getStateSize() {
            return this.key.toString().length();
        }

        public int hashCode() {
            return this.key.hashCode();
        }

        public FSDataInputStream openInputStream() throws IOException {
            throw new UnsupportedOperationException();
        }

        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }

        public boolean isDiscarded() {
            return this.discarded;
        }
    }
}

