/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

/**
 * Integration tests for what {@link KafkaStreams#cleanUp()} leaves behind under the configured
 * {@code state.dir} once a Streams application has been started and closed again.
 *
 * <p>Every test creates its own input topic on the shared {@link #CLUSTER}, produces three records
 * to it, runs a single-table topology with a fresh temporary state directory, and finally deletes
 * all topics from the cluster.
 */
@Category(value={IntegrationTest.class})
public class StateDirectoryIntegrationTest {
    /** Upper bound, in milliseconds, for waiting on a single KafkaStreams state transition. */
    private static final long STATE_CHANGE_TIMEOUT_MS = 60000L;

    /** Name of the file looked up inside each sub-directory of the application directory. */
    private static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600L);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Starts and closes a Streams application, calls {@link KafkaStreams#cleanUp()}, and then checks
     * the state directory and the application directory beneath it.
     *
     * @throws InterruptedException if topic creation or topic deletion on the cluster is interrupted
     */
    @Test
    public void testCleanUpStateDirIfEmpty() throws InterruptedException {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(this.testName);
        final String input = uniqueTestName + "-input";
        CLUSTER.createTopic(input);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                producerConfig(), Serdes.String().serializer(), Serdes.String().serializer())) {
            sendTestRecords(producer, input);

            final Topology topology = tableTopology(input, uniqueTestName + "-input-table");
            final String stateDir = TestUtils.tempDirectory(uniqueTestName).getPath();
            final String applicationId = uniqueTestName + "-app";
            final File appDir = new File(stateDir, applicationId);
            final KafkaStreams streams = new KafkaStreams(topology, streamsConfig(applicationId, stateDir));
            try {
                final CountDownLatch notRunningLatch = startAndAwaitRunning(streams);
                Assert.assertTrue(new File(stateDir).exists());
                Assert.assertTrue(appDir.exists());

                streams.close();
                awaitOrFail(notRunningLatch, "Streams didn't cleaned up in time.");
                streams.cleanUp();

                Assert.assertTrue(new File(stateDir).exists());
                // NOTE(review): the pass condition is kept exactly as found: the application
                // directory still exists, or one of its sub-directories lacks checkpoint data.
                // Given the test name, the first operand may have been meant as
                // !appDir.exists() -- confirm against the intended behaviour before changing it.
                Assert.assertTrue(
                    "Unexpected state of application directory " + appDir + " after cleanUp()",
                    appDir.exists() || hasSubDirectoryLackingCheckpointData(appDir));
            } finally {
                // Second call when the test got past the close() above; it makes sure the
                // instance is shut down when an assertion fails while Streams is still running.
                streams.close();
            }
        } finally {
            CLUSTER.deleteAllTopicsAndWait(0L);
        }
    }

    /**
     * Starts a Streams application, drops an extra file named {@code dummy} into the application
     * directory while it is running, closes it, calls {@link KafkaStreams#cleanUp()}, and asserts
     * that both the state directory and the application directory still exist.
     *
     * @throws InterruptedException if topic creation or topic deletion on the cluster is interrupted
     */
    @Test
    public void testNotCleanUpStateDirIfNotEmpty() throws InterruptedException {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(this.testName);
        final String input = uniqueTestName + "-input";
        CLUSTER.createTopic(input);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                producerConfig(), Serdes.String().serializer(), Serdes.String().serializer())) {
            sendTestRecords(producer, input);

            final Topology topology = tableTopology(input, uniqueTestName + "-input-table");
            final String stateDir = TestUtils.tempDirectory(uniqueTestName).getPath();
            final String applicationId = uniqueTestName + "-app";
            final File appDir = new File(stateDir, applicationId);
            final KafkaStreams streams = new KafkaStreams(topology, streamsConfig(applicationId, stateDir));
            try {
                final CountDownLatch notRunningLatch = startAndAwaitRunning(streams);
                Assert.assertTrue(new File(stateDir).exists());
                Assert.assertTrue(appDir.exists());

                try {
                    final File dummyFile = new File(appDir, "dummy");
                    Files.createFile(dummyFile.toPath());
                } catch (final IOException e) {
                    throw new RuntimeException("Failed to create dummy file.", e);
                }

                streams.close();
                awaitOrFail(notRunningLatch, "Streams didn't cleaned up in time.");
                streams.cleanUp();

                Assert.assertTrue(new File(stateDir).exists());
                Assert.assertTrue(appDir.exists());
            } finally {
                // Second call when the test got past the close() above; it makes sure the
                // instance is shut down when an assertion fails while Streams is still running.
                streams.close();
            }
        } finally {
            CLUSTER.deleteAllTopicsAndWait(0L);
        }
    }

    /** Builds the producer configuration: cluster bootstrap servers, acks=all, String serializers. */
    private static Properties producerConfig() {
        return Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("acks", "all"),
            Utils.mkEntry("key.serializer", StringSerializer.class.getCanonicalName()),
            Utils.mkEntry("value.serializer", StringSerializer.class.getCanonicalName())));
    }

    /** Builds the Streams configuration for the given application id and state directory. */
    private static Properties streamsConfig(final String applicationId, final String stateDir) {
        return Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", applicationId),
            Utils.mkEntry("state.dir", stateDir),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())));
    }

    /** Sends the three key-less test records "a", "b" and "c" to {@code topic}. */
    private static void sendTestRecords(final KafkaProducer<String, String> producer, final String topic) {
        producer.send(new ProducerRecord<>(topic, "a"));
        producer.send(new ProducerRecord<>(topic, "b"));
        producer.send(new ProducerRecord<>(topic, "c"));
    }

    /** Builds a topology consisting of one table over {@code input}, materialized as {@code storeName}. */
    private static Topology tableTopology(final String input, final String storeName) {
        final StreamsBuilder builder = new StreamsBuilder();
        // NOTE(review): this expression is carried over unchanged from the decompiled source.
        // It probably needs an explicit type witness on Materialized.as(...) to compile against
        // the real API -- confirm against the original sources.
        builder.table(input, Materialized.as((String)storeName).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        return builder.build();
    }

    /**
     * Registers a state listener on {@code streams}, starts it, and waits until it reports
     * {@link KafkaStreams.State#RUNNING}.
     *
     * @return a latch that is released when the listener observes {@link KafkaStreams.State#NOT_RUNNING}
     */
    private static CountDownLatch startAndAwaitRunning(final KafkaStreams streams) {
        final CountDownLatch runningLatch = new CountDownLatch(1);
        final CountDownLatch notRunningLatch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                runningLatch.countDown();
            }
            if (newState == KafkaStreams.State.NOT_RUNNING) {
                notRunningLatch.countDown();
            }
        });
        streams.start();
        awaitOrFail(runningLatch, "Streams didn't start in time.");
        return notRunningLatch;
    }

    /**
     * Waits up to {@link #STATE_CHANGE_TIMEOUT_MS} for {@code latch} and fails the test with
     * {@code failureMessage} when the wait times out, instead of silently carrying on.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
     */
    private static void awaitOrFail(final CountDownLatch latch, final String failureMessage) {
        final boolean released;
        try {
            released = latch.await(STATE_CHANGE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            throw new RuntimeException(failureMessage, e);
        }
        Assert.assertTrue(failureMessage, released);
    }

    /**
     * Tells whether {@code appDir} contains a sub-directory that either has entries but no
     * checkpoint file, or whose checkpoint file has length zero ({@link File#length()} is also
     * zero for a file that does not exist).
     */
    private static boolean hasSubDirectoryLackingCheckpointData(final File appDir) {
        return Arrays.stream(listOrEmpty(appDir))
            .filter(File::isDirectory)
            .anyMatch(dir -> {
                final File checkpoint = new File(dir, CHECKPOINT_FILE_NAME);
                final boolean hasEntriesButNoCheckpoint = listOrEmpty(dir).length > 0 && !checkpoint.exists();
                final boolean checkpointHasNoContent = checkpoint.length() == 0L;
                return hasEntriesButNoCheckpoint || checkpointHasNoContent;
            });
    }

    /** Returns the entries of {@code dir}, or an empty array where {@link File#listFiles()} yields null. */
    private static File[] listOrEmpty(final File dir) {
        final File[] entries = dir.listFiles();
        return entries != null ? entries : new File[0];
    }
}

