/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class EOSUncleanShutdownIntegrationTest {
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    @Parameterized.Parameter
    public String eosConfig;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
    @ClassRule
    public static final TemporaryFolder TEST_FOLDER = new TemporaryFolder(TestUtils.tempDirectory());
    private static final Properties STREAMS_CONFIG = new Properties();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Long COMMIT_INTERVAL = 100L;
    private static final int RECORD_TOTAL = 3;

    @Parameterized.Parameters(name="{0}")
    public static Collection<String[]> data() {
        return Arrays.asList({"exactly_once"}, {"exactly_once_v2"});
    }

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
        STREAMS_CONFIG.put("state.dir", TEST_FOLDER.getRoot().getPath());
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
        String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore";
        STREAMS_CONFIG.put("application.id", "shouldWorkWithUncleanShutdownWipeOutStateStore");
        STREAMS_CONFIG.put("processing.guarantee", this.eosConfig);
        String input = "input-topic";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-topic");
        StreamsBuilder builder = new StreamsBuilder();
        KStream inputStream = builder.stream("input-topic");
        AtomicInteger recordCount = new AtomicInteger(0);
        KTable valueCounts = inputStream.groupByKey().aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", Materialized.as((String)"aggregated_value"));
        valueCounts.toStream().peek((key, value) -> {
            if (recordCount.incrementAndGet() >= 3) {
                throw new IllegalStateException("Crash on the 3 record");
            }
        });
        Properties producerConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client.id", (Object)"anything"), Utils.mkEntry((Object)"key.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())}));
        KafkaStreams driver = new KafkaStreams(builder.build(), STREAMS_CONFIG);
        driver.cleanUp();
        driver.start();
        File taskStateDir = new File(String.join((CharSequence)"/", TEST_FOLDER.getRoot().getPath(), "shouldWorkWithUncleanShutdownWipeOutStateStore", "0_0"));
        File taskCheckpointFile = new File(taskStateDir, ".checkpoint");
        try {
            IntegrationTestUtils.produceSynchronously(producerConfig, false, "input-topic", Optional.empty(), Collections.singletonList(new KeyValueTimestamp<String, String>("k1", "v1", 0L)));
            TestUtils.waitForCondition(() -> taskStateDir.exists() && taskStateDir.isDirectory() && taskStateDir.list().length > 0, (String)"Failed awaiting CreateTopics first request failure");
            IntegrationTestUtils.produceSynchronously(producerConfig, false, "input-topic", Optional.empty(), Arrays.asList(new KeyValueTimestamp<String, String>("k2", "v2", 1L), new KeyValueTimestamp<String, String>("k3", "v3", 2L)));
            TestUtils.waitForCondition(() -> recordCount.get() == 3, (String)("Expected 3 records processed but only got " + recordCount.get()));
        }
        catch (Throwable throwable) {
            TestUtils.waitForCondition(() -> driver.state().equals((Object)KafkaStreams.State.ERROR), (String)("Expected ERROR state but driver is on " + driver.state()));
            driver.close();
            Assert.assertTrue((!taskStateDir.exists() || taskStateDir.exists() && taskStateDir.list().length > 0 && !taskCheckpointFile.exists() || taskCheckpointFile.exists() && taskCheckpointFile.length() == 0L ? 1 : 0) != 0);
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, driver);
            throw throwable;
        }
        TestUtils.waitForCondition(() -> driver.state().equals((Object)KafkaStreams.State.ERROR), (String)("Expected ERROR state but driver is on " + driver.state()));
        driver.close();
        Assert.assertTrue((!taskStateDir.exists() || taskStateDir.exists() && taskStateDir.list().length > 0 && !taskCheckpointFile.exists() || taskCheckpointFile.exists() && taskCheckpointFile.length() == 0L ? 1 : 0) != 0);
        IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, driver);
    }
}

