/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig;
import kafka.utils.MockTime;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class InternalTopicIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String APP_ID = "internal-topics-integration-test";
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";
    private final MockTime mockTime;
    private Properties streamsProp;

    public InternalTopicIntegrationTest() {
        this.mockTime = InternalTopicIntegrationTest.CLUSTER.time;
    }

    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void before() {
        this.streamsProp = new Properties();
        this.streamsProp.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsProp.put("default.key.serde", Serdes.String().getClass().getName());
        this.streamsProp.put("default.value.serde", Serdes.String().getClass().getName());
        this.streamsProp.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsProp.put("commit.interval.ms", (Object)100L);
        this.streamsProp.put("cache.max.bytes.buffering", (Object)0);
        this.streamsProp.put("auto.offset.reset", "earliest");
    }

    @After
    public void after() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsProp);
    }

    private void produceData(List<String> inputValues) {
        Properties producerProp = new Properties();
        producerProp.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProp.put("acks", "all");
        producerProp.put("key.serializer", StringSerializer.class);
        producerProp.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, (Time)this.mockTime);
    }

    private Properties getTopicProperties(String changelog) {
        Throwable throwable = null;
        try (Admin adminClient = this.createAdminClient();){
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, changelog);
            try {
                Config config = (Config)((KafkaFuture)adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource)).get();
                Properties properties = new Properties();
                for (ConfigEntry configEntry : config.entries()) {
                    if (configEntry.source() != ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) continue;
                    properties.put(configEntry.name(), configEntry.value());
                }
                Properties properties2 = properties;
                return properties2;
            }
            catch (InterruptedException | ExecutionException e) {
                try {
                    throw new RuntimeException(e);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
            }
        }
    }

    private Admin createAdminClient() {
        Properties adminClientConfig = new Properties();
        adminClientConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return Admin.create((Properties)adminClientConfig);
    }

    @Test
    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
        String appID = "internal-topics-integration-test-windowed-FKJ";
        this.streamsProp.put("application.id", "internal-topics-integration-test-windowed-FKJ");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
        KTable inputTable = streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC);
        inputTopic.groupBy((k, v) -> k, Grouped.with((String)"GroupName", (Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMinutes(10L))).aggregate(() -> "", (k, v, a) -> a + k).leftJoin(inputTable, v -> v, (x, y) -> x + y);
        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), this.streamsProp);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(60L));
    }

    @Test
    public void shouldCompactTopicsForKeyValueStoreChangelogs() {
        String appID = "internal-topics-integration-test-compact";
        this.streamsProp.put("application.id", "internal-topics-integration-test-compact");
        StreamsBuilder builder = new StreamsBuilder();
        KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC);
        textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))).groupBy(MockMapper.selectValueMapper()).count(Materialized.as((String)"Counts"));
        KafkaStreams streams = new KafkaStreams(builder.build(), this.streamsProp);
        streams.start();
        this.produceData(Arrays.asList("hello", "world", "world", "hello world"));
        IntegrationTestUtils.waitForCompletion(streams, 2, 30000L);
        streams.close();
        Properties changelogProps = this.getTopicProperties(ProcessorStateManager.storeChangelogTopic((String)"internal-topics-integration-test-compact", (String)"Counts", null));
        Assert.assertEquals((Object)LogConfig.Compact(), (Object)changelogProps.getProperty(LogConfig.CleanupPolicyProp()));
        Properties repartitionProps = this.getTopicProperties("internal-topics-integration-test-compact-Counts-repartition");
        Assert.assertEquals((Object)LogConfig.Delete(), (Object)repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
        Assert.assertEquals((long)4L, (long)repartitionProps.size());
    }

    @Test
    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() {
        String appID = "internal-topics-integration-test-compact-delete";
        this.streamsProp.put("application.id", "internal-topics-integration-test-compact-delete");
        StreamsBuilder builder = new StreamsBuilder();
        KStream textLines = builder.stream(DEFAULT_INPUT_TOPIC);
        int durationMs = 2000;
        textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))).groupBy(MockMapper.selectValueMapper()).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofSeconds(1L)).grace(Duration.ofMillis(0L))).count(Materialized.as((String)"CountWindows").withRetention(Duration.ofSeconds(2L)));
        KafkaStreams streams = new KafkaStreams(builder.build(), this.streamsProp);
        streams.start();
        this.produceData(Arrays.asList("hello", "world", "world", "hello world"));
        IntegrationTestUtils.waitForCompletion(streams, 2, 30000L);
        streams.close();
        Properties properties = this.getTopicProperties(ProcessorStateManager.storeChangelogTopic((String)"internal-topics-integration-test-compact-delete", (String)"CountWindows", null));
        List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
        Assert.assertEquals((long)2L, (long)policies.size());
        Assert.assertTrue((boolean)policies.contains(LogConfig.Compact()));
        Assert.assertTrue((boolean)policies.contains(LogConfig.Delete()));
        long retention = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS) + 2000L;
        Assert.assertEquals((long)retention, (long)Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
        Properties repartitionProps = this.getTopicProperties("internal-topics-integration-test-compact-delete-CountWindows-repartition");
        Assert.assertEquals((Object)LogConfig.Delete(), (Object)repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
        Assert.assertEquals((long)4L, (long)repartitionProps.size());
    }
}

