/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class SuppressionIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[0])), 0L);
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final int COMMIT_INTERVAL = 100;

    private static KTable<String, Long> buildCountsTable(String input, StreamsBuilder builder) {
        return builder.table(input, Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((k, v) -> new KeyValue(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)).count(Materialized.as((String)"counts").withCachingDisabled());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldUseDefaultSerdes() {
        String testId = "-shouldInheritSerdes";
        String appId = this.getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldInheritSerdes";
        String input = "input-shouldInheritSerdes";
        String outputSuppressed = "output-suppressed-shouldInheritSerdes";
        String outputRaw = "output-raw-shouldInheritSerdes";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldInheritSerdes", "output-raw-shouldInheritSerdes", "output-suppressed-shouldInheritSerdes");
        StreamsBuilder builder = new StreamsBuilder();
        KStream inputStream = builder.stream("input-shouldInheritSerdes");
        KTable valueCounts = inputStream.groupByKey().aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")");
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)1L).emitEarlyWhenFull())).toStream().to("output-suppressed-shouldInheritSerdes");
        valueCounts.toStream().to("output-raw-shouldInheritSerdes");
        Properties streamsConfig = SuppressionIntegrationTest.getStreamsConfig(appId);
        streamsConfig.put("default.key.serde", Serdes.StringSerde.class);
        streamsConfig.put("default.value.serde", Serdes.StringSerde.class);
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            SuppressionIntegrationTest.produceSynchronously("input-shouldInheritSerdes", Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", SuppressionIntegrationTest.scaledTime(0L)), new KeyValueTimestamp<String, String>("k1", "v2", SuppressionIntegrationTest.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v1", SuppressionIntegrationTest.scaledTime(2L)), new KeyValueTimestamp<String, String>("x", "x", SuppressionIntegrationTest.scaledTime(3L))));
            boolean rawRecords = SuppressionIntegrationTest.waitForAnyRecord("output-raw-shouldInheritSerdes");
            boolean suppressedRecords = SuppressionIntegrationTest.waitForAnyRecord("output-suppressed-shouldInheritSerdes");
            MatcherAssert.assertThat((Object)rawRecords, (Matcher)Matchers.is((Object)true));
            MatcherAssert.assertThat((Object)suppressedRecords, (Matcher)CoreMatchers.is((Object)true));
        }
        finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldInheritSerdes() {
        String testId = "-shouldInheritSerdes";
        String appId = this.getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldInheritSerdes";
        String input = "input-shouldInheritSerdes";
        String outputSuppressed = "output-suppressed-shouldInheritSerdes";
        String outputRaw = "output-raw-shouldInheritSerdes";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldInheritSerdes", "output-raw-shouldInheritSerdes", "output-suppressed-shouldInheritSerdes");
        StreamsBuilder builder = new StreamsBuilder();
        KStream inputStream = builder.stream("input-shouldInheritSerdes");
        KTable valueCounts = inputStream.groupByKey().count();
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)1L).emitEarlyWhenFull())).toStream().to("output-suppressed-shouldInheritSerdes");
        valueCounts.toStream().to("output-raw-shouldInheritSerdes");
        Properties streamsConfig = SuppressionIntegrationTest.getStreamsConfig(appId);
        streamsConfig.put("default.key.serde", Serdes.StringSerde.class);
        streamsConfig.put("default.value.serde", Serdes.StringSerde.class);
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            SuppressionIntegrationTest.produceSynchronously("input-shouldInheritSerdes", Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", SuppressionIntegrationTest.scaledTime(0L)), new KeyValueTimestamp<String, String>("k1", "v2", SuppressionIntegrationTest.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v1", SuppressionIntegrationTest.scaledTime(2L)), new KeyValueTimestamp<String, String>("x", "x", SuppressionIntegrationTest.scaledTime(3L))));
            boolean rawRecords = SuppressionIntegrationTest.waitForAnyRecord("output-raw-shouldInheritSerdes");
            boolean suppressedRecords = SuppressionIntegrationTest.waitForAnyRecord("output-suppressed-shouldInheritSerdes");
            MatcherAssert.assertThat((Object)rawRecords, (Matcher)Matchers.is((Object)true));
            MatcherAssert.assertThat((Object)suppressedRecords, (Matcher)CoreMatchers.is((Object)true));
        }
        finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    private static boolean waitForAnyRecord(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put("enable.auto.commit", (Object)false);
        try (KafkaConsumer consumer = new KafkaConsumer(properties);){
            List partitions = consumer.partitionsFor(topic).stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < 60000L) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(500L));
                if (records.isEmpty()) continue;
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
        String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
        String appId = this.getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldShutdownWhenRecordConstraintIsViolated";
        String input = "input-shouldShutdownWhenRecordConstraintIsViolated";
        String outputSuppressed = "output-suppressed-shouldShutdownWhenRecordConstraintIsViolated";
        String outputRaw = "output-raw-shouldShutdownWhenRecordConstraintIsViolated";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldShutdownWhenRecordConstraintIsViolated", "output-raw-shouldShutdownWhenRecordConstraintIsViolated", "output-suppressed-shouldShutdownWhenRecordConstraintIsViolated");
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> valueCounts = SuppressionIntegrationTest.buildCountsTable("input-shouldShutdownWhenRecordConstraintIsViolated", builder);
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)1L).shutDownWhenFull())).toStream().to("output-suppressed-shouldShutdownWhenRecordConstraintIsViolated", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw-shouldShutdownWhenRecordConstraintIsViolated", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Properties streamsConfig = SuppressionIntegrationTest.getStreamsConfig(appId);
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            SuppressionIntegrationTest.produceSynchronously("input-shouldShutdownWhenRecordConstraintIsViolated", Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", SuppressionIntegrationTest.scaledTime(0L)), new KeyValueTimestamp<String, String>("k1", "v2", SuppressionIntegrationTest.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v1", SuppressionIntegrationTest.scaledTime(2L)), new KeyValueTimestamp<String, String>("x", "x", SuppressionIntegrationTest.scaledTime(3L))));
            SuppressionIntegrationTest.verifyErrorShutdown(driver);
        }
        finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
        String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
        String appId = this.getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldShutdownWhenBytesConstraintIsViolated";
        String input = "input-shouldShutdownWhenBytesConstraintIsViolated";
        String outputSuppressed = "output-suppressed-shouldShutdownWhenBytesConstraintIsViolated";
        String outputRaw = "output-raw-shouldShutdownWhenBytesConstraintIsViolated";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldShutdownWhenBytesConstraintIsViolated", "output-raw-shouldShutdownWhenBytesConstraintIsViolated", "output-suppressed-shouldShutdownWhenBytesConstraintIsViolated");
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> valueCounts = SuppressionIntegrationTest.buildCountsTable("input-shouldShutdownWhenBytesConstraintIsViolated", builder);
        valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxBytes((long)200L).shutDownWhenFull())).toStream().to("output-suppressed-shouldShutdownWhenBytesConstraintIsViolated", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw-shouldShutdownWhenBytesConstraintIsViolated", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Properties streamsConfig = SuppressionIntegrationTest.getStreamsConfig(appId);
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            SuppressionIntegrationTest.produceSynchronously("input-shouldShutdownWhenBytesConstraintIsViolated", Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", SuppressionIntegrationTest.scaledTime(0L)), new KeyValueTimestamp<String, String>("k1", "v2", SuppressionIntegrationTest.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v1", SuppressionIntegrationTest.scaledTime(2L)), new KeyValueTimestamp<String, String>("x", "x", SuppressionIntegrationTest.scaledTime(3L))));
            SuppressionIntegrationTest.verifyErrorShutdown(driver);
        }
        finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    private static Properties getStreamsConfig(String appId) {
        return Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"poll.ms", (Object)Integer.toString(100)), Utils.mkEntry((Object)"commit.interval.ms", (Object)Integer.toString(100)), Utils.mkEntry((Object)"processing.guarantee", (Object)"at_least_once"), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath())}));
    }

    private static long scaledTime(long unscaledTime) {
        return 200L * unscaledTime;
    }

    private static void produceSynchronously(String topic, List<KeyValueTimestamp<String, String>> toProduce) {
        Properties producerConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client.id", (Object)"anything"), Utils.mkEntry((Object)"key.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())}));
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toProduce);
    }

    private static void verifyErrorShutdown(KafkaStreams driver) throws InterruptedException {
        TestUtils.waitForCondition(() -> !driver.state().isRunningOrRebalancing(), (long)60000L, (String)"Streams didn't shut down.");
        MatcherAssert.assertThat((Object)driver.state(), (Matcher)CoreMatchers.is((Object)KafkaStreams.State.ERROR));
    }
}

