/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.MockTime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    @Parameterized.Parameter
    public String eosConfig;
    private final MockTime mockTime;
    private final KeyValueMapper<String, Long, Long> keyMapper;
    private final ValueJoiner<Long, String, String> joiner;
    private final String globalStore = "globalStore";
    private final Map<String, String> results;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalTableTopic;
    private String streamTopic;
    private GlobalKTable<Long, String> globalTable;
    private KStream<String, Long> stream;
    private ForeachAction<String, String> foreachAction;
    @Rule
    public TestName testName;

    public GlobalKTableEOSIntegrationTest() {
        this.mockTime = GlobalKTableEOSIntegrationTest.CLUSTER.time;
        this.keyMapper = (key, value) -> value;
        this.joiner = (value1, value2) -> value1 + "+" + value2;
        this.globalStore = "globalStore";
        this.results = new HashMap<String, String>();
        this.testName = new TestName();
    }

    @Parameterized.Parameters(name="{0}")
    public static Collection<String[]> data() {
        return Arrays.asList({"exactly_once"}, {"exactly_once_beta"});
    }

    @Before
    public void before() throws Exception {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfiguration.put("application.id", "app-" + safeTestName);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0L);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100L);
        this.streamsConfiguration.put("processing.guarantee", this.eosConfig);
        this.streamsConfiguration.put("task.timeout.ms", (Object)1L);
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("session.timeout.ms", (Object)1000);
        this.streamsConfiguration.put("heartbeat.interval.ms", (Object)300);
        this.streamsConfiguration.put("request.timeout.ms", (Object)5000);
        this.globalTable = this.builder.globalTable(this.globalTableTopic, Consumed.with((Serde)Serdes.Long(), (Serde)Serdes.String()), Materialized.as((String)"globalStore").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));
        Consumed stringLongConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Long());
        this.stream = this.builder.stream(this.streamTopic, stringLongConsumed);
        this.foreachAction = this.results::put;
    }

    @After
    public void after() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
        KStream streamTableJoin = this.stream.leftJoin(this.globalTable, this.keyMapper, this.joiner);
        streamTableJoin.foreach(this.foreachAction);
        this.produceInitialGlobalTableValues();
        this.startStreams();
        this.produceTopicValues(this.streamTopic);
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("a", "1+A");
        expected.put("b", "2+B");
        expected.put("c", "3+C");
        expected.put("d", "4+D");
        expected.put("e", "5+null");
        TestUtils.waitForCondition(() -> this.results.equals(expected), (long)30000L, () -> "waiting for initial values;\n  expected: " + expected + "\n  received: " + this.results);
        this.produceGlobalTableValues();
        ReadOnlyKeyValueStore replicatedStore = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull((Object)replicatedStore);
        HashMap<Long, String> expectedState = new HashMap<Long, String>();
        expectedState.put(1L, "F");
        expectedState.put(2L, "G");
        expectedState.put(3L, "H");
        expectedState.put(4L, "I");
        expectedState.put(5L, "J");
        HashMap globalState = new HashMap();
        TestUtils.waitForCondition(() -> {
            globalState.clear();
            replicatedStore.all().forEachRemaining(pair -> {
                String cfr_ignored_0 = (String)globalState.put(pair.key, pair.value);
            });
            return globalState.equals(expectedState);
        }, (long)30000L, () -> "waiting for data in replicated store\n  expected: " + expectedState + "\n  received: " + globalState);
        this.produceTopicValues(this.streamTopic);
        expected.put("a", "1+F");
        expected.put("b", "2+G");
        expected.put("c", "3+H");
        expected.put("d", "4+I");
        expected.put("e", "5+J");
        TestUtils.waitForCondition(() -> this.results.equals(expected), (long)30000L, () -> "waiting for final values\n  expected: " + expected + "\n  received: " + this.results);
    }

    @Test
    public void shouldKStreamGlobalKTableJoin() throws Exception {
        KStream streamTableJoin = this.stream.join(this.globalTable, this.keyMapper, this.joiner);
        streamTableJoin.foreach(this.foreachAction);
        this.produceInitialGlobalTableValues();
        this.startStreams();
        this.produceTopicValues(this.streamTopic);
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("a", "1+A");
        expected.put("b", "2+B");
        expected.put("c", "3+C");
        expected.put("d", "4+D");
        TestUtils.waitForCondition(() -> this.results.equals(expected), (long)30000L, () -> "waiting for initial values\n  expected: " + expected + "\n  received: " + this.results);
        this.produceGlobalTableValues();
        ReadOnlyKeyValueStore replicatedStore = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull((Object)replicatedStore);
        HashMap<Long, String> expectedState = new HashMap<Long, String>();
        expectedState.put(1L, "F");
        expectedState.put(2L, "G");
        expectedState.put(3L, "H");
        expectedState.put(4L, "I");
        expectedState.put(5L, "J");
        HashMap globalState = new HashMap();
        TestUtils.waitForCondition(() -> {
            globalState.clear();
            replicatedStore.all().forEachRemaining(pair -> {
                String cfr_ignored_0 = (String)globalState.put(pair.key, pair.value);
            });
            return globalState.equals(expectedState);
        }, (long)30000L, () -> "waiting for data in replicated store\n  expected: " + expectedState + "\n  received: " + globalState);
        this.produceTopicValues(this.streamTopic);
        expected.put("a", "1+F");
        expected.put("b", "2+G");
        expected.put("c", "3+H");
        expected.put("d", "4+I");
        expected.put("e", "5+J");
        TestUtils.waitForCondition(() -> this.results.equals(expected), (long)30000L, () -> "waiting for final values\n  expected: " + expected + "\n  received: " + this.results);
    }

    @Test
    public void shouldRestoreTransactionalMessages() throws Exception {
        this.produceInitialGlobalTableValues();
        this.startStreams();
        HashMap<Long, String> expected = new HashMap<Long, String>();
        expected.put(1L, "A");
        expected.put(2L, "B");
        expected.put(3L, "C");
        expected.put(4L, "D");
        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull((Object)store);
        HashMap result = new HashMap();
        TestUtils.waitForCondition(() -> {
            result.clear();
            KeyValueIterator it = store.all();
            while (it.hasNext()) {
                KeyValue kv = (KeyValue)it.next();
                result.put(kv.key, kv.value);
            }
            return result.equals(expected);
        }, (long)30000L, () -> "waiting for initial values\n  expected: " + expected + "\n  received: " + result);
    }

    @Test
    public void shouldSkipOverTxMarkersOnRestore() throws Exception {
        this.shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
    }

    @Test
    public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
        this.shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
    }

    private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(boolean appendAbortedMessages) throws Exception {
        this.produceInitialGlobalTableValues();
        String stateDir = this.streamsConfiguration.getProperty("state.dir");
        File globalStateDir = new File(stateDir + File.separator + this.streamsConfiguration.getProperty("application.id") + File.separator + "global");
        Assert.assertTrue((boolean)globalStateDir.mkdirs());
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint"));
        checkpoint.write(Collections.singletonMap(new TopicPartition(this.globalTableTopic, 1), 1L));
        if (appendAbortedMessages) {
            final AtomicReference error = new AtomicReference();
            this.startStreams(new StateRestoreListener(){

                public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
                    try {
                        GlobalKTableEOSIntegrationTest.this.produceAbortedMessages();
                    }
                    catch (Exception fatal) {
                        error.set(fatal);
                    }
                }

                public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
                }

                public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
                }
            });
            Exception fatal = (Exception)error.get();
            if (fatal != null) {
                throw fatal;
            }
        } else {
            this.startStreams();
        }
        HashMap<Long, String> expected = new HashMap<Long, String>();
        expected.put(1L, "A");
        expected.put(2L, "B");
        expected.put(4L, "D");
        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull((Object)store);
        HashMap storeContent = new HashMap();
        TestUtils.waitForCondition(() -> {
            storeContent.clear();
            KeyValueIterator it = store.all();
            while (it.hasNext()) {
                KeyValue kv = (KeyValue)it.next();
                storeContent.put(kv.key, kv.value);
            }
            return storeContent.equals(expected);
        }, (long)30000L, () -> "waiting for initial values\n  expected: " + expected + "\n  received: " + storeContent);
    }

    @Test
    public void shouldNotRestoreAbortedMessages() throws Exception {
        this.produceAbortedMessages();
        this.produceInitialGlobalTableValues();
        this.produceAbortedMessages();
        this.startStreams();
        HashMap<Long, String> expected = new HashMap<Long, String>();
        expected.put(1L, "A");
        expected.put(2L, "B");
        expected.put(3L, "C");
        expected.put(4L, "D");
        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore("globalStore", this.kafkaStreams, QueryableStoreTypes.keyValueStore());
        Assert.assertNotNull((Object)store);
        HashMap storeContent = new HashMap();
        TestUtils.waitForCondition(() -> {
            storeContent.clear();
            store.all().forEachRemaining(pair -> {
                String cfr_ignored_0 = (String)storeContent.put(pair.key, pair.value);
            });
            return storeContent.equals(expected);
        }, (long)30000L, () -> "waiting for initial values\n  expected: " + expected + "\n  received: " + storeContent);
    }

    private void createTopics() throws Exception {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamTopic = "stream-" + safeTestName;
        this.globalTableTopic = "globalTable-" + safeTestName;
        CLUSTER.createTopics(this.streamTopic);
        CLUSTER.createTopic(this.globalTableTopic, 2, 1);
    }

    private void startStreams() {
        this.startStreams(null);
    }

    private void startStreams(StateRestoreListener stateRestoreListener) {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
        this.kafkaStreams.start();
    }

    private void produceTopicValues(String topic) {
        Properties config = new Properties();
        config.setProperty("enable.idempotence", "true");
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, Arrays.asList(new KeyValue((Object)"a", (Object)1L), new KeyValue((Object)"b", (Object)2L), new KeyValue((Object)"c", (Object)3L), new KeyValue((Object)"d", (Object)4L), new KeyValue((Object)"e", (Object)5L)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, (Properties)config), (Time)this.mockTime);
    }

    private void produceAbortedMessages() throws Exception {
        Properties properties = new Properties();
        properties.put("transactional.id", "someid");
        IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(this.globalTableTopic, Arrays.asList(new KeyValue((Object)1L, (Object)"A"), new KeyValue((Object)2L, (Object)"B"), new KeyValue((Object)3L, (Object)"C"), new KeyValue((Object)4L, (Object)"D")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, (Properties)properties), this.mockTime.milliseconds());
    }

    private void produceInitialGlobalTableValues() {
        Properties properties = new Properties();
        properties.put("transactional.id", "someid");
        IntegrationTestUtils.produceKeyValuesSynchronously(this.globalTableTopic, Arrays.asList(new KeyValue((Object)1L, (Object)"A"), new KeyValue((Object)2L, (Object)"B"), new KeyValue((Object)3L, (Object)"C"), new KeyValue((Object)4L, (Object)"D")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, (Properties)properties), (Time)this.mockTime, true);
    }

    private void produceGlobalTableValues() {
        Properties config = new Properties();
        config.setProperty("enable.idempotence", "true");
        IntegrationTestUtils.produceKeyValuesSynchronously(this.globalTableTopic, Arrays.asList(new KeyValue((Object)1L, (Object)"F"), new KeyValue((Object)2L, (Object)"G"), new KeyValue((Object)3L, (Object)"H"), new KeyValue((Object)4L, (Object)"I"), new KeyValue((Object)5L, (Object)"J")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class, (Properties)config), (Time)this.mockTime);
    }

    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (Object)1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", (Object)1);
        CLUSTER = new EmbeddedKafkaCluster(1, BROKER_CONFIG);
    }
}

