/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerTestBase;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.util.function.BiFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AcrossCheckpointFileMergingSnapshotManagerTest
extends FileMergingSnapshotManagerTestBase {
    @Override
    FileMergingType getFileMergingType() {
        return FileMergingType.MERGE_ACROSS_CHECKPOINT;
    }

    @Test
    void testCreateAndReuseFiles() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            fmsm.registerSubtaskForSharedStates(this.subtaskKey2);
            PhysicalFile file1 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file1.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            PhysicalFile file2 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file2.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file2).isNotEqualTo((Object)file1);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 0L, file1);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            PhysicalFile file3 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey2, 0L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file3.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey2, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file3).isNotEqualTo((Object)file1);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3L);
            PhysicalFile file4 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 1L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file4.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file4).isEqualTo((Object)file1);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3L);
            file4.incSize(fmsm.maxPhysicalFileSize);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 1L, file4);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            PhysicalFile file5 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 1L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file5.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file5).isNotEqualTo((Object)file4);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3L);
            PhysicalFile file6 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 1L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file6.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4L);
            PhysicalFile file7 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 1L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file7.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((Object)file7).isNotEqualTo((Object)file5);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5L);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 0L, file6);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5L);
            PhysicalFile file8 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 2L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file8.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((Object)file8).isEqualTo((Object)file6);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5L);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 0L, file8);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5L);
            PhysicalFile file9 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey2, 2L, CheckpointedStateScope.EXCLUSIVE);
            Assertions.assertThat((Object)file9.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
            Assertions.assertThat((Object)file9).isEqualTo((Object)file6);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5L);
            file9.incSize(fmsm.maxPhysicalFileSize);
            fmsm.returnPhysicalFileForNextReuse(this.subtaskKey1, 2L, file9);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4L);
            PhysicalFile file10 = fmsm.getOrCreatePhysicalFileForCheckpoint(this.subtaskKey1, 2L, CheckpointedStateScope.SHARED);
            Assertions.assertThat((Object)file10.getFilePath().getParent()).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.SHARED));
            Assertions.assertThat((Object)file10).isNotEqualTo((Object)file9);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5L);
            Assertions.assertThat((Object)fmsm.getManagedDir(this.subtaskKey2, CheckpointedStateScope.EXCLUSIVE)).isEqualTo((Object)fmsm.getManagedDir(this.subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
        }
    }

    @Test
    public void testCheckpointNotification() throws Exception {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);
             CloseableRegistry closeableRegistry = new CloseableRegistry();){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            fmsm.registerSubtaskForSharedStates(this.subtaskKey2);
            BiFunctionWithException writer = (subtaskKey, checkpointId) -> this.writeCheckpointAndGetStream((FileMergingSnapshotManager.SubtaskKey)subtaskKey, (long)checkpointId, CheckpointedStateScope.SHARED, (FileMergingSnapshotManager)fmsm, closeableRegistry).closeAndGetHandle();
            SegmentFileStateHandle cp1StateHandle1 = (SegmentFileStateHandle)writer.apply((Object)this.subtaskKey1, (Object)1L);
            SegmentFileStateHandle cp1StateHandle2 = (SegmentFileStateHandle)writer.apply((Object)this.subtaskKey2, (Object)1L);
            fmsm.notifyCheckpointComplete(this.subtaskKey1, 1L);
            this.assertFileInManagedDir((FileMergingSnapshotManager)fmsm, cp1StateHandle1);
            this.assertFileInManagedDir((FileMergingSnapshotManager)fmsm, cp1StateHandle2);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2L);
            SegmentFileStateHandle cp2StateHandle1 = (SegmentFileStateHandle)writer.apply((Object)this.subtaskKey1, (Object)2L);
            SegmentFileStateHandle cp2StateHandle2 = (SegmentFileStateHandle)writer.apply((Object)this.subtaskKey2, (Object)2L);
            fmsm.notifyCheckpointComplete(this.subtaskKey1, 2L);
            fmsm.notifyCheckpointComplete(this.subtaskKey2, 2L);
            this.assertFileInManagedDir((FileMergingSnapshotManager)fmsm, cp2StateHandle1);
            this.assertFileInManagedDir((FileMergingSnapshotManager)fmsm, cp2StateHandle2);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4L);
            Assertions.assertThat((boolean)fmsm.isCheckpointDiscard(1L)).isFalse();
            Assertions.assertThat((boolean)this.fileExists(cp1StateHandle1)).isTrue();
            Assertions.assertThat((boolean)this.fileExists(cp1StateHandle2)).isTrue();
            fmsm.notifyCheckpointSubsumed(this.subtaskKey1, 1L);
            Assertions.assertThat((boolean)this.fileExists(cp1StateHandle1)).isTrue();
            Assertions.assertThat((boolean)this.fileExists(cp1StateHandle2)).isTrue();
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3L);
            Assertions.assertThat((boolean)fmsm.isCheckpointDiscard(1L)).isFalse();
            fmsm.notifyCheckpointSubsumed(this.subtaskKey2, 1L);
            Assertions.assertThat((boolean)fmsm.isCheckpointDiscard(1L)).isTrue();
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2L);
            SegmentFileStateHandle cp3StateHandle1 = (SegmentFileStateHandle)writer.apply((Object)this.subtaskKey1, (Object)3L);
            SegmentFileStateHandle cp3StateHandle2 = (SegmentFileStateHandle)writer.apply((Object)this.subtaskKey2, (Object)3L);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4L);
            this.assertFileInManagedDir((FileMergingSnapshotManager)fmsm, cp3StateHandle1);
            this.assertFileInManagedDir((FileMergingSnapshotManager)fmsm, cp3StateHandle2);
            fmsm.notifyCheckpointAborted(this.subtaskKey1, 3L);
            Assertions.assertThat((boolean)this.fileExists(cp3StateHandle1)).isTrue();
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3L);
            Assertions.assertThat((boolean)fmsm.isCheckpointDiscard(3L)).isFalse();
            fmsm.notifyCheckpointAborted(this.subtaskKey2, 3L);
            Assertions.assertThat((boolean)fmsm.isCheckpointDiscard(3L)).isTrue();
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2L);
        }
    }

    @Test
    public void testSpaceControl() throws Exception {
        try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase)this.createFileMergingSnapshotManager(this.checkpointBaseDir);
             CloseableRegistry closeableRegistry = new CloseableRegistry();){
            fmsm.registerSubtaskForSharedStates(this.subtaskKey1);
            BiFunctionWithException writer = (checkpointId, size) -> this.writeCheckpointAndGetStream(this.subtaskKey1, (long)checkpointId, CheckpointedStateScope.SHARED, (FileMergingSnapshotManager)fmsm, closeableRegistry, (int)size).closeAndGetHandle();
            Integer eighthOfFile = 0x400000;
            SegmentFileStateHandle stateHandle1 = (SegmentFileStateHandle)writer.apply((Object)1L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle2 = (SegmentFileStateHandle)writer.apply((Object)1L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle3 = (SegmentFileStateHandle)writer.apply((Object)1L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle4 = (SegmentFileStateHandle)writer.apply((Object)1L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle5 = (SegmentFileStateHandle)writer.apply((Object)1L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle6 = (SegmentFileStateHandle)writer.apply((Object)1L, (Object)eighthOfFile);
            fmsm.notifyCheckpointComplete(this.subtaskKey1, 1L);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(6L);
            Assertions.assertThat((boolean)fmsm.couldReusePreviousStateHandle((StreamStateHandle)stateHandle1)).isTrue();
            SegmentFileStateHandle stateHandle7 = (SegmentFileStateHandle)writer.apply((Object)2L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle8 = (SegmentFileStateHandle)writer.apply((Object)2L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle9 = (SegmentFileStateHandle)writer.apply((Object)2L, (Object)eighthOfFile);
            fmsm.reusePreviousStateHandle(2L, Collections.singletonList(stateHandle1));
            fmsm.notifyCheckpointComplete(this.subtaskKey1, 2L);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(9L);
            fmsm.notifyCheckpointSubsumed(this.subtaskKey1, 1L);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4L);
            Assertions.assertThat((boolean)fmsm.couldReusePreviousStateHandle((StreamStateHandle)stateHandle1)).isFalse();
            Assertions.assertThat((boolean)fmsm.couldReusePreviousStateHandle((StreamStateHandle)stateHandle7)).isFalse();
            Assertions.assertThat((boolean)fmsm.couldReusePreviousStateHandle((StreamStateHandle)stateHandle9)).isTrue();
            SegmentFileStateHandle stateHandle10 = (SegmentFileStateHandle)writer.apply((Object)3L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle11 = (SegmentFileStateHandle)writer.apply((Object)3L, (Object)eighthOfFile);
            SegmentFileStateHandle stateHandle12 = (SegmentFileStateHandle)writer.apply((Object)3L, (Object)eighthOfFile);
            fmsm.notifyCheckpointComplete(this.subtaskKey1, 3L);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(7L);
            fmsm.notifyCheckpointSubsumed(this.subtaskKey1, 2L);
            Assertions.assertThat((long)fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1L);
            Assertions.assertThat((long)fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3L);
        }
    }
}

