/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class SuppressionDurabilityIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[0])), 0L);
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
    private static final int COMMIT_INTERVAL = 100;
    private final boolean eosEnabled;

    public SuppressionDurabilityIntegrationTest(boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    @Parameterized.Parameters(name="{index}: eosEnabled={0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList({false}, {true});
    }

    private KTable<String, Long> buildCountsTable(String input, StreamsBuilder builder) {
        return builder.table(input, Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((k, v) -> new KeyValue(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)).count(Materialized.as((String)"counts").withCachingDisabled());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldRecoverBufferAfterShutdown() {
        String testId = "-shouldRecoverBufferAfterShutdown";
        String appId = this.getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldRecoverBufferAfterShutdown";
        String input = "input-shouldRecoverBufferAfterShutdown";
        String outputSuppressed = "output-suppressed-shouldRecoverBufferAfterShutdown";
        String outputRaw = "output-raw-shouldRecoverBufferAfterShutdown";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldRecoverBufferAfterShutdown", "output-raw-shouldRecoverBufferAfterShutdown", "output-suppressed-shouldRecoverBufferAfterShutdown");
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> valueCounts = this.buildCountsTable("input-shouldRecoverBufferAfterShutdown", builder);
        KStream suppressedCounts = valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)3L).emitEarlyWhenFull())).toStream();
        AtomicInteger eventCount = new AtomicInteger(0);
        suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
        suppressedCounts.to("output-suppressed-shouldRecoverBufferAfterShutdown", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().to("output-raw-shouldRecoverBufferAfterShutdown", Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Properties streamsConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"poll.ms", (Object)Integer.toString(100)), Utils.mkEntry((Object)"commit.interval.ms", (Object)Integer.toString(100)), Utils.mkEntry((Object)"processing.guarantee", (Object)(this.eosEnabled ? "exactly_once" : "at_least_once"))}));
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            this.produceSynchronously("input-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", this.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v2", this.scaledTime(2L)), new KeyValueTimestamp<String, String>("k3", "v3", this.scaledTime(3L))));
            this.verifyOutput("output-raw-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, this.scaledTime(1L)), new KeyValueTimestamp<String, Long>("v2", 1L, this.scaledTime(2L)), new KeyValueTimestamp<String, Long>("v3", 1L, this.scaledTime(3L))));
            MatcherAssert.assertThat((Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)0));
            this.produceSynchronously("input-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, String>("k4", "v4", this.scaledTime(4L)), new KeyValueTimestamp<String, String>("k5", "v5", this.scaledTime(5L))));
            this.verifyOutput("output-raw-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, Long>("v4", 1L, this.scaledTime(4L)), new KeyValueTimestamp<String, Long>("v5", 1L, this.scaledTime(5L))));
            MatcherAssert.assertThat((Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)2));
            this.verifyOutput("output-suppressed-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, Long>("v1", 1L, this.scaledTime(1L)), new KeyValueTimestamp<String, Long>("v2", 1L, this.scaledTime(2L))));
            driver.close();
            MatcherAssert.assertThat((Object)driver.state(), (Matcher)CoreMatchers.is((Object)KafkaStreams.State.NOT_RUNNING));
            driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, false);
            this.produceSynchronously("input-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, String>("k6", "v6", this.scaledTime(6L)), new KeyValueTimestamp<String, String>("k7", "v7", this.scaledTime(7L)), new KeyValueTimestamp<String, String>("k8", "v8", this.scaledTime(8L))));
            this.verifyOutput("output-raw-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, Long>("v6", 1L, this.scaledTime(6L)), new KeyValueTimestamp<String, Long>("v7", 1L, this.scaledTime(7L)), new KeyValueTimestamp<String, Long>("v8", 1L, this.scaledTime(8L))));
            MatcherAssert.assertThat((Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)5));
            this.verifyOutput("output-suppressed-shouldRecoverBufferAfterShutdown", Arrays.asList(new KeyValueTimestamp<String, Long>("v3", 1L, this.scaledTime(3L)), new KeyValueTimestamp<String, Long>("v4", 1L, this.scaledTime(4L)), new KeyValueTimestamp<String, Long>("v5", 1L, this.scaledTime(5L))));
        }
        finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    private void verifyOutput(String topic, List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
        Properties properties = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"group.id", (Object)"test-group"), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"key.deserializer", (Object)STRING_DESERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.deserializer", (Object)LONG_DESERIALIZER.getClass().getName())}));
        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
    }

    private long scaledTime(long unscaledTime) {
        return 200L * unscaledTime;
    }

    private void produceSynchronously(String topic, List<KeyValueTimestamp<String, String>> toProduce) {
        Properties producerConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client.id", (Object)"anything"), Utils.mkEntry((Object)"key.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())}));
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
    }
}

