/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.tests.SmokeTestUtil;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.StringDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RelationalSmokeTest
extends SmokeTestUtil {
    private static final Logger LOG = LoggerFactory.getLogger(RelationalSmokeTest.class);
    static final String ARTICLE_SOURCE = "in-article";
    static final String COMMENT_SOURCE = "in-comment";
    static final String ARTICLE_RESULT_SINK = "out-augmented-article";
    static final String COMMENT_RESULT_SINK = "out-augmented-comment";
    private static final String[] TOPICS = new String[]{"in-article", "in-comment", "out-augmented-article", "out-augmented-comment"};

    public static String[] topics() {
        return Arrays.copyOf(TOPICS, TOPICS.length);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(args));
        String mode = args[0];
        String kafka = args[1];
        try {
            switch (mode) {
                case "driver": {
                    int numArticles = 1000;
                    int numComments = 10000;
                    DataSet dataSet = DataSet.generate(1000, 10000);
                    dataSet.produce(kafka, Duration.ofMinutes(1L));
                    LOG.info("Smoke test finished producing");
                    Thread.sleep(1000L);
                    LOG.info("Smoke test starting verification");
                    boolean pass = App.verifySync(kafka, Instant.now().plus(Duration.ofMinutes(10L)));
                    if (pass) {
                        LOG.info("Smoke test complete: passed");
                        break;
                    }
                    LOG.error("Smoke test complete: failed");
                    break;
                }
                case "application": {
                    String nodeId = args[2];
                    String stateDir = args[3];
                    App.startSync(kafka, UUID.randomUUID().toString(), nodeId, stateDir);
                    break;
                }
                default: {
                    LOG.error("Unknown command: {}", (Object)mode);
                    throw new RuntimeException("Unknown command: " + mode);
                }
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted", (Throwable)e);
        }
    }

    public static final class App {
        public static Topology getTopology() {
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            KTable articles = streamsBuilder.table(RelationalSmokeTest.ARTICLE_SOURCE, Consumed.with(SmokeTestUtil.intSerde, (Serde)new Article.ArticleSerde()));
            KTable comments = streamsBuilder.table(RelationalSmokeTest.COMMENT_SOURCE, Consumed.with(SmokeTestUtil.intSerde, (Serde)new Comment.CommentSerde()));
            KTable commentCounts = comments.groupBy((key, value) -> new KeyValue((Object)value.getArticleId(), (Object)1), Grouped.with((Serde)Serdes.Integer(), (Serde)Serdes.Short())).count();
            articles.leftJoin(commentCounts, AugmentedArticle.joiner(), Materialized.with(null, (Serde)new AugmentedArticle.AugmentedArticleSerde())).toStream().to(RelationalSmokeTest.ARTICLE_RESULT_SINK);
            comments.join(articles, Comment::getArticleId, AugmentedComment.joiner(), Materialized.with(null, (Serde)new AugmentedComment.AugmentedCommentSerde())).toStream().to(RelationalSmokeTest.COMMENT_RESULT_SINK);
            return streamsBuilder.build();
        }

        public static Properties getConfig(String broker, String application, String id, String stateDir) {
            return Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)broker), Utils.mkEntry((Object)"application.id", (Object)application), Utils.mkEntry((Object)"client.id", (Object)id), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once"), Utils.mkEntry((Object)"commit.interval.ms", (Object)"1000"), Utils.mkEntry((Object)"auto.offset.reset", (Object)"earliest"), Utils.mkEntry((Object)"state.dir", (Object)stateDir)}));
        }

        public static KafkaStreams startSync(String broker, String application, String id, String stateDir) throws InterruptedException {
            KafkaStreams kafkaStreams = new KafkaStreams(App.getTopology(), App.getConfig(broker, application, id, stateDir));
            CountDownLatch startUpLatch = new CountDownLatch(1);
            kafkaStreams.setStateListener((newState, oldState) -> {
                if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                    startUpLatch.countDown();
                }
            });
            kafkaStreams.start();
            startUpLatch.await();
            LOG.info("Streams has started.");
            return kafkaStreams;
        }

        public static boolean verifySync(String broker, Instant deadline) throws InterruptedException {
            Deserializer keyDeserializer = SmokeTestUtil.intSerde.deserializer();
            Article.ArticleDeserializer articleDeserializer = new Article.ArticleDeserializer();
            AugmentedArticle.AugmentedArticleDeserializer augmentedArticleDeserializer = new AugmentedArticle.AugmentedArticleDeserializer();
            Comment.CommentDeserializer commentDeserializer = new Comment.CommentDeserializer();
            AugmentedComment.AugmentedCommentDeserializer augmentedCommentDeserializer = new AugmentedComment.AugmentedCommentDeserializer();
            Properties consumerProperties = new Properties();
            String id = "RelationalSmokeTestConsumer" + UUID.randomUUID();
            consumerProperties.put("client.id", id);
            consumerProperties.put("group.id", id);
            consumerProperties.put("bootstrap.servers", broker);
            consumerProperties.put("key.deserializer", IntegerDeserializer.class);
            consumerProperties.put("value.deserializer", ByteArrayDeserializer.class);
            consumerProperties.put("isolation.level", "read_committed");
            consumerProperties.put("enable.auto.commit", (Object)false);
            try (KafkaConsumer consumer = new KafkaConsumer(consumerProperties);){
                List articlePartitions = consumer.partitionsFor(RelationalSmokeTest.ARTICLE_SOURCE);
                List augmentedArticlePartitions = consumer.partitionsFor(RelationalSmokeTest.ARTICLE_RESULT_SINK);
                List commentPartitions = consumer.partitionsFor(RelationalSmokeTest.COMMENT_SOURCE);
                List augmentedCommentPartitions = consumer.partitionsFor(RelationalSmokeTest.COMMENT_RESULT_SINK);
                List assignment = Stream.concat(Stream.concat(articlePartitions.stream().map(p -> new TopicPartition(p.topic(), p.partition())), augmentedArticlePartitions.stream().map(p -> new TopicPartition(p.topic(), p.partition()))), Stream.concat(commentPartitions.stream().map(p -> new TopicPartition(p.topic(), p.partition())), augmentedCommentPartitions.stream().map(p -> new TopicPartition(p.topic(), p.partition())))).collect(Collectors.toList());
                consumer.assign(assignment);
                consumer.seekToBeginning(assignment);
                TreeMap<Integer, Article> consumedArticles = new TreeMap<Integer, Article>();
                TreeMap<Integer, AugmentedArticle> consumedAugmentedArticles = new TreeMap<Integer, AugmentedArticle>();
                TreeMap<Integer, Comment> consumedComments = new TreeMap<Integer, Comment>();
                TreeMap<Integer, AugmentedComment> consumedAugmentedComments = new TreeMap<Integer, AugmentedComment>();
                boolean printedConsumedArticle = false;
                boolean printedConsumedAugmentedArticle = false;
                boolean printedConsumedComment = false;
                boolean printedConsumedAugmentedComment = false;
                boolean passed = false;
                while (!passed && Instant.now().isBefore(deadline)) {
                    boolean lastPollWasEmpty = false;
                    while (!lastPollWasEmpty) {
                        ConsumerRecords poll = consumer.poll(Duration.ofSeconds(1L));
                        lastPollWasEmpty = poll.isEmpty();
                        block23: for (ConsumerRecord record : poll) {
                            Integer key = (Integer)record.key();
                            switch (record.topic()) {
                                case "in-article": {
                                    Article article = (Article)articleDeserializer.deserialize("", (byte[])record.value());
                                    if (consumedArticles.containsKey(key)) {
                                        LOG.warn("Duplicate article: {} and {}", consumedArticles.get(key), (Object)article);
                                    }
                                    consumedArticles.put(key, article);
                                    continue block23;
                                }
                                case "in-comment": {
                                    Comment comment = (Comment)commentDeserializer.deserialize("", (byte[])record.value());
                                    if (consumedComments.containsKey(key)) {
                                        LOG.warn("Duplicate comment: {} and {}", consumedComments.get(key), (Object)comment);
                                    }
                                    consumedComments.put(key, comment);
                                    continue block23;
                                }
                                case "out-augmented-article": {
                                    Article article = (AugmentedArticle)augmentedArticleDeserializer.deserialize("", (byte[])record.value());
                                    consumedAugmentedArticles.put(key, (AugmentedArticle)article);
                                    continue block23;
                                }
                                case "out-augmented-comment": {
                                    Comment comment = (AugmentedComment)augmentedCommentDeserializer.deserialize("", (byte[])record.value());
                                    consumedAugmentedComments.put(key, (AugmentedComment)comment);
                                    continue block23;
                                }
                            }
                            throw new IllegalArgumentException(record.toString());
                        }
                        consumer.commitSync();
                    }
                    if (!printedConsumedArticle && !consumedArticles.isEmpty()) {
                        LOG.info("Consumed first Article");
                        printedConsumedArticle = true;
                    }
                    if (!printedConsumedComment && !consumedComments.isEmpty()) {
                        LOG.info("Consumed first Comment");
                        printedConsumedComment = true;
                    }
                    if (!printedConsumedAugmentedArticle && !consumedAugmentedArticles.isEmpty()) {
                        LOG.info("Consumed first AugmentedArticle");
                        printedConsumedAugmentedArticle = true;
                    }
                    if (!printedConsumedAugmentedComment && !consumedAugmentedComments.isEmpty()) {
                        LOG.info("Consumed first AugmentedComment");
                        printedConsumedAugmentedComment = true;
                    }
                    if (passed = App.verifySync(false, consumedArticles, consumedComments, consumedAugmentedArticles, consumedAugmentedComments)) continue;
                    LOG.info("Verification has not passed yet. ");
                    Thread.sleep(500L);
                }
                boolean bl = App.verifySync(true, consumedArticles, consumedComments, consumedAugmentedArticles, consumedAugmentedComments);
                return bl;
            }
        }

        public static <T> void assertThat(AtomicBoolean pass, StringBuilder failures, String message, T actual, Matcher<? super T> matcher) {
            if (!matcher.matches(actual)) {
                if (failures != null) {
                    StringDescription description = new StringDescription((Appendable)failures);
                    description.appendText("\n" + message).appendText("\nExpected: ").appendDescriptionOf(matcher).appendText("\n     but: ");
                    matcher.describeMismatch(actual, (Description)description);
                    description.appendText("\n");
                }
                pass.set(false);
            }
        }

        static boolean verifySync(boolean logResults, Map<Integer, Article> consumedArticles, Map<Integer, Comment> consumedComments, Map<Integer, AugmentedArticle> consumedAugmentedArticles, Map<Integer, AugmentedComment> consumedAugmentedComments) {
            AtomicBoolean pass = new AtomicBoolean(true);
            StringBuilder report = logResults ? new StringBuilder() : null;
            App.assertThat(pass, report, "one article", consumedArticles.size(), Matchers.greaterThan((Comparable)Integer.valueOf(0)));
            App.assertThat(pass, report, "one comment", consumedComments.size(), Matchers.greaterThan((Comparable)Integer.valueOf(0)));
            App.assertThat(pass, report, "article size", consumedAugmentedArticles.size(), CoreMatchers.is((Object)consumedArticles.size()));
            App.assertThat(pass, report, "comment size", consumedAugmentedComments.size(), CoreMatchers.is((Object)consumedComments.size()));
            TreeMap<Integer, Long> commentCounts = new TreeMap<Integer, Long>();
            for (AugmentedComment augmentedComment : consumedAugmentedComments.values()) {
                int key = augmentedComment.getKey();
                App.assertThat(pass, report, "comment missing, but found in augmentedComment: " + key, consumedComments, Matchers.hasKey((Object)key));
                Comment comment = consumedComments.get(key);
                if (comment != null) {
                    App.assertThat(pass, report, "comment articleId [" + comment.getArticleId() + "] didn't match augmentedComment articleId [" + augmentedComment.getArticleId() + "]", comment.getArticleId(), CoreMatchers.is((Object)augmentedComment.getArticleId()));
                }
                commentCounts.put(augmentedComment.getArticleId(), commentCounts.getOrDefault(augmentedComment.getArticleId(), 0L) + 1L);
                App.assertThat(pass, report, "augmentedArticle [" + augmentedComment.getArticleId() + "] missing for augmentedComment [" + augmentedComment.getKey() + "]", consumedAugmentedArticles, Matchers.hasKey((Object)augmentedComment.getArticleId()));
                AugmentedArticle augmentedArticle = consumedAugmentedArticles.get(augmentedComment.getArticleId());
                if (augmentedArticle != null) {
                    App.assertThat(pass, report, "articlePrefix didn't match augmentedArticle: " + augmentedArticle.getText(), augmentedArticle.getText(), CoreMatchers.startsWith((String)augmentedComment.getArticlePrefix()));
                }
                App.assertThat(pass, report, "article " + augmentedComment.getArticleId() + " missing from consumedArticles", consumedArticles, Matchers.hasKey((Object)augmentedComment.getArticleId()));
                Article article = consumedArticles.get(augmentedComment.getArticleId());
                if (article == null) continue;
                App.assertThat(pass, report, "articlePrefix didn't match article: " + article.getText(), article.getText(), CoreMatchers.startsWith((String)augmentedComment.getArticlePrefix()));
            }
            for (AugmentedArticle augmentedArticle : consumedAugmentedArticles.values()) {
                App.assertThat(pass, report, "article " + augmentedArticle.getKey() + " comment count mismatch", augmentedArticle.getCommentCount(), CoreMatchers.is((Object)commentCounts.getOrDefault(augmentedArticle.getKey(), 0L)));
            }
            if (logResults) {
                if (pass.get()) {
                    LOG.info("Evaluation passed ({}/{}) articles and ({}/{}) comments", new Object[]{consumedAugmentedArticles.size(), consumedArticles.size(), consumedAugmentedComments.size(), consumedComments.size()});
                } else {
                    LOG.error("Evaluation failed\nReport: {}\nConsumed Input Articles: {}\nConsumed Input Comments: {}\nConsumed Augmented Articles: {}\nConsumed Augmented Comments: {}", new Object[]{report, consumedArticles, consumedComments, consumedAugmentedArticles, consumedAugmentedComments});
                }
            }
            return pass.get();
        }
    }

    public static final class AugmentedComment
    extends Comment {
        private final String articlePrefix;

        private AugmentedComment(int key, long timestamp, String text, int articleId, String articlePrefix) {
            super(key, timestamp, text, articleId);
            this.articlePrefix = articlePrefix;
        }

        public String getArticlePrefix() {
            return this.articlePrefix;
        }

        @Override
        public String toString() {
            return "AugmentedComment{key=" + ((Comment)this).key + ", timestamp=" + this.getTimestamp() + ", text='" + this.getText() + '\'' + ", articleId=" + this.getArticleId() + ", articlePrefix='" + this.articlePrefix + '\'' + '}';
        }

        private static String prefix(String text, int length) {
            return text.length() < length ? text : text.substring(0, length);
        }

        public static ValueJoiner<Comment, Article, AugmentedComment> joiner() {
            return (comment, article) -> new AugmentedComment(((Comment)comment).key, comment.getTimestamp(), comment.getText(), comment.getArticleId(), AugmentedComment.prefix(article.getText(), 10));
        }

        public static class AugmentedCommentSerde
        implements Serde<AugmentedComment> {
            public Serializer<AugmentedComment> serializer() {
                return new AugmentedCommentSerializer();
            }

            public Deserializer<AugmentedComment> deserializer() {
                return new AugmentedCommentDeserializer();
            }
        }

        public static class AugmentedCommentDeserializer
        implements Deserializer<AugmentedComment> {
            public AugmentedComment deserialize(String topic, byte[] data) {
                ByteBuffer wrap = ByteBuffer.wrap(data);
                Comment comment = Comment.CommentDeserializer.deserialize(topic, wrap);
                int prefixLength = wrap.getInt();
                byte[] serializedPrefix = new byte[prefixLength];
                wrap.get(serializedPrefix);
                String articlePrefix = (String)SmokeTestUtil.stringSerde.deserializer().deserialize(topic, serializedPrefix);
                return new AugmentedComment(comment.key, comment.getTimestamp(), comment.getText(), comment.getArticleId(), articlePrefix);
            }
        }

        public static class AugmentedCommentSerializer
        implements Serializer<AugmentedComment> {
            private final Comment.CommentSerializer commentSerializer = new Comment.CommentSerializer();

            public byte[] serialize(String topic, AugmentedComment data) {
                byte[] serializedComment = this.commentSerializer.serialize(topic, data);
                byte[] serializedPrefix = SmokeTestUtil.stringSerde.serializer().serialize(topic, (Object)data.getArticlePrefix());
                int length = serializedComment.length + 4 + serializedPrefix.length;
                ByteBuffer buffer = ByteBuffer.allocate(length).put(serializedComment).putInt(serializedPrefix.length).put(serializedPrefix);
                return Serdes.ByteBuffer().serializer().serialize(topic, (Object)buffer);
            }
        }
    }

    public static final class AugmentedArticle
    extends Article {
        private final long commentCount;

        private AugmentedArticle(int key, long timestamp, String text, long commentCount) {
            super(key, timestamp, text);
            this.commentCount = commentCount;
        }

        public long getCommentCount() {
            return this.commentCount;
        }

        @Override
        public String toString() {
            return "AugmentedArticle{key=" + ((Article)this).key + ", timestamp=" + this.getTimestamp() + ", text='" + this.getText() + '\'' + ", commentCount=" + this.commentCount + '}';
        }

        public static ValueJoiner<Article, Long, AugmentedArticle> joiner() {
            return (article, commentCount) -> new AugmentedArticle(article.getKey(), article.getTimestamp(), article.getText(), commentCount == null ? 0L : commentCount);
        }

        public static class AugmentedArticleSerde
        implements Serde<AugmentedArticle> {
            public Serializer<AugmentedArticle> serializer() {
                return new AugmentedArticleSerializer();
            }

            public Deserializer<AugmentedArticle> deserializer() {
                return new AugmentedArticleDeserializer();
            }
        }

        public static class AugmentedArticleDeserializer
        implements Deserializer<AugmentedArticle> {
            public AugmentedArticle deserialize(String topic, byte[] data) {
                ByteBuffer wrap = ByteBuffer.wrap(data);
                Article article = Article.ArticleDeserializer.deserialize(topic, wrap);
                long commentCount = wrap.getLong();
                return new AugmentedArticle(article.key, article.getTimestamp(), article.getText(), commentCount);
            }
        }

        public static class AugmentedArticleSerializer
        implements Serializer<AugmentedArticle> {
            private final Article.ArticleSerializer articleSerializer = new Article.ArticleSerializer();

            public byte[] serialize(String topic, AugmentedArticle data) {
                byte[] serializedArticle = this.articleSerializer.serialize(topic, data);
                int length = serializedArticle.length + 8;
                ByteBuffer buffer = ByteBuffer.allocate(length).put(serializedArticle).putLong(data.getCommentCount());
                return Serdes.ByteBuffer().serializer().serialize(topic, (Object)buffer);
            }
        }
    }

    public static final class DataSet {
        private final Article[] articles;
        private final Comment[] comments;

        private DataSet(Article[] articles, Comment[] comments) {
            this.articles = articles;
            this.comments = comments;
        }

        public Article[] getArticles() {
            return this.articles;
        }

        public Comment[] getComments() {
            return this.comments;
        }

        public String toString() {
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append(this.articles.length).append(" Articles").append('\n');
            for (Article article : this.articles) {
                stringBuilder.append("  ").append(article).append('\n');
            }
            stringBuilder.append(this.comments.length).append(" Comments").append("\n");
            for (Comment comment : this.comments) {
                stringBuilder.append("  ").append(comment).append('\n');
            }
            return stringBuilder.toString();
        }

        public static DataSet generate(int numArticles, int numComments) {
            int i;
            int timeSpan = 345600000;
            long dataStartTime = System.currentTimeMillis();
            long dataEndTime = dataStartTime + 345600000L;
            long seed = new Random().nextLong();
            Random random = new Random(seed);
            LOG.info("Dataset PRNG seed: {}", (Object)seed);
            Iterator<Integer> articlesToCommentOnSequence = DataSet.zipfNormal(random, numArticles);
            Article[] articles = new Article[numArticles];
            Comment[] comments = new Comment[numComments];
            for (i = 0; i < numArticles; ++i) {
                long timestamp = (long)random.nextInt(345600000) + dataStartTime;
                String text = DataSet.randomText(random, 2000);
                articles[i] = new Article(i, timestamp, text);
            }
            for (i = 0; i < numComments; ++i) {
                Comment comment;
                int articleId = articlesToCommentOnSequence.next();
                long articleTimestamp = articles[articleId].getTimestamp();
                long timestamp = (long)random.nextInt((int)(dataEndTime - articleTimestamp)) + articleTimestamp;
                String text = DataSet.randomText(random, 200);
                comments[i] = comment = new Comment(i, timestamp, text, articleId);
            }
            return new DataSet(articles, comments);
        }

        private static String randomText(Random random, int avgLength) {
            int lowChar = 97;
            int highChar = 122;
            int length = Math.max(0, (int)(random.nextGaussian() * (double)avgLength / 3.0) + avgLength);
            char[] chars = new char[length];
            for (int i = 0; i < chars.length; ++i) {
                chars[i] = (char)(random.nextInt(25) + 97);
            }
            return new String(chars);
        }

        private static Iterator<Integer> zipfNormal(final Random random, final int keySpace) {
            return new Iterator<Integer>(){

                @Override
                public boolean hasNext() {
                    return true;
                }

                @Override
                public Integer next() {
                    double gaussian = Math.abs(random.nextGaussian());
                    double scaled = gaussian / 3.0;
                    double sample = scaled > 1.0 ? 0.0 : scaled;
                    double keyDouble = sample * (double)keySpace;
                    return (int)keyDouble;
                }
            };
        }

        public void produce(String kafka, Duration timeToSpend) throws InterruptedException {
            Properties producerProps = new Properties();
            String id = "RelationalSmokeTestProducer" + UUID.randomUUID();
            producerProps.put("client.id", id);
            producerProps.put("bootstrap.servers", kafka);
            producerProps.put("key.serializer", IntegerSerializer.class);
            producerProps.put("value.serializer", ByteArraySerializer.class);
            producerProps.put("acks", "-1");
            Article.ArticleSerializer articleSerializer = new Article.ArticleSerializer();
            Comment.CommentSerializer commentSerializer = new Comment.CommentSerializer();
            long pauseTime = timeToSpend.toMillis() / (long)(this.articles.length + this.comments.length);
            try (KafkaProducer producer = new KafkaProducer(producerProps);){
                int a = 0;
                int c = 0;
                while (a < this.articles.length || c < this.comments.length) {
                    ProducerRecord producerRecord;
                    if (a < this.articles.length && c >= this.comments.length || a < this.articles.length && this.articles[a].getTimestamp() <= this.comments[c].timestamp) {
                        producerRecord = new ProducerRecord(RelationalSmokeTest.ARTICLE_SOURCE, null, Long.valueOf(this.articles[a].getTimestamp()), (Object)this.articles[a].getKey(), (Object)articleSerializer.serialize("", this.articles[a]));
                        ++a;
                    } else {
                        producerRecord = new ProducerRecord(RelationalSmokeTest.COMMENT_SOURCE, null, Long.valueOf(this.comments[c].getTimestamp()), (Object)this.comments[c].getKey(), (Object)commentSerializer.serialize("", this.comments[c]));
                        ++c;
                    }
                    producer.send(producerRecord);
                    producer.flush();
                    LOG.info("sent {} {}", (Object)producerRecord.topic(), producerRecord.key());
                    Thread.sleep(pauseTime);
                }
            }
        }
    }

    public static class Comment {
        private final int key;
        private final long timestamp;
        private final String text;
        private final int articleId;

        private Comment(int key, long timestamp, String text, int articleId) {
            this.key = key;
            this.timestamp = timestamp;
            this.text = text;
            this.articleId = articleId;
        }

        public int getKey() {
            return this.key;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String getText() {
            return this.text;
        }

        public int getArticleId() {
            return this.articleId;
        }

        public String toString() {
            return "Comment{key=" + this.key + ", timestamp=" + Instant.ofEpochMilli(this.timestamp) + ", text='" + this.text + '\'' + ", articleId=" + this.articleId + '}';
        }

        public static class CommentSerde
        implements Serde<Comment> {
            public Serializer<Comment> serializer() {
                return new CommentSerializer();
            }

            public Deserializer<Comment> deserializer() {
                return new CommentDeserializer();
            }
        }

        public static class CommentDeserializer
        implements Deserializer<Comment> {
            public static Comment deserialize(String topic, ByteBuffer buffer) {
                int key = buffer.getInt();
                long timestamp = buffer.getLong();
                int textLength = buffer.getInt();
                byte[] textBytes = new byte[textLength];
                buffer.get(textBytes);
                String text = (String)SmokeTestUtil.stringSerde.deserializer().deserialize(topic, textBytes);
                int articleId = buffer.getInt();
                return new Comment(key, timestamp, text, articleId);
            }

            public Comment deserialize(String topic, byte[] data) {
                ByteBuffer buffer = (ByteBuffer)Serdes.ByteBuffer().deserializer().deserialize(topic, data);
                return CommentDeserializer.deserialize(topic, buffer);
            }
        }

        public static class CommentSerializer
        implements Serializer<Comment> {
            public byte[] serialize(String topic, Comment data) {
                byte[] serialText = SmokeTestUtil.stringSerde.serializer().serialize(topic, (Object)data.text);
                int length = 12 + (4 + serialText.length) + 4;
                ByteBuffer buffer = ByteBuffer.allocate(length).putInt(data.key).putLong(data.timestamp).putInt(serialText.length).put(serialText).putInt(data.articleId);
                return Serdes.ByteBuffer().serializer().serialize(topic, (Object)buffer);
            }
        }
    }

    public static class Article {
        private final int key;
        private final long timestamp;
        private final String text;

        private Article(int key, long timestamp, String text) {
            this.key = key;
            this.timestamp = timestamp;
            this.text = text;
        }

        public int getKey() {
            return this.key;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String getText() {
            return this.text;
        }

        public String toString() {
            return "Article{key=" + this.key + ", timestamp=" + Instant.ofEpochMilli(this.timestamp) + ", text='" + this.text + '\'' + '}';
        }

        public static class ArticleSerde
        implements Serde<Article> {
            public Serializer<Article> serializer() {
                return new ArticleSerializer();
            }

            public Deserializer<Article> deserializer() {
                return new ArticleDeserializer();
            }
        }

        public static class ArticleDeserializer
        implements Deserializer<Article> {
            public static Article deserialize(String topic, ByteBuffer buffer) {
                int key = buffer.getInt();
                long timestamp = buffer.getLong();
                int textLength = buffer.getInt();
                byte[] serialText = new byte[textLength];
                buffer.get(serialText);
                String text = (String)SmokeTestUtil.stringSerde.deserializer().deserialize(topic, serialText);
                return new Article(key, timestamp, text);
            }

            public Article deserialize(String topic, byte[] data) {
                ByteBuffer buffer = (ByteBuffer)Serdes.ByteBuffer().deserializer().deserialize(topic, data);
                return ArticleDeserializer.deserialize(topic, buffer);
            }
        }

        public static class ArticleSerializer
        implements Serializer<Article> {
            public byte[] serialize(String topic, Article data) {
                byte[] serialText = SmokeTestUtil.stringSerde.serializer().serialize(topic, (Object)data.getText());
                int length = 16 + serialText.length;
                ByteBuffer buffer = ByteBuffer.allocate(length).putInt(data.getKey()).putLong(data.getTimestamp()).putInt(serialText.length).put(serialText);
                return Serdes.ByteBuffer().serializer().serialize(topic, (Object)buffer);
            }
        }
    }
}

