/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test that builds several windowed stream-stream joins on top of
 * key-changing operations ({@code map}, {@code selectKey}, {@code flatMap}) and checks
 * the records that arrive on each join's output topic.
 *
 * <p>Every test run derives its application id and topic names from {@link #testNo},
 * so the two test methods do not share topics on the embedded cluster.
 */
@Category(value={IntegrationTest.class})
public class KStreamRepartitionJoinTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = MockValueJoiner.instance(":");
    private final MockTime mockTime;
    /** One day, expressed in milliseconds. */
    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KStream<Long, Integer> streamOne;
    private KStream<Integer, String> streamTwo;
    private KStream<Integer, String> streamFour;
    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
    /** Expected join output, listed in the sorted order produced by {@link #receiveMessages}. */
    private final List<String> expectedStreamOneTwoJoin;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String streamTwoInput;
    private String streamFourInput;
    // NOTE(review): the pre-increment in before() is not atomic even though the field is
    // volatile; that is only safe while the tests of this class run one at a time -- confirm
    // before enabling parallel execution.
    private static volatile int testNo = 0;

    public KStreamRepartitionJoinTest() {
        this.mockTime = CLUSTER.time;
        this.expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
    }

    /**
     * Creates the input topics, the Streams configuration and the three source streams
     * for a fresh test run.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        final String applicationId = "kstream-repartition-join-test-" + ++testNo;
        builder = new StreamsBuilder();
        createTopics();

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", applicationId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("commit.interval.ms", COMMIT_INTERVAL_MS);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("num.stream.threads", 3);
        streamsConfiguration.put("internal.leave.group.on.close", true);

        streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.Long(), Serdes.Integer()));
        streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
        keyMapper = MockMapper.selectValueKeyValueMapper();
    }

    /**
     * Stops the Streams instance (if one was started) and removes its local state.
     *
     * @throws IOException if purging the local state directory fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        try {
            if (kafkaStreams != null) {
                kafkaStreams.close();
            }
        } finally {
            // Purge even when close() throws. The configuration is still null when before()
            // failed while creating topics, so guard against that to avoid hiding the real error.
            if (streamsConfiguration != null) {
                IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
            }
        }
    }

    @Test
    public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception {
        verifyRepartitionOnJoinOperations(0);
    }

    @Test
    public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception {
        verifyRepartitionOnJoinOperations(10 * 1024 * 1024);
    }

    /**
     * Produces the input data, wires up every join variant into one topology, starts it
     * and verifies the content of each output topic.
     *
     * @param cacheSizeBytes value stored under {@code cache.max.bytes.buffering}
     */
    private void verifyRepartitionOnJoinOperations(int cacheSizeBytes) throws Exception {
        streamsConfiguration.put("cache.max.bytes.buffering", cacheSizeBytes);
        produceMessages();

        // All joins must be added to the builder before the topology is built in startStreams().
        final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
        final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
        final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
        final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin();
        final ExpectedOutputOnTopic flatMapJoin = flatMapJoin();
        final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream();
        final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined();
        final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin();

        startStreams();

        verifyCorrectOutput(mapOne);
        verifyCorrectOutput(mapBoth);
        verifyCorrectOutput(mapMapJoin);
        verifyCorrectOutput(selectKeyJoin);
        verifyCorrectOutput(flatMapJoin);
        verifyCorrectOutput(mapRhs);
        verifyCorrectOutput(mapJoinJoin);
        verifyCorrectOutput(leftJoin);
    }

    /** Joins the re-keyed stream one with the untouched stream two. */
    private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
        final String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
        final KStream<Integer, Integer> mappedOne = streamOne.map(keyMapper);
        doJoin(mappedOne, streamTwo, mapOneStreamAndJoinOutput);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
    }

    /** Joins two streams that have both been passed through {@code map}. */
    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
        final String outputTopic = "map-both-streams-and-join-" + testNo;
        doJoin(map1, map2, outputTopic);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
    }

    /** Joins stream one after two consecutive {@code map} calls with stream two. */
    private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException {
        final KStream<Integer, Integer> mapMapStream = streamOne.map(
            new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
                @Override
                public KeyValue<Long, Integer> apply(Long key, Integer value) {
                    if (value == null) {
                        return new KeyValue<>(null, null);
                    }
                    return new KeyValue<>(key + value, value);
                }
            }).map(keyMapper);
        final String outputTopic = "map-map-join-" + testNo;
        doJoin(mapMapStream, streamTwo, outputTopic);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
    }

    /** Joins stream one after {@code selectKey} with stream two. */
    private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
        final KStream<Integer, Integer> keySelected =
            streamOne.selectKey(MockMapper.<Long, Integer>selectValueMapper());
        final String outputTopic = "select-key-join-" + testNo;
        doJoin(keySelected, streamTwo, outputTopic);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
    }

    /** Joins stream one after a {@code flatMap} that uses the value as the new key. */
    private ExpectedOutputOnTopic flatMapJoin() throws InterruptedException {
        final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
            new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                @Override
                public Iterable<KeyValue<Integer, Integer>> apply(Long key, Integer value) {
                    return Collections.singletonList(new KeyValue<>(value, value));
                }
            });
        final String outputTopic = "flat-map-join-" + testNo;
        doJoin(flatMapped, streamTwo, outputTopic);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
    }

    /** Joins with the mapped stream on the right-hand side, so the expected values are reversed. */
    private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException {
        final String output = "join-rhs-stream-mapped-" + testNo;
        CLUSTER.createTopic(output);
        final KStream<Integer, Integer> mappedOne = streamOne.map(keyMapper);
        streamTwo
            .join(mappedOne,
                  TOSTRING_JOINER,
                  getJoinWindow(),
                  Joined.with(Serdes.Integer(), Serdes.String(), Serdes.Integer()))
            .to(Serdes.Integer(), Serdes.String(), output);
        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
    }

    /** Left-joins two mapped streams and filters out results with no right-hand match. */
    private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException {
        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
        final String outputTopic = "left-join-" + testNo;
        CLUSTER.createTopic(outputTopic);
        map1
            .leftJoin(map2,
                      TOSTRING_JOINER,
                      getJoinWindow(),
                      Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
            .filterNot(new Predicate<Integer, String>() {
                @Override
                public boolean test(Integer key, String value) {
                    // Drop values whose text after the first two characters is "null".
                    return value.substring(2).equals("null");
                }
            })
            .to(Serdes.Integer(), Serdes.String(), outputTopic);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
    }

    /** Joins two mapped streams, maps the result again and joins it with mapped stream four. */
    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws InterruptedException {
        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
        final KeyValueMapper<Integer, String, KeyValue<Integer, String>> kvMapper = MockMapper.noOpKeyValueMapper();
        final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
        final KStream<Integer, String> join = map1.join(
            map2,
            TOSTRING_JOINER,
            getJoinWindow(),
            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()));

        final String topic = "map-join-join-" + testNo;
        CLUSTER.createTopic(topic);
        join.map(kvMapper)
            .join(streamFour.map(kvMapper),
                  TOSTRING_JOINER,
                  getJoinWindow(),
                  Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()))
            .to(Serdes.Integer(), Serdes.String(), topic);
        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic);
    }

    /** Returns a join window of {@link #WINDOW_SIZE} with three times that as the {@code until} value. */
    private JoinWindows getJoinWindow() {
        return JoinWindows.of(WINDOW_SIZE).until(3L * WINDOW_SIZE);
    }

    /** Reads the output topic and asserts that the sorted values equal the expected list. */
    private void verifyCorrectOutput(ExpectedOutputOnTopic expectedOutputOnTopic) throws InterruptedException {
        final List<String> received = receiveMessages(
            new StringDeserializer(),
            expectedOutputOnTopic.expectedOutput.size(),
            expectedOutputOnTopic.outputTopic);
        MatcherAssert.assertThat(received, Is.is(expectedOutputOnTopic.expectedOutput));
    }

    /** Writes the fixed input records to all three input topics. */
    private void produceMessages() throws Exception {
        produceToStreamOne();
        produceStreamTwoInputTo(streamTwoInput);
        produceStreamTwoInputTo(streamFourInput);
    }

    /**
     * Writes the records (1,"A") .. (5,"E") to the given topic.
     *
     * @param topic topic to produce to
     */
    private void produceStreamTwoInputTo(String topic) throws Exception {
        final List<KeyValue<Integer, String>> records = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E"));
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, mockTime);
    }

    /** Writes the stream-one records, including one record with a null value. */
    private void produceToStreamOne() throws Exception {
        final List<KeyValue<Long, Integer>> records = Arrays.asList(
            new KeyValue<>(10L, 1),
            new KeyValue<>(5L, 2),
            new KeyValue<>(12L, 3),
            new KeyValue<>(15L, 4),
            new KeyValue<>(20L, 5),
            new KeyValue<>(70L, (Integer) null));
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), LongSerializer.class, IntegerSerializer.class, new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronously(streamOneInput, records, producerConfig, mockTime);
    }

    /** Names the three input topics after the current test number and creates each with 2 partitions. */
    private void createTopics() throws InterruptedException {
        streamOneInput = "stream-one-" + testNo;
        streamTwoInput = "stream-two-" + testNo;
        streamFourInput = "stream-four-" + testNo;
        CLUSTER.createTopic(streamOneInput, 2, 1);
        CLUSTER.createTopic(streamTwoInput, 2, 1);
        CLUSTER.createTopic(streamFourInput, 2, 1);
    }

    /** Builds the topology from everything registered on the builder and starts it. */
    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

    /**
     * Waits (up to 60 seconds) for at least {@code numMessages} values on {@code topic}
     * and returns them sorted.
     *
     * @param valueDeserializer deserializer whose class name is used as {@code value.deserializer}
     * @param numMessages       minimum number of values to wait for
     * @param topic             topic to read from
     * @return the received values in natural order
     */
    private List<String> receiveMessages(Deserializer<?> valueDeserializer, int numMessages, String topic) throws InterruptedException {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("group.id", "kstream-test");
        config.setProperty("auto.offset.reset", "earliest");
        config.setProperty("key.deserializer", IntegerDeserializer.class.getName());
        config.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        final List<String> received =
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, topic, numMessages, 60000L);
        // Sort so the comparison does not depend on arrival order.
        Collections.sort(received);
        return received;
    }

    /**
     * Creates {@code outputTopic} and writes the windowed inner join of {@code lhs} and
     * {@code rhs} to it.
     */
    private void doJoin(KStream<Integer, Integer> lhs, KStream<Integer, String> rhs, String outputTopic) throws InterruptedException {
        CLUSTER.createTopic(outputTopic);
        lhs.join(rhs,
                 TOSTRING_JOINER,
                 getJoinWindow(),
                 Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
            .to(Serdes.Integer(), Serdes.String(), outputTopic);
    }

    /** Pairs an output topic with the sorted values expected on it. */
    private static final class ExpectedOutputOnTopic {
        private final List<String> expectedOutput;
        private final String outputTopic;

        ExpectedOutputOnTopic(List<String> expectedOutput, String outputTopic) {
            this.expectedOutput = expectedOutput;
            this.outputTopic = outputTopic;
        }
    }
}

