/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.log.LogConfig;
import kafka.utils.MockTime;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.Tuple2;
import scala.collection.Map;

public class InternalTopicIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private Properties streamsConfiguration;
    private String applicationId;

    public InternalTopicIntegrationTest() {
        this.mockTime = InternalTopicIntegrationTest.CLUSTER.time;
        this.applicationId = "compact-topics-integration-test";
    }

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    @Before
    public void before() {
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", this.applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("key.serde", Serdes.String().getClass().getName());
        this.streamsConfiguration.put("value.serde", Serdes.String().getClass().getName());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("commit.interval.ms", (Object)100);
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        this.streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Properties getTopicConfigProperties(String changelog) {
        try (ZkClient zkClient = new ZkClient(CLUSTER.zKConnectString(), 10000, 8000, (ZkSerializer)ZKStringSerializer$.MODULE$);){
            boolean isSecure = false;
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), false);
            Map topicConfigs = AdminUtils.fetchAllTopicConfigs((ZkUtils)zkUtils);
            for (Tuple2 topicConfig : topicConfigs) {
                String topic = (String)topicConfig._1;
                Properties prop = (Properties)topicConfig._2;
                if (!topic.equals(changelog)) continue;
                Properties properties = prop;
                return properties;
            }
            Properties properties = new Properties();
            return properties;
        }
    }

    @Test
    public void shouldCompactTopicsForStateChangelogs() throws Exception {
        Serde stringSerde = Serdes.String();
        Serde longSerde = Serdes.Long();
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "compact-topics-integration-test");
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("key.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("value.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        KStreamBuilder builder = new KStreamBuilder();
        KStream textLines = builder.stream(new String[]{DEFAULT_INPUT_TOPIC});
        KStream wordCounts = textLines.flatMapValues((ValueMapper)new ValueMapper<String, Iterable<String>>(){

            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockKeyValueMapper.SelectValueMapper()).count("Counts").toStream();
        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, streamsConfiguration);
        streams.start();
        this.produceData(Arrays.asList("hello", "world", "world", "hello world"));
        streams.close();
        Properties properties = this.getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic((String)this.applicationId, (String)"Counts"));
        Assert.assertEquals((Object)LogConfig.Compact(), (Object)properties.getProperty(LogConfig.CleanupPolicyProp()));
    }

    private void produceData(List<String> inputValues) throws ExecutionException, InterruptedException {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerConfig.put("acks", "all");
        producerConfig.put("retries", (Object)0);
        producerConfig.put("key.serializer", StringSerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, (Time)this.mockTime);
    }

    @Test
    public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        KStream textLines = builder.stream(new String[]{DEFAULT_INPUT_TOPIC});
        int durationMs = 2000;
        textLines.flatMapValues((ValueMapper)new ValueMapper<String, Iterable<String>>(){

            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockKeyValueMapper.SelectValueMapper()).count((Windows)TimeWindows.of((long)1000L).until(2000L), "CountWindows").toStream();
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
        KafkaStreams streams = new KafkaStreams((TopologyBuilder)builder, this.streamsConfiguration);
        streams.start();
        this.produceData(Arrays.asList("hello", "world", "world", "hello world"));
        streams.close();
        Properties properties = this.getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic((String)this.applicationId, (String)"CountWindows"));
        List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
        Assert.assertEquals((long)2L, (long)policies.size());
        Assert.assertTrue((boolean)policies.contains(LogConfig.Compact()));
        Assert.assertTrue((boolean)policies.contains(LogConfig.Delete()));
        long retention = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS) + 2000L;
        Assert.assertEquals((long)retention, (long)Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
    }
}

