/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class FsCheckpointStateOutputStreamTest {
    @Parameterized.Parameter
    public boolean relativePaths;
    @Rule
    public final TemporaryFolder tempDir = new TemporaryFolder();

    @Parameterized.Parameters(name="relativePaths = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @Test(expected=IllegalArgumentException.class)
    public void testWrongParameters() throws Exception {
        new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, this.relativePaths);
    }

    @Test
    public void testEmptyState() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, this.relativePaths);
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertNull((Object)handle);
    }

    @Test
    public void testStateBelowMemThreshold() throws Exception {
        this.runTest(999, 1024, 1000, false);
    }

    @Test
    public void testStateOneBufferAboveThreshold() throws Exception {
        this.runTest(896, 1024, 15, true);
    }

    @Test
    public void testStateAboveMemThreshold() throws Exception {
        this.runTest(576446, 259, 17, true);
    }

    @Test
    public void testZeroThreshold() throws Exception {
        this.runTest(16678, 4096, 0, true);
    }

    @Test
    public void testGetPos() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, this.relativePaths);
        for (int i = 0; i < 64; ++i) {
            Assert.assertEquals((long)i, (long)stream.getPos());
            stream.write(66);
        }
        stream.closeAndGetHandle();
        stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, this.relativePaths);
        byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
        for (int i = 0; i < 7; ++i) {
            Assert.assertEquals((long)(i * (1 + data.length)), (long)stream.getPos());
            stream.write(66);
            stream.write(data);
        }
        stream.closeAndGetHandle();
    }

    @Test
    public void testCleanupWhenClosingStream() throws IOException {
        FileSystem fs = (FileSystem)Mockito.mock(FileSystem.class);
        FSDataOutputStream outputStream = (FSDataOutputStream)Mockito.mock(FSDataOutputStream.class);
        ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class);
        Mockito.when((Object)fs.create((Path)pathCaptor.capture(), (FileSystem.WriteMode)Matchers.any(FileSystem.WriteMode.class))).thenReturn((Object)outputStream);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), fs, 4, 0, this.relativePaths);
        stream.write(new byte[]{1, 2, 3, 4, 5});
        ((FileSystem)Mockito.verify((Object)fs)).create((Path)Matchers.any(Path.class), (FileSystem.WriteMode)Matchers.any(FileSystem.WriteMode.class));
        stream.close();
        ((FileSystem)Mockito.verify((Object)fs)).delete((Path)Matchers.eq((Object)pathCaptor.getValue()), Matchers.anyBoolean());
    }

    @Test
    public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
        FileSystem fs = (FileSystem)Mockito.mock(FileSystem.class);
        FSDataOutputStream outputStream = (FSDataOutputStream)Mockito.mock(FSDataOutputStream.class);
        ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class);
        Mockito.when((Object)fs.create((Path)pathCaptor.capture(), (FileSystem.WriteMode)Matchers.any(FileSystem.WriteMode.class))).thenReturn((Object)outputStream);
        ((FSDataOutputStream)Mockito.doThrow((Throwable[])new Throwable[]{new IOException("Test IOException.")}).when((Object)outputStream)).close();
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), fs, 4, 0, this.relativePaths);
        stream.write(new byte[]{1, 2, 3, 4, 5});
        ((FileSystem)Mockito.verify((Object)fs)).create((Path)Matchers.any(Path.class), (FileSystem.WriteMode)Matchers.any(FileSystem.WriteMode.class));
        try {
            stream.closeAndGetHandle();
            Assert.fail((String)"Expected IOException");
        }
        catch (IOException iOException) {
            // empty catch block
        }
        ((FileSystem)Mockito.verify((Object)fs)).delete((Path)Matchers.eq((Object)pathCaptor.getValue()), Matchers.anyBoolean());
    }

    private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, this.relativePaths);
        Random rnd = new Random();
        byte[] original = new byte[numBytes];
        byte[] bytes = new byte[original.length];
        rnd.nextBytes(original);
        System.arraycopy(original, 0, bytes, 0, original.length);
        int pos = 0;
        while (pos < bytes.length) {
            boolean single = rnd.nextBoolean();
            if (single) {
                stream.write((int)bytes[pos++]);
                continue;
            }
            int num = rnd.nextBoolean() ? bytes.length - pos : rnd.nextInt(bytes.length - pos);
            stream.write(bytes, pos, num);
            pos += num;
        }
        StreamStateHandle handle = stream.closeAndGetHandle();
        if (expectFile) {
            Assert.assertTrue((boolean)(handle instanceof FileStateHandle));
        } else {
            Assert.assertTrue((boolean)(handle instanceof ByteStreamStateHandle));
        }
        Assert.assertArrayEquals((byte[])original, (byte[])bytes);
        try (FSDataInputStream inStream = handle.openInputStream();){
            byte[] validation = new byte[bytes.length];
            DataInputStream dataInputStream = new DataInputStream((InputStream)inStream);
            dataInputStream.readFully(validation);
            Assert.assertArrayEquals((byte[])bytes, (byte[])validation);
        }
        handle.discardState();
    }

    @Test
    public void testWriteFailsFastWhenClosed() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, this.relativePaths);
        Assert.assertFalse((boolean)stream.isClosed());
        stream.close();
        Assert.assertTrue((boolean)stream.isClosed());
        try {
            stream.write(1);
            Assert.fail();
        }
        catch (IOException iOException) {
            // empty catch block
        }
        try {
            stream.write(new byte[4], 1, 2);
            Assert.fail();
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Test
    public void testMixedBelowAndAboveThreshold() throws Exception {
        StreamStateHandle handle4;
        byte[] state1 = new byte[0x137331];
        byte[] state2 = new byte[1];
        byte[] state3 = new byte[]{};
        byte[] state4 = new byte[177];
        Random rnd = new Random();
        rnd.nextBytes(state1);
        rnd.nextBytes(state2);
        rnd.nextBytes(state3);
        rnd.nextBytes(state4);
        File directory = this.tempDir.newFolder();
        Path basePath = Path.fromLocalFile((File)directory);
        Supplier<CheckpointStateOutputStream> factory = () -> new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, this.relativePaths);
        CheckpointStateOutputStream stream1 = factory.get();
        CheckpointStateOutputStream stream2 = factory.get();
        CheckpointStateOutputStream stream3 = factory.get();
        stream1.write(state1);
        stream2.write(state2);
        stream3.write(state3);
        FileStateHandle handle1 = (FileStateHandle)stream1.closeAndGetHandle();
        ByteStreamStateHandle handle2 = (ByteStreamStateHandle)stream2.closeAndGetHandle();
        ByteStreamStateHandle handle3 = (ByteStreamStateHandle)stream3.closeAndGetHandle();
        try (CheckpointStateOutputStream stream4 = factory.get();){
            stream4.write(state4);
            handle4 = stream4.closeAndGetHandle();
        }
        CheckpointStateOutputStream stream5 = factory.get();
        stream5.write(state4);
        stream5.close();
        try {
            stream5.closeAndGetHandle();
            Assert.fail();
        }
        catch (IOException iOException) {
            // empty catch block
        }
        FsCheckpointStateOutputStreamTest.validateBytesInStream((InputStream)handle1.openInputStream(), state1);
        handle1.discardState();
        Assert.assertFalse((boolean)FsCheckpointStateOutputStreamTest.isDirectoryEmpty(directory));
        FsCheckpointStateOutputStreamTest.ensureLocalFileDeleted(handle1.getFilePath());
        FsCheckpointStateOutputStreamTest.validateBytesInStream((InputStream)handle2.openInputStream(), state2);
        handle2.discardState();
        Assert.assertFalse((boolean)FsCheckpointStateOutputStreamTest.isDirectoryEmpty(directory));
        Assert.assertNull((Object)handle3);
        Assert.assertFalse((boolean)FsCheckpointStateOutputStreamTest.isDirectoryEmpty(directory));
        FsCheckpointStateOutputStreamTest.validateBytesInStream((InputStream)handle4.openInputStream(), state4);
        handle4.discardState();
        Assert.assertTrue((boolean)FsCheckpointStateOutputStreamTest.isDirectoryEmpty(directory));
    }

    @Test
    public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
        File directory = this.tempDir.newFolder();
        Assume.assumeTrue((boolean)directory.setWritable(false, true));
        FsCheckpointStateOutputStreamTest.checkDirectoryNotWritable(directory);
        FileSystem fs = (FileSystem)Mockito.spy((Object)FileSystem.getLocalFileSystem());
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream1 = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)directory), fs, 1024, 1, this.relativePaths);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream2 = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile((File)directory), fs, 1024, 1, this.relativePaths);
        stream1.write(new byte[61]);
        stream2.write(new byte[61]);
        try {
            stream1.closeAndGetHandle();
            Assert.fail((String)"this should fail with an exception");
        }
        catch (IOException iOException) {
            // empty catch block
        }
        stream2.close();
        ((FileSystem)Mockito.verify((Object)fs, (VerificationMode)Mockito.times((int)0))).delete((Path)Matchers.any(Path.class), Matchers.anyBoolean());
        Assert.assertTrue((boolean)directory.exists());
        Assert.assertTrue((boolean)directory.isDirectory());
    }

    private static void ensureLocalFileDeleted(Path path) {
        URI uri = path.toUri();
        if (!"file".equals(uri.getScheme())) {
            throw new IllegalArgumentException("not a local path");
        }
        File file = new File(uri.getPath());
        Assert.assertFalse((String)"file not properly deleted", (boolean)file.exists());
    }

    private static boolean isDirectoryEmpty(File directory) {
        if (!directory.exists()) {
            return true;
        }
        String[] nested = directory.list();
        return nested == null || nested.length == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
        try {
            int pos;
            int read;
            byte[] holder = new byte[data.length];
            for (pos = 0; pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1; pos += read) {
            }
            Assert.assertEquals((String)"not enough data", (long)holder.length, (long)pos);
            Assert.assertEquals((String)"too much data", (long)-1L, (long)is.read());
            Assert.assertArrayEquals((String)"wrong data", (byte[])data, (byte[])holder);
        }
        finally {
            is.close();
        }
    }

    private static void checkDirectoryNotWritable(File directory) {
        try {
            try (FileOutputStream fos = new FileOutputStream(new File(directory, "temp"));){
                fos.write(42);
                fos.flush();
            }
            Assert.fail((String)"this should fail when writing is properly prevented");
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }
}

