/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Shared scaffolding for parameterized join integration tests.
 *
 * <p>The class owns a single-broker {@link EmbeddedKafkaCluster}, the shared producer / consumer /
 * streams configurations, a fixed sequence of input records (all using the key
 * {@link #ANY_UNIQUE_KEY}) alternating between {@link #INPUT_TOPIC_LEFT} and
 * {@link #INPUT_TOPIC_RIGHT}, and the {@code runTest} helpers that pipe that sequence through the
 * topology held in {@link #builder} and verify what arrives on {@link #OUTPUT_TOPIC}.
 *
 * <p>Every test is run twice by the {@link Parameterized} runner: once with caching enabled and
 * once with it disabled (see {@link #data()}).
 */
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public abstract class AbstractJoinIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());

    // Not assigned anywhere in this class; presumably set by subclasses -- TODO confirm.
    static String appID;

    private static final Long COMMIT_INTERVAL = 100L;
    static final Properties STREAMS_CONFIG = new Properties();
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
    static final String OUTPUT_TOPIC = "outputTopic";
    static final long ANY_UNIQUE_KEY = 0L;

    private static final Properties PRODUCER_CONFIG = new Properties();
    private static final Properties RESULT_CONSUMER_CONFIG = new Properties();

    private KafkaProducer<Long, String> producer;
    private KafkaStreams streams;

    // Topology under test; not assigned in this class, so it must be set before runTest is called.
    StreamsBuilder builder;
    // Total number of output records awaited by the "final result" flavour of runTest.
    int numRecordsExpected = 0;
    // Polled by the "final result" flavour of runTest; never set to true inside this class.
    AtomicBoolean finalResultReached = new AtomicBoolean(false);

    /** The fixed input sequence; a {@code null} value is sent as-is to the given topic. */
    private final List<Input<String>> input = Arrays.asList(
        new Input<>(INPUT_TOPIC_LEFT, (String) null),
        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
        new Input<>(INPUT_TOPIC_LEFT, "A"),
        new Input<>(INPUT_TOPIC_RIGHT, "a"),
        new Input<>(INPUT_TOPIC_LEFT, "B"),
        new Input<>(INPUT_TOPIC_RIGHT, "b"),
        new Input<>(INPUT_TOPIC_LEFT, (String) null),
        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
        new Input<>(INPUT_TOPIC_LEFT, "C"),
        new Input<>(INPUT_TOPIC_RIGHT, "c"),
        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
        new Input<>(INPUT_TOPIC_LEFT, (String) null),
        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
        new Input<>(INPUT_TOPIC_RIGHT, "d"),
        new Input<>(INPUT_TOPIC_LEFT, "D")
    );

    /** Joins two values as {@code "<left>-<right>"}; a {@code null} side renders as "null". */
    final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;

    final boolean cacheEnabled;

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner.
     *
     * @return two single-element arrays: {@code {true}} followed by {@code {false}}
     */
    @Parameterized.Parameters(name = "caching enabled = {0}")
    public static Collection<Object[]> data() {
        final List<Object[]> values = new ArrayList<>();
        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
            values.add(new Object[] {cacheEnabled});
        }
        return values;
    }

    /**
     * @param cacheEnabled whether the streams record cache should be left enabled for this run
     */
    AbstractJoinIntegrationTest(final boolean cacheEnabled) {
        this.cacheEnabled = cacheEnabled;
    }

    /**
     * Populates the shared producer, result-consumer and streams configurations with the
     * bootstrap servers of {@link #CLUSTER} and the Long/String (de)serializers.
     */
    @BeforeClass
    public static void setupConfigsAndUtils() {
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("retries", 0);
        PRODUCER_CONFIG.put("key.serializer", LongSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);

        RESULT_CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        // NOTE(review): appID has no initializer in this class, so unless it is assigned before
        // this @BeforeClass method runs the group id becomes "null-result-consumer" -- confirm.
        RESULT_CONSUMER_CONFIG.put("group.id", appID + "-result-consumer");
        RESULT_CONSUMER_CONFIG.put("auto.offset.reset", "earliest");
        RESULT_CONSUMER_CONFIG.put("key.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);

        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    /**
     * Creates the input/output topics, applies the per-run streams settings and opens the
     * producer used by the {@code runTest} methods.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    void prepareEnvironment() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);

        // STREAMS_CONFIG is static and therefore shared by every run in this JVM. Clear the
        // override when caching is enabled so a previous cache-disabled run cannot leak into it.
        if (cacheEnabled) {
            STREAMS_CONFIG.remove("cache.max.bytes.buffering");
        } else {
            STREAMS_CONFIG.put("cache.max.bytes.buffering", 0);
        }

        STREAMS_CONFIG.put("state.dir", testFolder.getRoot().getPath());

        producer = new KafkaProducer<>(PRODUCER_CONFIG);
    }

    /**
     * Closes the producer (if one was created) and deletes all topics.
     *
     * @throws InterruptedException if waiting for topic deletion is interrupted
     */
    @After
    public void cleanup() throws InterruptedException {
        try {
            // prepareEnvironment may have failed before the producer was created.
            if (producer != null) {
                producer.close(Duration.ofMillis(0L));
            }
        } finally {
            // Always remove the topics, even if closing the producer failed.
            CLUSTER.deleteAllTopicsAndWait(120000L);
        }
    }

    /** Verifies that exactly the given records (with timestamps) are read from the topic. */
    private void checkResult(final String outputTopic,
                             final List<KeyValueTimestamp<Long, String>> expectedResult) throws InterruptedException {
        IntegrationTestUtils.verifyKeyValueTimestamps(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult);
    }

    /**
     * Waits (up to 30 seconds) for at least {@code expectedTotalNumRecords} records and asserts
     * that the last one received equals {@code expectedFinalResult}.
     */
    private void checkResult(final String outputTopic,
                             final KeyValueTimestamp<Long, String> expectedFinalResult,
                             final int expectedTotalNumRecords) throws InterruptedException {
        final List<KeyValueTimestamp<Long, String>> result =
            IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
                RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30000L);
        // Fail with a readable message instead of an IndexOutOfBoundsException.
        MatcherAssert.assertThat("No records received from topic " + outputTopic, !result.isEmpty());
        MatcherAssert.assertThat(result.get(result.size() - 1), Is.is(expectedFinalResult));
    }

    /**
     * Same as {@link #runTest(List, String)} without a queryable-store check.
     */
    void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult) throws Exception {
        runTest(expectedResult, null);
    }

    /**
     * Sends the fixed input sequence one record at a time and, after each record, verifies the
     * output it is expected to produce.
     *
     * @param expectedResult one entry per input record, in input order; a {@code null} entry
     *                       means no output is checked for that input. Timestamps are relative:
     *                       each is added to the wall-clock time captured when the run starts.
     * @param storeName      if non-null, the timestamped key-value store with this name is
     *                       checked to contain only the last expected record
     * @throws Exception if producing, verification or the store check fails
     */
    void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult,
                 final String storeName) throws Exception {
        assert expectedResult.size() == input.size();

        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);

        KeyValueTimestamp<Long, String> expectedFinalResult = null;

        try {
            streams.start();

            final long firstTimestamp = System.currentTimeMillis();
            long ts = firstTimestamp;

            final Iterator<List<KeyValueTimestamp<Long, String>>> resultIterator = expectedResult.iterator();
            for (final Input<String> singleInput : input) {
                // Each record gets a strictly increasing timestamp, starting at firstTimestamp + 1.
                sendInput(singleInput, ++ts);

                final List<KeyValueTimestamp<Long, String>> expected = resultIterator.next();
                if (expected == null) {
                    continue;
                }

                final List<KeyValueTimestamp<Long, String>> updatedExpected = new ArrayList<>(expected.size());
                for (final KeyValueTimestamp<Long, String> record : expected) {
                    updatedExpected.add(shiftTimestamp(record, firstTimestamp));
                }

                checkResult(OUTPUT_TOPIC, updatedExpected);
                // An empty expectation leaves the previously recorded final result untouched.
                if (!updatedExpected.isEmpty()) {
                    expectedFinalResult = updatedExpected.get(updatedExpected.size() - 1);
                }
            }

            if (storeName != null) {
                checkQueryableStore(storeName, expectedFinalResult);
            }
        } finally {
            streams.close();
        }
    }

    /**
     * Same as {@link #runTest(KeyValueTimestamp, String)} without a queryable-store check.
     */
    void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult) throws Exception {
        runTest(expectedFinalResult, null);
    }

    /**
     * Sends the whole input sequence, waits until {@link #finalResultReached} becomes true and
     * then verifies only the last record on the output topic.
     *
     * @param expectedFinalResult the expected last output record; its timestamp is relative and
     *                            is added to the wall-clock time captured when the run starts
     * @param storeName           if non-null, the timestamped key-value store with this name is
     *                            checked to contain only the expected final record
     * @throws Exception if producing, waiting, verification or the store check fails
     */
    void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult,
                 final String storeName) throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);

        try {
            streams.start();

            final long firstTimestamp = System.currentTimeMillis();
            long ts = firstTimestamp;

            for (final Input<String> singleInput : input) {
                sendInput(singleInput, ++ts);
            }

            TestUtils.waitForCondition(finalResultReached::get, "Never received expected final result.");

            final KeyValueTimestamp<Long, String> updatedExpectedFinalResult =
                shiftTimestamp(expectedFinalResult, firstTimestamp);
            checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected);

            if (storeName != null) {
                checkQueryableStore(storeName, updatedExpectedFinalResult);
            }
        } finally {
            streams.close();
        }
    }

    /** Synchronously produces one input record with the given timestamp and no explicit partition. */
    private void sendInput(final Input<String> singleInput, final long timestamp) throws Exception {
        producer.send(new ProducerRecord<>(
            singleInput.topic,
            null,
            timestamp,
            singleInput.record.key,
            singleInput.record.value
        )).get();
    }

    /** Returns a copy of {@code record} whose timestamp is increased by {@code offset}. */
    private static KeyValueTimestamp<Long, String> shiftTimestamp(final KeyValueTimestamp<Long, String> record,
                                                                  final long offset) {
        return new KeyValueTimestamp<>(record.key(), record.value(), offset + record.timestamp());
    }

    /**
     * Asserts that the named timestamped key-value store holds exactly one entry and that its
     * key, value and timestamp match {@code expectedFinalResult}.
     */
    private void checkQueryableStore(final String queryableName,
                                     final KeyValueTimestamp<Long, String> expectedFinalResult) {
        final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store =
            streams.store(queryableName, QueryableStoreTypes.timestampedKeyValueStore());

        // try-with-resources closes the iterator even when the store is empty or an assertion fails.
        try (KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) {
            MatcherAssert.assertThat("Store " + queryableName + " is empty", all.hasNext());
            final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();

            MatcherAssert.assertThat(onlyEntry.key, Is.is(expectedFinalResult.key()));
            MatcherAssert.assertThat(onlyEntry.value.value(), Is.is(expectedFinalResult.value()));
            MatcherAssert.assertThat(onlyEntry.value.timestamp(), Is.is(expectedFinalResult.timestamp()));
            MatcherAssert.assertThat(all.hasNext(), Is.is(false));
        }
    }

    /**
     * One input record: the topic it is sent to and its key/value pair. The key is always
     * {@link #ANY_UNIQUE_KEY}.
     */
    private static final class Input<V> {
        final String topic;
        final KeyValue<Long, V> record;

        Input(final String topic, final V value) {
            this.topic = topic;
            this.record = KeyValue.pair(ANY_UNIQUE_KEY, value);
        }
    }
}

