/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category(value={IntegrationTest.class})
public class JoinWithIncompleteMetadataIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    private static final String APP_ID = "join-incomplete-metadata-integration-test";
    private static final Long COMMIT_INTERVAL = 100L;
    static final Properties STREAMS_CONFIG = new Properties();
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String NON_EXISTENT_INPUT_TOPIC_LEFT = "inputTopicLeft-not-exist";
    static final String OUTPUT_TOPIC = "outputTopic";
    StreamsBuilder builder;
    final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
    private KTable<Long, String> rightTable;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void prepareTopology() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
        STREAMS_CONFIG.put("state.dir", this.testFolder.getRoot().getPath());
        this.builder = new StreamsBuilder();
        this.rightTable = this.builder.table(INPUT_TOPIC_RIGHT);
    }

    @After
    public void cleanup() throws InterruptedException {
        CLUSTER.deleteAllTopicsAndWait(120000L);
    }

    @Test
    public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws InterruptedException {
        STREAMS_CONFIG.put("application.id", APP_ID);
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        KStream notExistStream = this.builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT);
        KTable aggregatedTable = notExistStream.leftJoin(this.rightTable, this.valueJoiner).groupBy((key, value) -> key).reduce((value1, value2) -> value1 + value2);
        aggregatedTable.toStream().to(OUTPUT_TOPIC);
        KafkaStreamsWrapper streams = new KafkaStreamsWrapper(this.builder.build(), STREAMS_CONFIG);
        IntegrationTestUtils.StateListenerStub listener = new IntegrationTestUtils.StateListenerStub();
        streams.setStreamThreadStateListener(listener);
        streams.start();
        TestUtils.waitForCondition(listener::transitToPendingShutdownSeen, (String)"Did not seen thread state transited to PENDING_SHUTDOWN");
        streams.close();
        Assert.assertTrue((boolean)listener.transitToPendingShutdownSeen());
    }
}

