/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class KStreamKTableJoinIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private String userClicksTopic;
    private String userRegionsTopic;
    private String userRegionsStoreName;
    private String outputTopic;
    private static volatile int testNo = 0;
    private KafkaStreams kafkaStreams;
    private Properties streamsConfiguration;
    @Parameterized.Parameter
    public long cacheSizeBytes;

    public KStreamKTableJoinIntegrationTest() {
        this.mockTime = KStreamKTableJoinIntegrationTest.CLUSTER.time;
    }

    @Before
    public void before() throws InterruptedException {
        this.userClicksTopic = "user-clicks-" + ++testNo;
        this.userRegionsTopic = "user-regions-" + testNo;
        this.userRegionsStoreName = "user-regions-store-name-" + testNo;
        this.outputTopic = "output-topic-" + testNo;
        CLUSTER.createTopic(this.userClicksTopic);
        CLUSTER.createTopic(this.userRegionsTopic);
        CLUSTER.createTopic(this.outputTopic);
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", "join-integration-test-" + testNo);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("key.serde", Serdes.String().getClass().getName());
        this.streamsConfiguration.put("value.serde", Serdes.String().getClass().getName());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)this.cacheSizeBytes);
        this.streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{0, 0xA00000L};
    }

    @Test
    public void shouldCountClicksPerRegion() throws Exception {
        List userClicks = Arrays.asList(new KeyValue((Object)"alice", (Object)13L), new KeyValue((Object)"bob", (Object)4L), new KeyValue((Object)"chao", (Object)25L), new KeyValue((Object)"bob", (Object)19L), new KeyValue((Object)"dave", (Object)56L), new KeyValue((Object)"eve", (Object)78L), new KeyValue((Object)"alice", (Object)40L), new KeyValue((Object)"fang", (Object)99L));
        List userRegions = Arrays.asList(new KeyValue((Object)"alice", (Object)"asia"), new KeyValue((Object)"bob", (Object)"americas"), new KeyValue((Object)"chao", (Object)"asia"), new KeyValue((Object)"dave", (Object)"europe"), new KeyValue((Object)"alice", (Object)"europe"), new KeyValue((Object)"eve", (Object)"americas"), new KeyValue((Object)"fang", (Object)"asia"));
        List<KeyValue> expectedClicksPerRegion = this.cacheSizeBytes == 0L ? Arrays.asList(new KeyValue((Object)"europe", (Object)13L), new KeyValue((Object)"americas", (Object)4L), new KeyValue((Object)"asia", (Object)25L), new KeyValue((Object)"americas", (Object)23L), new KeyValue((Object)"europe", (Object)69L), new KeyValue((Object)"americas", (Object)101L), new KeyValue((Object)"europe", (Object)109L), new KeyValue((Object)"asia", (Object)124L)) : Arrays.asList(new KeyValue((Object)"americas", (Object)101L), new KeyValue((Object)"europe", (Object)109L), new KeyValue((Object)"asia", (Object)124L));
        Serde stringSerde = Serdes.String();
        Serde longSerde = Serdes.Long();
        KStreamBuilder builder = new KStreamBuilder();
        KStream userClicksStream = builder.stream(stringSerde, longSerde, new String[]{this.userClicksTopic});
        KTable userRegionsTable = builder.table(stringSerde, stringSerde, this.userRegionsTopic, this.userRegionsStoreName);
        KTable clicksPerRegion = userClicksStream.leftJoin(userRegionsTable, (ValueJoiner)new ValueJoiner<Long, String, RegionWithClicks>(){

            public RegionWithClicks apply(Long clicks, String region) {
                return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
            }
        }).map((KeyValueMapper)new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>(){

            public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
                return new KeyValue((Object)value.getRegion(), (Object)value.getClicks());
            }
        }).groupByKey(stringSerde, longSerde).reduce((Reducer)new Reducer<Long>(){

            public Long apply(Long value1, Long value2) {
                return value1 + value2;
            }
        }, "ClicksPerRegionUnwindowed");
        clicksPerRegion.to(stringSerde, longSerde, this.outputTopic);
        this.kafkaStreams = new KafkaStreams((TopologyBuilder)builder, this.streamsConfiguration);
        this.kafkaStreams.start();
        Properties userRegionsProducerConfig = new Properties();
        userRegionsProducerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        userRegionsProducerConfig.put("acks", "all");
        userRegionsProducerConfig.put("retries", (Object)0);
        userRegionsProducerConfig.put("key.serializer", StringSerializer.class);
        userRegionsProducerConfig.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(this.userRegionsTopic, userRegions, userRegionsProducerConfig, (Time)this.mockTime);
        Properties userClicksProducerConfig = new Properties();
        userClicksProducerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        userClicksProducerConfig.put("acks", "all");
        userClicksProducerConfig.put("retries", (Object)0);
        userClicksProducerConfig.put("key.serializer", StringSerializer.class);
        userClicksProducerConfig.put("value.serializer", LongSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(this.userClicksTopic, userClicks, userClicksProducerConfig, (Time)this.mockTime);
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.put("group.id", "join-integration-test-standard-consumer");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("key.deserializer", StringDeserializer.class);
        consumerConfig.put("value.deserializer", LongDeserializer.class);
        List actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, this.outputTopic, expectedClicksPerRegion.size());
        Assert.assertThat(actualClicksPerRegion, (Matcher)CoreMatchers.equalTo(expectedClicksPerRegion));
    }

    private static final class RegionWithClicks {
        private final String region;
        private final long clicks;

        public RegionWithClicks(String region, long clicks) {
            if (region == null || region.isEmpty()) {
                throw new IllegalArgumentException("region must be set");
            }
            if (clicks < 0L) {
                throw new IllegalArgumentException("clicks must not be negative");
            }
            this.region = region;
            this.clicks = clicks;
        }

        public String getRegion() {
            return this.region;
        }

        public long getClicks() {
            return this.clicks;
        }
    }
}

