/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

@Tag(value="integration")
public class HandlingSourceTopicDeletionIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final int NUM_THREADS = 2;
    private static final long TIMEOUT = 60000L;
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void before() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC);
    }

    @AfterEach
    public void after() throws InterruptedException {
        CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
    }

    @Test
    public void shouldThrowErrorAfterSourceTopicDeleted(TestInfo testName) throws InterruptedException {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).to(OUTPUT_TOPIC, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(testName);
        String appId = "app-" + safeTestName;
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("num.stream.threads", (Object)2);
        streamsConfiguration.put("metadata.max.age.ms", (Object)2000);
        Topology topology = builder.build();
        KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsConfiguration);
        AtomicBoolean calledUncaughtExceptionHandler1 = new AtomicBoolean(false);
        kafkaStreams1.setUncaughtExceptionHandler(exception -> {
            calledUncaughtExceptionHandler1.set(true);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        kafkaStreams1.start();
        KafkaStreams kafkaStreams2 = new KafkaStreams(topology, streamsConfiguration);
        AtomicBoolean calledUncaughtExceptionHandler2 = new AtomicBoolean(false);
        kafkaStreams2.setUncaughtExceptionHandler(exception -> {
            calledUncaughtExceptionHandler2.set(true);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        kafkaStreams2.start();
        TestUtils.waitForCondition(() -> kafkaStreams1.state() == KafkaStreams.State.RUNNING && kafkaStreams2.state() == KafkaStreams.State.RUNNING, (long)60000L, () -> "Kafka Streams clients did not reach state RUNNING");
        CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
        TestUtils.waitForCondition(() -> kafkaStreams1.state() == KafkaStreams.State.ERROR && kafkaStreams2.state() == KafkaStreams.State.ERROR, (long)60000L, () -> "Kafka Streams clients did not reach state ERROR");
        MatcherAssert.assertThat((Object)calledUncaughtExceptionHandler1.get(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)calledUncaughtExceptionHandler2.get(), (Matcher)CoreMatchers.is((Object)true));
    }
}

