/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.perf;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.perf.YahooBenchmark;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

public class SimpleBenchmark {
    private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer";
    private static final String SOURCE_TOPIC_ONE = "simpleBenchmarkSourceTopic1";
    private static final String SOURCE_TOPIC_TWO = "simpleBenchmarkSourceTopic2";
    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
    private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
    private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
    private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>(){

        public byte[] apply(byte[] value1, byte[] value2) {
            if (value1 != null) {
                return value1;
            }
            if (value2 != null) {
                return value2;
            }
            return new byte[100];
        }
    };
    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
    long processedBytes = 0L;
    int processedRecords = 0;
    private static final long POLL_MS = 500L;
    private static final long COMMIT_INTERVAL_MS = 30000L;
    private static final int MAX_POLL_RECORDS = 1000;
    private static final int KEY_SPACE_SIZE = 10000;
    private static final long STREAM_STREAM_JOIN_WINDOW = 10000L;
    private static final long AGGREGATE_WINDOW_SIZE = 1000L;
    private static final long AGGREGATE_WINDOW_ADVANCE = 500L;
    private static final int SOCKET_SIZE_BYTES = 0x100000;
    private static final int MAX_WAIT_MS = 180000;
    final String testName;
    final int numRecords;
    final Properties props;
    private final int valueSize;
    private final double keySkew;

    private SimpleBenchmark(Properties props, String testName, int numRecords, double keySkew, int valueSize) {
        this.props = props;
        this.testName = testName;
        this.keySkew = keySkew;
        this.valueSize = valueSize;
        this.numRecords = numRecords;
    }

    private void run() {
        switch (this.testName) {
            case "load-one": {
                this.produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, this.numRecords, this.keySkew, this.valueSize);
                break;
            }
            case "load-two": {
                this.produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, this.numRecords, this.keySkew, this.valueSize);
                this.produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, this.numRecords, this.keySkew, this.valueSize);
                break;
            }
            case "consume": {
                this.consume(SOURCE_TOPIC_ONE);
                break;
            }
            case "consumeproduce": {
                this.consumeAndProduce(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamcount": {
                this.countStreamsNonWindowed(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamcountwindowed": {
                this.countStreamsWindowed(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamprocess": {
                this.processStream(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamprocesswithsink": {
                this.processStreamWithSink(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamprocesswithstatestore": {
                this.processStreamWithStateStore(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamprocesswithwindowstore": {
                this.processStreamWithWindowStore(SOURCE_TOPIC_ONE);
                break;
            }
            case "streamtablejoin": {
                this.streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                break;
            }
            case "streamstreamjoin": {
                this.streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                break;
            }
            case "tabletablejoin": {
                this.tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                break;
            }
            case "yahoo": {
                this.yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
                break;
            }
            default: {
                throw new RuntimeException("Unknown test name " + this.testName);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 5) {
            System.err.println("Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize");
            System.exit(1);
        }
        String propFileName = args[0];
        String testName = args[1].toLowerCase(Locale.ROOT);
        int numRecords = Integer.parseInt(args[2]);
        double keySkew = Double.parseDouble(args[3]);
        int valueSize = Integer.parseInt(args[4]);
        Properties props = Utils.loadProps((String)propFileName);
        String kafka = props.getProperty("bootstrap.servers");
        if (kafka == null) {
            System.err.println("No bootstrap kafka servers specified in bootstrap.servers");
            System.exit(1);
        }
        System.out.println("StreamsTest instance started");
        System.out.println("testName=" + testName);
        System.out.println("streamsProperties=" + props);
        System.out.println("numRecords=" + numRecords);
        System.out.println("keySkew=" + keySkew);
        System.out.println("valueSize=" + valueSize);
        SimpleBenchmark benchmark = new SimpleBenchmark(props, testName, numRecords, keySkew, valueSize);
        benchmark.run();
    }

    public void setStreamProperties(String applicationId) {
        this.props.put("application.id", applicationId);
        this.props.put("client.id", "simple-benchmark");
        this.props.put("poll.ms", (Object)500L);
        this.props.put("commit.interval.ms", (Object)30000L);
        this.props.put("default.key.serde", Serdes.Integer().getClass());
        this.props.put("default.value.serde", Serdes.ByteArray().getClass());
        this.props.put("auto.offset.reset", "earliest");
        this.props.put("receive.buffer.bytes", (Object)0x100000);
        this.props.put("max.poll.records", (Object)1000);
        this.props.put("linger.ms", (Object)5000);
        this.props.put("batch.size", (Object)131072);
    }

    private Properties setProduceConsumeProperties(String clientId) {
        Properties clientProps = new Properties();
        clientProps.put("bootstrap.servers", this.props.getProperty("bootstrap.servers"));
        clientProps.put("client.id", clientId);
        clientProps.put("linger.ms", (Object)5000);
        clientProps.put("batch.size", (Object)131072);
        clientProps.put("send.buffer.bytes", (Object)0x100000);
        clientProps.put("key.serializer", IntegerSerializer.class);
        clientProps.put("value.serializer", ByteArraySerializer.class);
        clientProps.put("key.deserializer", IntegerDeserializer.class);
        clientProps.put("value.deserializer", ByteArrayDeserializer.class);
        clientProps.put("enable.auto.commit", "false");
        clientProps.put("receive.buffer.bytes", (Object)0x100000);
        clientProps.put("max.poll.records", (Object)1000);
        return clientProps;
    }

    void resetStats() {
        this.processedRecords = 0;
        this.processedBytes = 0L;
    }

    private void produce(String clientId, String topic, int numRecords, double keySkew, int valueSize) {
        Properties props = this.setProduceConsumeProperties(clientId);
        ZipfGenerator keyGen = new ZipfGenerator(10000, keySkew);
        try (KafkaProducer producer = new KafkaProducer(props);){
            byte[] value = new byte[valueSize];
            new Random(System.currentTimeMillis()).nextBytes(value);
            for (int i = 0; i < numRecords; ++i) {
                producer.send(new ProducerRecord(topic, (Object)keyGen.next(), (Object)value));
            }
        }
    }

    private void consumeAndProduce(String topic) {
        Properties consumerProps = this.setProduceConsumeProperties("simple-benchmark-consumer");
        Properties producerProps = this.setProduceConsumeProperties("simple-benchmark-producer");
        long startTime = System.currentTimeMillis();
        try (KafkaConsumer consumer = new KafkaConsumer(consumerProps);
             KafkaProducer producer = new KafkaProducer(producerProps);){
            List<TopicPartition> partitions = this.getAllPartitions(consumer, topic);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            block18: do {
                ConsumerRecords records;
                if ((records = consumer.poll(Duration.ofMillis(500L))).isEmpty()) {
                    if (this.processedRecords != this.numRecords) continue;
                    break;
                }
                for (ConsumerRecord record : records) {
                    producer.send(new ProducerRecord(SINK_TOPIC, record.key(), record.value()));
                    ++this.processedRecords;
                    this.processedBytes += (long)(((byte[])record.value()).length + 32);
                    if (this.processedRecords != this.numRecords) continue;
                    continue block18;
                }
            } while (this.processedRecords != this.numRecords);
        }
        long endTime = System.currentTimeMillis();
        this.printResults("ConsumerProducer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
    }

    private void consume(String topic) {
        Properties consumerProps = this.setProduceConsumeProperties("simple-benchmark-consumer");
        long startTime = System.currentTimeMillis();
        try (KafkaConsumer consumer = new KafkaConsumer(consumerProps);){
            List<TopicPartition> partitions = this.getAllPartitions(consumer, topic);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            block9: do {
                ConsumerRecords records;
                if ((records = consumer.poll(Duration.ofMillis(500L))).isEmpty()) {
                    if (this.processedRecords != this.numRecords) continue;
                    break;
                }
                for (ConsumerRecord record : records) {
                    ++this.processedRecords;
                    this.processedBytes += (long)(((byte[])record.value()).length + 32);
                    if (this.processedRecords != this.numRecords) continue;
                    continue block9;
                }
            } while (this.processedRecords != this.numRecords);
        }
        long endTime = System.currentTimeMillis();
        this.printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
    }

    private void processStream(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-streams-source");
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE)).peek((ForeachAction)new CountDownAction(latch));
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams Source Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    private void processStreamWithSink(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-streams-source-sink");
        StreamsBuilder builder = new StreamsBuilder();
        KStream source = builder.stream(topic);
        source.peek((ForeachAction)new CountDownAction(latch)).to(SINK_TOPIC);
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams SourceSink Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    private void processStreamWithStateStore(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-streams-with-store");
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"), INTEGER_SERDE, BYTE_SERDE);
        builder.addStateStore(storeBuilder.withCachingEnabled());
        KStream source = builder.stream(topic);
        source.peek((ForeachAction)new CountDownAction(latch)).process((ProcessorSupplier)new ProcessorSupplier<Integer, byte[]>(){

            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>(){
                    KeyValueStore<Integer, byte[]> store;

                    public void init(ProcessorContext context) {
                        super.init(context);
                        this.store = (KeyValueStore)context.getStateStore("store");
                    }

                    public void process(Integer key, byte[] value) {
                        this.store.get((Object)key);
                        this.store.put((Object)key, (Object)value);
                    }
                };
            }
        }, new String[]{"store"});
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    private void processStreamWithWindowStore(String topic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-streams-with-store");
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder storeBuilder = Stores.windowStoreBuilder((WindowBytesStoreSupplier)Stores.persistentWindowStore((String)"store", (Duration)Duration.ofMillis(3000L), (Duration)Duration.ofMillis(1000L), (boolean)false), INTEGER_SERDE, BYTE_SERDE);
        builder.addStateStore(storeBuilder.withCachingEnabled());
        KStream source = builder.stream(topic);
        source.peek((ForeachAction)new CountDownAction(latch)).process((ProcessorSupplier)new ProcessorSupplier<Integer, byte[]>(){

            public Processor<Integer, byte[]> get() {
                return new AbstractProcessor<Integer, byte[]>(){
                    WindowStore<Integer, byte[]> store;

                    public void init(ProcessorContext context) {
                        super.init(context);
                        this.store = (WindowStore)context.getStateStore("store");
                    }

                    public void process(Integer key, byte[] value) {
                        long timestamp = this.context().timestamp();
                        KeyValueIterator iter = this.store.fetch((Object)(key - 10), (Object)(key + 10), Instant.ofEpochMilli(timestamp - 1000L), Instant.ofEpochMilli(timestamp));
                        while (iter.hasNext()) {
                            iter.next();
                        }
                        iter.close();
                        this.store.put((Object)key, (Object)value, timestamp);
                    }
                };
            }
        }, new String[]{"store"});
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    private void countStreamsNonWindowed(String sourceTopic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-nonwindowed-count");
        StreamsBuilder builder = new StreamsBuilder();
        KStream input = builder.stream(sourceTopic);
        input.peek((ForeachAction)new CountDownAction(latch)).groupByKey().count();
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    private void countStreamsWindowed(String sourceTopic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-windowed-count");
        StreamsBuilder builder = new StreamsBuilder();
        KStream input = builder.stream(sourceTopic);
        input.peek((ForeachAction)new CountDownAction(latch)).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(1000L)).advanceBy(Duration.ofMillis(500L))).count();
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams Count Windowed Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    private void streamTableJoin(String kStreamTopic, String kTableTopic) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-stream-table-join");
        StreamsBuilder builder = new StreamsBuilder();
        KStream input1 = builder.stream(kStreamTopic);
        KTable input2 = builder.table(kTableTopic);
        input1.leftJoin(input2, VALUE_JOINER).foreach((ForeachAction)new CountDownAction(latch));
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    private void streamStreamJoin(String kStreamTopic1, String kStreamTopic2) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-stream-stream-join");
        StreamsBuilder builder = new StreamsBuilder();
        KStream input1 = builder.stream(kStreamTopic1);
        KStream input2 = builder.stream(kStreamTopic2);
        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of((Duration)Duration.ofMillis(10000L))).foreach((ForeachAction)new CountDownAction(latch));
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
    }

    private void tableTableJoin(String kTableTopic1, String kTableTopic2) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setStreamProperties("simple-benchmark-table-table-join");
        StreamsBuilder builder = new StreamsBuilder();
        KTable input1 = builder.table(kTableTopic1);
        KTable input2 = builder.table(kTableTopic2);
        input1.leftJoin(input2, VALUE_JOINER).toStream().foreach((ForeachAction)new CountDownAction(latch));
        KafkaStreams streams = this.createKafkaStreamsWithExceptionHandler(builder, this.props);
        this.runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
    }

    void printResults(String nameOfBenchmark, long latency) {
        System.out.println(nameOfBenchmark + this.processedRecords + "/" + latency + "/" + this.recordsPerSec(latency, this.processedRecords) + "/" + this.megabytesPerSec(latency, this.processedBytes));
    }

    void runGenericBenchmark(KafkaStreams streams, String nameOfBenchmark, CountDownLatch latch) {
        long startTime;
        streams.start();
        long endTime = startTime = System.currentTimeMillis();
        while (latch.getCount() > 0L && endTime - startTime < 180000L) {
            try {
                latch.await(1000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                Thread.interrupted();
            }
            endTime = System.currentTimeMillis();
        }
        streams.close();
        this.printResults(nameOfBenchmark, endTime - startTime);
    }

    private KafkaStreams createKafkaStreamsWithExceptionHandler(StreamsBuilder builder, Properties props) {
        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
                streamsClient.close(Duration.ofSeconds(30L));
            }
        });
        return streamsClient;
    }

    private double megabytesPerSec(long time, long processedBytes) {
        return (double)processedBytes / 1024.0 / 1024.0 / ((double)time / 1000.0);
    }

    private double recordsPerSec(long time, int numRecords) {
        return (double)numRecords / ((double)time / 1000.0);
    }

    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (String topic : topics) {
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitions;
    }

    private void yahooBenchmark(String campaignsTopic, String eventsTopic) {
        YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
        benchmark.run();
    }

    private class ZipfGenerator {
        private final Random rand = new Random(System.currentTimeMillis());
        private final int size;
        private final double skew;
        private double bottom = 0.0;

        ZipfGenerator(int size, double skew) {
            this.size = size;
            this.skew = skew;
            for (int i = 1; i < size; ++i) {
                this.bottom += 1.0 / Math.pow(i, this.skew);
            }
        }

        int next() {
            if (this.skew == 0.0) {
                return this.rand.nextInt(this.size);
            }
            int rank = this.rand.nextInt(this.size);
            double frequency = 1.0 / Math.pow(rank, this.skew) / this.bottom;
            double dice = this.rand.nextDouble();
            while (!(dice < frequency)) {
                rank = this.rand.nextInt(this.size);
                frequency = 1.0 / Math.pow(rank, this.skew) / this.bottom;
                dice = this.rand.nextDouble();
            }
            return rank;
        }
    }

    private class CountDownAction
    implements ForeachAction<Integer, byte[]> {
        private final CountDownLatch latch;

        CountDownAction(CountDownLatch latch) {
            this.latch = latch;
        }

        public void apply(Integer key, byte[] value) {
            ++SimpleBenchmark.this.processedRecords;
            SimpleBenchmark.this.processedBytes += (long)(32 + value.length);
            if (SimpleBenchmark.this.processedRecords == SimpleBenchmark.this.numRecords) {
                this.latch.countDown();
            }
        }
    }
}

