/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.AbstractJoinIntegrationTest;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for windowed {@link KStream}-{@link KStream} joins: inner, left and outer,
 * each with and without re-keying the inputs before the join, plus a chained (multi) inner join.
 *
 * <p>Most tests wire a topology onto the builder owned by the base class and hand a list of
 * expected results to {@code runTestWithDriver}. The expected-result lists always contain
 * 15 entries. NOTE(review): presumably each entry corresponds to one input record fed by
 * {@link AbstractJoinIntegrationTest} and a {@code null} entry means "no output expected" --
 * confirm against the base class.
 */
@Category(value={IntegrationTest.class})
@RunWith(value=Parameterized.class)
public class StreamStreamJoinIntegrationTest
extends AbstractJoinIntegrationTest {
    private static final String LEFT_TOPIC = "inputTopicLeft";
    private static final String RIGHT_TOPIC = "inputTopicRight";
    private static final String RESULT_TOPIC = "outputTopic";
    private static final String JOIN_STORE_NAME = "join-store";

    /** Key shared by every expected output record. */
    private static final Long RECORD_KEY = 0L;

    /** Upper bound for waiting on a started {@link KafkaStreams} instance to reach RUNNING. */
    private static final long STARTUP_TIMEOUT_SECONDS = 60L;

    private KStream<Long, String> leftStream;
    private KStream<Long, String> rightStream;

    /**
     * @param cacheEnabled forwarded unchanged to the base class constructor
     */
    public StreamStreamJoinIntegrationTest(final boolean cacheEnabled) {
        super(cacheEnabled);
    }

    /**
     * Resets the shared environment and creates fresh left/right source streams before each test.
     *
     * @throws InterruptedException if the base class environment preparation is interrupted
     */
    @Before
    public void prepareTopology() throws InterruptedException {
        super.prepareEnvironment();
        appID = "stream-stream-join-integration-test";
        this.builder = new StreamsBuilder();
        this.leftStream = this.builder.stream(LEFT_TOPIC);
        this.rightStream = this.builder.stream(RIGHT_TOPIC);
    }

    /**
     * Verifies that querying a join store by the name given via {@code withStoreName} throws
     * {@link InvalidStateStoreException} once the application is running.
     *
     * @throws InterruptedException if the wait for the RUNNING state is interrupted
     */
    @Test
    public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {
        useApplicationIdSuffix("-no-store-access");

        // A dedicated builder: this test uses String/Integer streams, not the shared Long/String ones.
        final StreamsBuilder localBuilder = new StreamsBuilder();
        final KStream<String, Integer> left =
            localBuilder.stream(LEFT_TOPIC, Consumed.with(Serdes.String(), Serdes.Integer()));
        final KStream<String, Integer> right =
            localBuilder.stream(RIGHT_TOPIC, Consumed.with(Serdes.String(), Serdes.Integer()));
        final CountDownLatch running = new CountDownLatch(1);

        left.join(
            right,
            (value1, value2) -> value1 + value2,
            JoinWindows.of(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName(JOIN_STORE_NAME));

        try (KafkaStreams kafkaStreams = new KafkaStreams(localBuilder.build(), STREAMS_CONFIG)) {
            kafkaStreams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING) {
                    running.countDown();
                }
            });
            kafkaStreams.start();

            // Bounded wait: fail the test instead of hanging forever if RUNNING is never reached.
            Assert.assertTrue(
                "KafkaStreams did not reach RUNNING within " + STARTUP_TIMEOUT_SECONDS + " seconds",
                running.await(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS));

            Assert.assertThrows(
                InvalidStateStoreException.class,
                () -> kafkaStreams.store(JOIN_STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
    }

    /** Inner join of the left and right streams over a 10 second window. */
    @Test
    public void testInner() throws Exception {
        useApplicationIdSuffix("-inner");
        this.leftStream.join(this.rightStream, this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedInnerJoinResult());
    }

    /** Same as {@link #testInner()}, but both inputs pass through key-changing operators first. */
    @Test
    public void testInnerRepartitioned() throws Exception {
        useApplicationIdSuffix("-inner-repartitioned");
        this.leftStream.map(MockMapper.noOpKeyValueMapper()).join(this.rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()).selectKey(MockMapper.selectKeyKeyValueMapper()), this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedInnerJoinResult());
    }

    /** Left join of the left and right streams over a 10 second window. */
    @Test
    public void testLeft() throws Exception {
        useApplicationIdSuffix("-left");
        this.leftStream.leftJoin(this.rightStream, this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedLeftOrOuterJoinResult());
    }

    /** Same as {@link #testLeft()}, but both inputs pass through key-changing operators first. */
    @Test
    public void testLeftRepartitioned() throws Exception {
        useApplicationIdSuffix("-left-repartitioned");
        this.leftStream.map(MockMapper.noOpKeyValueMapper()).leftJoin(this.rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()).selectKey(MockMapper.selectKeyKeyValueMapper()), this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedLeftOrOuterJoinResult());
    }

    /** Outer join of the left and right streams over a 10 second window. */
    @Test
    public void testOuter() throws Exception {
        useApplicationIdSuffix("-outer");
        this.leftStream.outerJoin(this.rightStream, this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedLeftOrOuterJoinResult());
    }

    /** Same as {@link #testOuter()}, but both inputs pass through key-changing operators first. */
    @Test
    public void testOuterRepartitioned() throws Exception {
        // Distinct suffix so this test does not share an application id with testOuter().
        useApplicationIdSuffix("-outer-repartitioned");
        this.leftStream.map(MockMapper.noOpKeyValueMapper()).outerJoin(this.rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()).selectKey(MockMapper.selectKeyKeyValueMapper()), this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedLeftOrOuterJoinResult());
    }

    /** Inner join of left with right, whose result is inner-joined with the right stream again. */
    @Test
    public void testMultiInner() throws Exception {
        useApplicationIdSuffix("-multi-inner");
        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
            null,
            null,
            null,
            Collections.singletonList(result("A-a-a", 4L)),
            Collections.singletonList(result("B-a-a", 5L)),
            Arrays.asList(
                result("A-b-a", 6L), result("B-b-a", 6L),
                result("A-a-b", 6L), result("B-a-b", 6L),
                result("A-b-b", 6L), result("B-b-b", 6L)),
            null,
            null,
            Arrays.asList(
                result("C-a-a", 9L), result("C-a-b", 9L),
                result("C-b-a", 9L), result("C-b-b", 9L)),
            Arrays.asList(
                result("A-c-a", 10L), result("A-c-b", 10L),
                result("B-c-a", 10L), result("B-c-b", 10L),
                result("C-c-a", 10L), result("C-c-b", 10L),
                result("A-a-c", 10L), result("B-a-c", 10L),
                result("A-b-c", 10L), result("B-b-c", 10L),
                result("C-a-c", 10L), result("C-b-c", 10L),
                result("A-c-c", 10L), result("B-c-c", 10L), result("C-c-c", 10L)),
            null,
            null,
            null,
            Arrays.asList(
                result("A-d-a", 14L), result("A-d-b", 14L), result("A-d-c", 14L),
                result("B-d-a", 14L), result("B-d-b", 14L), result("B-d-c", 14L),
                result("C-d-a", 14L), result("C-d-b", 14L), result("C-d-c", 14L),
                result("A-a-d", 14L), result("B-a-d", 14L),
                result("A-b-d", 14L), result("B-b-d", 14L),
                result("C-a-d", 14L), result("C-b-d", 14L),
                result("A-c-d", 14L), result("B-c-d", 14L), result("C-c-d", 14L),
                result("A-d-d", 14L), result("B-d-d", 14L), result("C-d-d", 14L)),
            Arrays.asList(
                result("D-a-a", 15L), result("D-a-b", 15L), result("D-a-c", 15L), result("D-a-d", 15L),
                result("D-b-a", 15L), result("D-b-b", 15L), result("D-b-c", 15L), result("D-b-d", 15L),
                result("D-c-a", 15L), result("D-c-b", 15L), result("D-c-c", 15L), result("D-c-d", 15L),
                result("D-d-a", 15L), result("D-d-b", 15L), result("D-d-c", 15L), result("D-d-d", 15L)));
        this.leftStream.join(this.rightStream, this.valueJoiner, tenSecondWindow()).join(this.rightStream, this.valueJoiner, tenSecondWindow()).to(RESULT_TOPIC);
        this.runTestWithDriver(expectedResult);
    }

    /** Sets the {@code application.id} of the shared config to the base app id plus {@code suffix}. */
    private static void useApplicationIdSuffix(final String suffix) {
        STREAMS_CONFIG.put("application.id", appID + suffix);
    }

    /** Returns a new 10 second join window, as used by every driver-based test in this class. */
    private static JoinWindows tenSecondWindow() {
        return JoinWindows.of(Duration.ofSeconds(10L));
    }

    /** Builds one expected output record with the shared key, no headers and the given timestamp. */
    private static TestRecord<Long, String> result(final String value, final long timestamp) {
        return new TestRecord<>(RECORD_KEY, value, null, Long.valueOf(timestamp));
    }

    /** Expected results shared by the inner-join tests (plain and repartitioned). */
    private static List<List<TestRecord<Long, String>>> expectedInnerJoinResult() {
        return Arrays.asList(
            null,
            null,
            null,
            Collections.singletonList(result("A-a", 4L)),
            Collections.singletonList(result("B-a", 5L)),
            Arrays.asList(result("A-b", 6L), result("B-b", 6L)),
            null,
            null,
            Arrays.asList(result("C-a", 9L), result("C-b", 9L)),
            Arrays.asList(result("A-c", 10L), result("B-c", 10L), result("C-c", 10L)),
            null,
            null,
            null,
            Arrays.asList(result("A-d", 14L), result("B-d", 14L), result("C-d", 14L)),
            Arrays.asList(result("D-a", 15L), result("D-b", 15L), result("D-c", 15L), result("D-d", 15L)));
    }

    /**
     * Expected results shared by the left-join and outer-join tests (plain and repartitioned).
     * Differs from {@link #expectedInnerJoinResult()} only in the third entry, which holds
     * {@code "A-null"} instead of {@code null}.
     */
    private static List<List<TestRecord<Long, String>>> expectedLeftOrOuterJoinResult() {
        return Arrays.asList(
            null,
            null,
            Collections.singletonList(result("A-null", 3L)),
            Collections.singletonList(result("A-a", 4L)),
            Collections.singletonList(result("B-a", 5L)),
            Arrays.asList(result("A-b", 6L), result("B-b", 6L)),
            null,
            null,
            Arrays.asList(result("C-a", 9L), result("C-b", 9L)),
            Arrays.asList(result("A-c", 10L), result("B-c", 10L), result("C-c", 10L)),
            null,
            null,
            null,
            Arrays.asList(result("A-d", 14L), result("B-d", 14L), result("C-d", 14L)),
            Arrays.asList(result("D-a", 15L), result("D-b", 15L), result("D-c", 15L), result("D-d", 15L)));
    }
}

