/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FsMergingCheckpointStorageLocationTest {
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    public static Path checkpointBaseDir;
    public static Path sharedStateDir;
    public static Path taskOwnedStateDir;
    private final Random random = new Random();
    private static final String SNAPSHOT_MGR_ID = "snapshotMgrId";
    private static final int FILE_STATE_SIZE_THRESHOLD = 1024;
    private static final int WRITE_BUFFER_SIZE = 1024;
    private static final FileMergingSnapshotManager.SubtaskKey SUBTASK_KEY;

    @Before
    public void prepareDirectories() {
        checkpointBaseDir = new Path(this.tmpFolder.toString());
        sharedStateDir = new Path(checkpointBaseDir, "shared");
        taskOwnedStateDir = new Path(checkpointBaseDir, "taskowned");
    }

    @Test
    public void testWriteMultipleStateFilesWithinCheckpoint() throws Exception {
        this.testWriteMultipleStateFiles();
    }

    private void testWriteMultipleStateFiles() throws Exception {
        FileMergingSnapshotManager snapshotManager = this.createFileMergingSnapshotManager();
        long checkpointID = 1L;
        FsMergingCheckpointStorageLocation storageLocation = this.createFsMergingCheckpointStorageLocation(checkpointID, snapshotManager);
        FileSystem fs = storageLocation.getFileSystem();
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(SUBTASK_KEY, CheckpointedStateScope.EXCLUSIVE))).isTrue();
        Assertions.assertThat((boolean)fs.exists(snapshotManager.getManagedDir(SUBTASK_KEY, CheckpointedStateScope.SHARED))).isTrue();
        int numStates = 3;
        List<byte[]> states = this.generateRandomByteStates(numStates, 2, 16);
        ArrayList<SegmentFileStateHandle> stateHandles = new ArrayList<SegmentFileStateHandle>(numStates);
        for (byte[] s : states) {
            SegmentFileStateHandle segmentFileStateHandle = this.uploadOneStateFileAndGetStateHandle(checkpointID, storageLocation, s);
            stateHandles.add(segmentFileStateHandle);
        }
        this.verifyStateHandlesAllPointToTheSameFile(stateHandles);
    }

    @Test
    public void testCheckpointStreamClosedExceptionally() throws Exception {
        FileMergingSnapshotManager snapshotManager = this.createFileMergingSnapshotManager();
        Throwable throwable = null;
        try {
            Path filePath1 = null;
            try {
                FileMergingCheckpointStateOutputStream stream1 = snapshotManager.createCheckpointStateOutputStream(SUBTASK_KEY, 1L, CheckpointedStateScope.EXCLUSIVE);
                Throwable throwable2 = null;
                try {
                    try {
                        stream1.flushToFile();
                        filePath1 = stream1.getFilePath();
                        this.assertPathNotNullAndCheckExistence(filePath1, true);
                        throw new IOException();
                    }
                    catch (Throwable throwable3) {
                        throwable2 = throwable3;
                        throw throwable3;
                    }
                }
                catch (Throwable throwable4) {
                    if (stream1 != null) {
                        if (throwable2 != null) {
                            try {
                                stream1.close();
                            }
                            catch (Throwable throwable5) {
                                throwable2.addSuppressed(throwable5);
                            }
                        } else {
                            stream1.close();
                        }
                    }
                    throw throwable4;
                }
            }
            catch (IOException iOException) {
                this.assertPathNotNullAndCheckExistence(filePath1, false);
                if (snapshotManager != null) {
                    if (throwable != null) {
                        try {
                            snapshotManager.close();
                        }
                        catch (Throwable throwable6) {
                            throwable.addSuppressed(throwable6);
                        }
                    } else {
                        snapshotManager.close();
                    }
                }
            }
        }
        catch (Throwable throwable7) {
            try {
                throwable = throwable7;
                throw throwable7;
            }
            catch (Throwable throwable8) {
                if (snapshotManager != null) {
                    if (throwable != null) {
                        try {
                            snapshotManager.close();
                        }
                        catch (Throwable throwable9) {
                            throwable.addSuppressed(throwable9);
                        }
                    } else {
                        snapshotManager.close();
                    }
                }
                throw throwable8;
            }
        }
    }

    private void assertPathNotNullAndCheckExistence(Path path, boolean exist) throws IOException {
        Assertions.assertThat((Object)path).isNotNull();
        Assertions.assertThat((boolean)path.getFileSystem().exists(path)).isEqualTo(exist);
    }

    @Test
    public void testWritingToClosedStream() {
        FileMergingSnapshotManager snapshotManager = this.createFileMergingSnapshotManager();
        FsMergingCheckpointStorageLocation storageLocation = this.createFsMergingCheckpointStorageLocation(1L, snapshotManager);
        try (FileMergingCheckpointStateOutputStream stream = storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);){
            stream.flushToFile();
            stream.closeAndGetHandle();
            stream.flushToFile();
            Assertions.fail((String)"Expected IOException");
        }
        catch (IOException e) {
            Assertions.assertThat((String)e.getMessage()).isEqualTo("Cannot call flushToFile() to a closed stream.");
        }
    }

    @Test
    public void testWriteAndReadPositionInformation() throws Exception {
        long maxFileSize = 128L;
        FileMergingSnapshotManager snapshotManager = this.createFileMergingSnapshotManager(maxFileSize);
        FsMergingCheckpointStorageLocation storageLocation1 = this.createFsMergingCheckpointStorageLocation(1L, snapshotManager);
        int stateSize1 = 10;
        List<byte[]> cp1States = this.generateRandomByteStates(1, stateSize1, stateSize1);
        this.uploadCheckpointStates(1L, cp1States, storageLocation1);
        for (int checkpointId = 2; checkpointId < 10; ++checkpointId) {
            this.testWriteAndReadPositionInformationInCheckpoint(checkpointId, maxFileSize, snapshotManager);
        }
    }

    private void testWriteAndReadPositionInformationInCheckpoint(long checkpointId, long maxFileSize, FileMergingSnapshotManager snapshotManager) throws IOException {
        FsMergingCheckpointStorageLocation storageLocation = this.createFsMergingCheckpointStorageLocation(checkpointId, snapshotManager);
        try (FileMergingCheckpointStateOutputStream stateOutputStream = storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);){
            int stateSize = 64;
            byte[] expectedBytes = new byte[10];
            byte[] stateValues = this.generateRandomBytes(stateSize);
            stateOutputStream.write(stateValues);
            for (int i = 0; i < 10; ++i) {
                int position = this.random.nextInt(stateSize);
                byte[] positionBytes = this.longToBytes(position);
                expectedBytes[i] = stateValues[position];
                stateOutputStream.write(positionBytes);
            }
            SegmentFileStateHandle cpStateHandle = stateOutputStream.closeAndGetHandle();
            Assertions.assertThat((Object)cpStateHandle).isNotNull();
            byte[] actualBytes = new byte[10];
            byte[] oneByte = new byte[1];
            FSDataInputStream inputStream = cpStateHandle.openInputStream();
            Assertions.assertThat((InputStream)inputStream).isNotNull();
            inputStream.seek((long)stateSize);
            for (int i = 0; i < 10; ++i) {
                byte[] longBytes = new byte[8];
                int readContent = inputStream.read(longBytes);
                Assertions.assertThat((int)readContent).isEqualTo(8);
                long curPos = inputStream.getPos();
                inputStream.seek(this.bytesToLong(longBytes));
                Assertions.assertThat((inputStream.read(oneByte) >= 0 ? 1 : 0) != 0).isTrue();
                actualBytes[i] = oneByte[0];
                inputStream.seek(curPos);
            }
            Assertions.assertThat((byte[])actualBytes).isEqualTo((Object)expectedBytes);
        }
    }

    private FileMergingSnapshotManager createFileMergingSnapshotManager() {
        return this.createFileMergingSnapshotManager(-1L);
    }

    private FileMergingSnapshotManager createFileMergingSnapshotManager(long maxFileSize) {
        FileMergingSnapshotManager mgr = new FileMergingSnapshotManagerBuilder(SNAPSHOT_MGR_ID, FileMergingType.MERGE_WITHIN_CHECKPOINT).build();
        mgr.initFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, 1024);
        mgr.registerSubtaskForSharedStates(SUBTASK_KEY);
        return mgr;
    }

    public FsMergingCheckpointStorageLocation createFsMergingCheckpointStorageLocation(long checkpointId, @Nonnull FileMergingSnapshotManager snapshotManager) {
        LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        CheckpointStorageLocationReference cslReference = AbstractFsCheckpointStorageAccess.encodePathAsReference((Path)Path.fromLocalFile((File)fs.pathToFile(checkpointBaseDir)));
        Assertions.assertThat((Object)snapshotManager).isNotNull();
        return new FsMergingCheckpointStorageLocation(SUBTASK_KEY, (FileSystem)LocalFileSystem.getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, cslReference, 1024, 1024, snapshotManager, checkpointId);
    }

    private SegmentFileStateHandle uploadOneStateFileAndGetStateHandle(long checkpointID, FsMergingCheckpointStorageLocation storageLocation, byte[] stateContent) throws IOException {
        try (FileMergingCheckpointStateOutputStream stateOutputStream = storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);){
            stateOutputStream.write(stateContent);
            SegmentFileStateHandle segmentFileStateHandle = stateOutputStream.closeAndGetHandle();
            return segmentFileStateHandle;
        }
    }

    private boolean bytesEqual(byte[] bytes1, byte[] bytes2) {
        if (bytes1 == null || bytes2 == null) {
            return false;
        }
        if (bytes1.length == bytes2.length) {
            for (int i = 0; i < bytes1.length; ++i) {
                if (bytes1[i] == bytes2[i]) continue;
                return false;
            }
            return true;
        }
        return false;
    }

    private List<SegmentFileStateHandle> uploadCheckpointStates(long checkpointID, List<byte[]> states, FsMergingCheckpointStorageLocation storageLocation) throws IOException {
        ArrayList<SegmentFileStateHandle> stateHandles = new ArrayList<SegmentFileStateHandle>(states.size());
        for (byte[] state : states) {
            SegmentFileStateHandle segmentFileStateHandle = this.uploadOneStateFileAndGetStateHandle(checkpointID, storageLocation, state);
            stateHandles.add(segmentFileStateHandle);
        }
        return stateHandles;
    }

    private byte[] generateRandomBytes(int size) {
        byte[] bytes = new byte[size];
        this.random.nextBytes(bytes);
        return bytes;
    }

    private List<byte[]> generateRandomByteStates(int numStates, int perStateMinSize, int perStateMaxSize) {
        ArrayList<byte[]> result = new ArrayList<byte[]>(numStates);
        for (int i = 0; i < numStates; ++i) {
            int stateSize = this.random.nextInt(perStateMaxSize - perStateMinSize + 1) + perStateMinSize;
            result.add(this.generateRandomBytes(stateSize));
        }
        return result;
    }

    private void verifyStateHandlesAllPointToTheSameFile(List<SegmentFileStateHandle> stateHandles) {
        Path lastFilePath = null;
        for (SegmentFileStateHandle stateHandle : stateHandles) {
            Assertions.assertThat((lastFilePath == null || lastFilePath.equals((Object)stateHandle.getFilePath()) ? 1 : 0) != 0).isTrue();
            lastFilePath = stateHandle.getFilePath();
        }
    }

    private byte[] longToBytes(long x) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putLong(x);
        return buffer.array();
    }

    private long bytesToLong(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put(bytes);
        buffer.flip();
        return buffer.getLong();
    }

    static {
        SUBTASK_KEY = new FileMergingSnapshotManager.SubtaskKey("opId", 1, 1);
    }
}

