/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link StateDirectory}: creation of the base/task/global directories, file based
 * locking of task and global state directories, and clean-up of task directories.
 *
 * <p>Every test runs against a fresh, randomly named state directory below
 * {@code TestUtils.IO_TMP_DIR}; the directory is removed again after each test.
 */
public class StateDirectoryTest {
    private final MockTime time = new MockTime();
    private File stateDir;
    private final String applicationId = "applicationId";
    private StateDirectory directory;
    private File appDir;

    /**
     * Builds the streams configuration used by these tests, pointing {@code state.dir} at the given directory.
     */
    private StreamsConfig configFor(final File stateDirectory) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDirectory.getPath());
        return new StreamsConfig(props);
    }

    /**
     * Opens (creating it if necessary) the {@code .lock} file inside the given directory for writing.
     */
    private static FileChannel openLockChannel(final File dir) throws IOException {
        return FileChannel.open(new File(dir, ".lock").toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    /**
     * Asserts that trying to lock through the given channel fails with an {@link OverlappingFileLockException}.
     */
    private static void assertAlreadyLocked(final FileChannel channel) throws IOException {
        try {
            channel.tryLock();
            Assert.fail("shouldn't be able to lock already locked directory");
        } catch (final OverlappingFileLockException expected) {
            // expected: the lock on this file is already held
        }
    }

    /**
     * Collects the given directory listing into a set; a {@code null} listing fails with a NullPointerException.
     */
    private static Set<File> asSet(final File[] files) {
        return Arrays.stream(Objects.requireNonNull(files)).collect(Collectors.toSet());
    }

    /**
     * (Re-)creates the {@link StateDirectory} under test on a new random path.
     *
     * @param createStateDirectory passed through to the {@link StateDirectory} constructor
     */
    private void initializeStateDirectory(final boolean createStateDirectory) throws Exception {
        // A previous call (e.g. from before()) may have created a directory already; remove it
        // before the reference is overwritten, otherwise it would never be cleaned up.
        if (stateDir != null) {
            Utils.delete(stateDir);
        }
        stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
        if (!createStateDirectory) {
            // make sure the new path does not exist on disk
            cleanup();
        }
        directory = new StateDirectory(configFor(stateDir), time, createStateDirectory);
        appDir = new File(stateDir, applicationId);
    }

    @Before
    public void before() throws Exception {
        initializeStateDirectory(true);
    }

    @After
    public void cleanup() throws Exception {
        Utils.delete(stateDir);
    }

    @Test
    public void shouldCreateBaseDirectory() {
        Assert.assertTrue(stateDir.exists());
        Assert.assertTrue(stateDir.isDirectory());
        Assert.assertTrue(appDir.exists());
        Assert.assertTrue(appDir.isDirectory());
    }

    @Test
    public void shouldCreateTaskStateDirectory() {
        final File taskDirectory = directory.directoryForTask(new TaskId(0, 0));
        Assert.assertTrue(taskDirectory.exists());
        Assert.assertTrue(taskDirectory.isDirectory());
    }

    @Test
    public void shouldLockTaskStateDirectory() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        final File taskDirectory = directory.directoryForTask(taskId);
        directory.lock(taskId);
        try (FileChannel channel = openLockChannel(taskDirectory)) {
            assertAlreadyLocked(channel);
        } finally {
            directory.unlock(taskId);
        }
    }

    @Test
    public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        directory.directoryForTask(taskId);
        directory.lock(taskId);
        try {
            // locking again from the same thread must report success
            Assert.assertTrue(directory.lock(taskId));
        } finally {
            directory.unlock(taskId);
        }
    }

    @Test
    public void shouldThrowProcessorStateException() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        Utils.delete(stateDir);
        try {
            directory.directoryForTask(taskId);
            Assert.fail("Should have thrown ProcessorStateException");
        } catch (final ProcessorStateException expected) {
            // expected: the state directory was deleted before the task directory was requested
        }
    }

    @Test
    public void shouldNotLockDeletedDirectory() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        Utils.delete(stateDir);
        Assert.assertFalse(directory.lock(taskId));
    }

    @Test
    public void shouldLockMultipleTaskDirectories() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        final File task1Dir = directory.directoryForTask(taskId);
        final TaskId taskId2 = new TaskId(1, 0);
        final File task2Dir = directory.directoryForTask(taskId2);
        try (FileChannel channel1 = openLockChannel(task1Dir);
             FileChannel channel2 = openLockChannel(task2Dir)) {
            directory.lock(taskId);
            directory.lock(taskId2);
            // Check each channel on its own: a single shared try block would stop at the first
            // exception and never exercise the second task directory.
            assertAlreadyLocked(channel1);
            assertAlreadyLocked(channel2);
        } finally {
            directory.unlock(taskId);
            directory.unlock(taskId2);
        }
    }

    @Test
    public void shouldReleaseTaskStateDirectoryLock() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        final File taskDirectory = directory.directoryForTask(taskId);
        directory.lock(taskId);
        directory.unlock(taskId);
        try (FileChannel channel = openLockChannel(taskDirectory)) {
            // must not throw now that the directory has been unlocked
            channel.tryLock();
        }
    }

    @Test
    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
        final TaskId task0 = new TaskId(0, 0);
        final TaskId task1 = new TaskId(1, 0);
        final TaskId task2 = new TaskId(2, 0);
        try {
            Assert.assertTrue(new File(directory.directoryForTask(task0), "store").mkdir());
            Assert.assertTrue(new File(directory.directoryForTask(task1), "store").mkdir());
            Assert.assertTrue(new File(directory.directoryForTask(task2), "store").mkdir());
            directory.lock(task0);
            directory.lock(task1);

            final File dir0 = new File(appDir, task0.toString());
            final File dir1 = new File(appDir, task1.toString());
            final File dir2 = new File(appDir, task2.toString());

            Assert.assertEquals(Utils.mkSet(dir0, dir1, dir2), asSet(directory.listAllTaskDirectories()));
            Assert.assertEquals(Utils.mkSet(dir0, dir1, dir2), asSet(directory.listNonEmptyTaskDirectories()));

            time.sleep(5000L);
            directory.cleanRemovedTasks(0L);

            // all task directories remain, but only the locked ones are still non-empty
            Assert.assertEquals(Utils.mkSet(dir0, dir1, dir2), asSet(directory.listAllTaskDirectories()));
            Assert.assertEquals(Utils.mkSet(dir0, dir1), asSet(directory.listNonEmptyTaskDirectories()));
        } finally {
            directory.unlock(task0);
            directory.unlock(task1);
        }
    }

    @Test
    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
        final File dir = directory.directoryForTask(new TaskId(2, 0));
        Assert.assertTrue(new File(dir, "store").mkdir());

        final long cleanupDelayMs = 60000L;

        // the delay has not elapsed yet: nothing is cleaned
        directory.cleanRemovedTasks(cleanupDelayMs);
        Assert.assertTrue(dir.exists());
        Assert.assertEquals(1, directory.listAllTaskDirectories().length);
        Assert.assertEquals(1, directory.listNonEmptyTaskDirectories().length);

        // advance the mock clock past the delay: the directory stays but is emptied
        time.sleep(cleanupDelayMs + 1000L);
        directory.cleanRemovedTasks(cleanupDelayMs);
        Assert.assertTrue(dir.exists());
        Assert.assertEquals(1, directory.listAllTaskDirectories().length);
        Assert.assertEquals(0, directory.listNonEmptyTaskDirectories().length);
    }

    @Test
    public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
        final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
        directory.cleanRemovedTasks(0L);
        Assert.assertTrue(otherDir.exists());
    }

    @Test
    public void shouldOnlyListNonEmptyTaskDirectories() throws IOException {
        TestUtils.tempDirectory(stateDir.toPath(), "foo");
        final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
        final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));

        final File storeDir = new File(taskDir1, "store");
        Assert.assertTrue(storeDir.mkdir());

        Assert.assertEquals(Utils.mkSet(taskDir1, taskDir2), asSet(directory.listAllTaskDirectories()));
        Assert.assertEquals(Utils.mkSet(taskDir1), asSet(directory.listNonEmptyTaskDirectories()));

        // delete everything in the first task directory except its lock file
        Utils.delete(taskDir1, Collections.singletonList(new File(taskDir1, ".lock")));

        Assert.assertEquals(Utils.mkSet(taskDir1, taskDir2), asSet(directory.listAllTaskDirectories()));
        Assert.assertEquals(Collections.emptySet(), asSet(directory.listNonEmptyTaskDirectories()));
    }

    @Test
    public void shouldCreateDirectoriesIfParentDoesntExist() {
        final File tempDir = TestUtils.tempDirectory();
        final File nestedStateDir = new File(new File(tempDir, "foo"), "state-dir");
        final StateDirectory stateDirectory = new StateDirectory(configFor(nestedStateDir), time, true);
        final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
        Assert.assertTrue(nestedStateDir.exists());
        Assert.assertTrue(taskDir.exists());
    }

    @Test
    public void shouldLockGlobalStateDirectory() throws Exception {
        directory.lockGlobalState();
        try (FileChannel channel = openLockChannel(directory.globalStateDir())) {
            channel.lock();
            Assert.fail("Should have thrown OverlappingFileLockException");
        } catch (final OverlappingFileLockException expected) {
            // expected: the global state lock is already held
        } finally {
            directory.unlockGlobalState();
        }
    }

    @Test
    public void shouldUnlockGlobalStateDirectory() throws Exception {
        directory.lockGlobalState();
        directory.unlockGlobalState();
        try (FileChannel channel = openLockChannel(directory.globalStateDir())) {
            // must not throw now that the global state has been unlocked
            channel.lock();
        }
    }

    @Test
    public void shouldNotLockStateDirLockedByAnotherThread() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
        final Thread thread = new Thread(() -> {
            try {
                directory.lock(taskId);
            } catch (final IOException e) {
                exceptionOnThread.set(e);
            }
        });
        thread.start();
        thread.join(30000L);
        Assert.assertNull("should not have had an exception during locking on other thread", exceptionOnThread.get());
        Assert.assertFalse(directory.lock(taskId));
    }

    @Test
    public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
        final TaskId taskId = new TaskId(0, 0);
        final CountDownLatch lockLatch = new CountDownLatch(1);
        final CountDownLatch unlockLatch = new CountDownLatch(1);
        final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>();
        final Thread thread = new Thread(() -> {
            try {
                directory.lock(taskId);
                lockLatch.countDown();
                unlockLatch.await();
                directory.unlock(taskId);
            } catch (final Exception e) {
                exceptionOnThread.set(e);
            }
        });
        thread.start();
        try {
            final boolean lockedInTime = lockLatch.await(5L, TimeUnit.SECONDS);
            Assert.assertNull("should not have had an exception on other thread", exceptionOnThread.get());
            Assert.assertTrue("other thread did not acquire the lock in time", lockedInTime);

            // unlocking from this thread must not release the other thread's lock
            directory.unlock(taskId);
            Assert.assertFalse(directory.lock(taskId));
        } finally {
            // always let the helper thread finish, even if an assertion above failed
            unlockLatch.countDown();
        }
        thread.join(30000L);

        Assert.assertNull("should not have had an exception on other thread", exceptionOnThread.get());
        Assert.assertTrue(directory.lock(taskId));
    }

    @Test
    public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
        final TaskId id = new TaskId(1, 0);
        directory.directoryForTask(id);
        directory.globalStateDir();

        final File dir0 = new File(appDir, id.toString());
        final File globalDir = new File(appDir, "global");
        Assert.assertEquals(Utils.mkSet(dir0, globalDir), asSet(appDir.listFiles()));

        directory.clean();

        Assert.assertEquals(Collections.emptySet(), asSet(appDir.listFiles()));
    }

    @Test
    public void shouldNotCreateBaseDirectory() throws Exception {
        initializeStateDirectory(false);
        Assert.assertFalse(stateDir.exists());
        Assert.assertFalse(appDir.exists());
    }

    @Test
    public void shouldNotCreateTaskStateDirectory() throws Exception {
        initializeStateDirectory(false);
        final File taskDirectory = directory.directoryForTask(new TaskId(0, 0));
        Assert.assertFalse(taskDirectory.exists());
    }

    @Test
    public void shouldNotCreateGlobalStateDirectory() throws Exception {
        initializeStateDirectory(false);
        final File globalStateDir = directory.globalStateDir();
        Assert.assertFalse(globalStateDir.exists());
    }

    @Test
    public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws Exception {
        initializeStateDirectory(false);
        Assert.assertTrue(directory.lock(new TaskId(0, 0)));
    }

    @Test
    public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws Exception {
        initializeStateDirectory(false);
        Assert.assertTrue(directory.lockGlobalState());
    }
}

