/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.tests.SmokeTestUtil;
import scala.collection.immutable.List;

/**
 * Driver for the exactly-once (EOS) Kafka Streams system test.
 *
 * <p>{@link #generate(String)} writes random {@code <String, Integer>} records to the
 * {@code data} topic until the JVM is asked to shut down. {@link #verify(String, boolean)}
 * waits until consumer group {@code EosTest} has no members, reads the input and output
 * topics with {@code read_committed} isolation, and checks the {@code echo}, {@code min},
 * {@code sum} (and optionally {@code repartition}, {@code max}, {@code cnt}) topics against
 * values recomputed from the input. On success it prints {@code ALL-RECORDS-DELIVERED};
 * on an error while reading it prints {@code FAILED}; a content mismatch is reported
 * by throwing a {@link RuntimeException}.
 */
public class EosTestDriver extends SmokeTestUtil {
    /** Exclusive upper bound of the random integer used as record key by {@link #generate(String)}. */
    private static final int MAX_NUMBER_OF_KEYS = 100;
    /** Timeout (ms) applied to every wait loop in this class. */
    private static final long MAX_IDLE_TIME_MS = 300000L;
    /** Consumer group id queried for liveness and for committed offsets. */
    private static final String APP_ID = "EosTest";

    /**
     * Cleared by the shutdown hook to stop {@link #generate(String)}. Must be volatile:
     * it is written by the hook thread and read by the producing thread.
     */
    private static volatile boolean isRunning = true;

    /** Producer callback shared by all senders: any send failure terminates the JVM with status 1. */
    private static final Callback EXIT_ON_SEND_FAILURE = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
                Exit.exit(1);
            }
        }
    };

    /**
     * Produces random records to topic {@code data} until a JVM shutdown is requested.
     * Keys are the decimal string of a random int in {@code [0, MAX_NUMBER_OF_KEYS)},
     * values are random ints in {@code [0, 10000)}. Progress is printed every 1000 records
     * and once more after the loop ends.
     *
     * @param kafka bootstrap servers string passed to the producer
     * @throws Exception declared for callers; nothing in the body throws a checked exception
     */
    static void generate(String kafka) throws Exception {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                isRunning = false;
            }
        });

        Properties producerProps = new Properties();
        producerProps.put("client.id", "EosTest");
        producerProps.put("bootstrap.servers", kafka);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);
        producerProps.put("enable.idempotence", true);

        Random rand = new Random(System.currentTimeMillis());
        int numRecordsProduced = 0;
        // try-with-resources so the producer is closed even if the loop throws.
        try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
            while (isRunning) {
                String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
                Integer value = rand.nextInt(10000);
                ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
                producer.send(record, EXIT_ON_SEND_FAILURE);
                if (++numRecordsProduced % 1000 == 0) {
                    System.out.println(numRecordsProduced + " records produced");
                }
                Utils.sleep((long) rand.nextInt(50));
            }
        }
        System.out.println(numRecordsProduced + " records produced");
    }

    /**
     * Verifies the output topics against the input topics.
     *
     * <p>Steps: wait for group {@code EosTest} to be empty; read the input topics up to the
     * group's current positions; read the output topics up to their end offsets; compare
     * echo/min/sum (and repartition/max/cnt when {@code withRepartitioning}); finally check
     * that a freshly produced marker record becomes readable on every output partition.
     *
     * @param kafka              bootstrap servers string
     * @param withRepartitioning whether the {@code repartition}, {@code max} and {@code cnt}
     *                           topics are part of the verification
     * @throws RuntimeException if the group does not empty in time or any content check fails;
     *                          errors while reading are caught and reported as {@code FAILED}
     */
    public static void verify(String kafka, boolean withRepartitioning) {
        ensureStreamsApplicationDown(kafka);
        Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka, withRepartitioning);

        Properties props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

        String[] allInputTopics = withRepartitioning
            ? new String[]{"data", "repartition"}
            : new String[]{"data"};
        String[] allOutputTopics = outputTopics(withRepartitioning);

        Map<String, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>>> inputRecordsPerTopicPerPartition;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            java.util.List<TopicPartition> partitions = getAllPartitions(consumer, allInputTopics);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
            return;
        }

        Map<String, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>>> outputRecordsPerTopicPerPartition;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            java.util.List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
            return;
        }

        // Topics are looked up lazily, in the order they are verified, so a missing
        // topic is reported only after the preceding checks have run.
        verifyReceivedAllRecords(
            recordsForTopic(inputRecordsPerTopicPerPartition, "data"),
            recordsForTopic(outputRecordsPerTopicPerPartition, "echo"));
        if (withRepartitioning) {
            verifyReceivedAllRecords(
                recordsForTopic(inputRecordsPerTopicPerPartition, "data"),
                recordsForTopic(outputRecordsPerTopicPerPartition, "repartition"));
        }
        verifyMin(
            recordsForTopic(inputRecordsPerTopicPerPartition, "data"),
            recordsForTopic(outputRecordsPerTopicPerPartition, "min"));
        verifySum(
            recordsForTopic(inputRecordsPerTopicPerPartition, "data"),
            recordsForTopic(outputRecordsPerTopicPerPartition, "sum"));
        if (withRepartitioning) {
            verifyMax(
                recordsForTopic(inputRecordsPerTopicPerPartition, "repartition"),
                recordsForTopic(outputRecordsPerTopicPerPartition, "max"));
            verifyCnt(
                recordsForTopic(inputRecordsPerTopicPerPartition, "repartition"),
                recordsForTopic(outputRecordsPerTopicPerPartition, "cnt"));
        }

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            java.util.List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            verifyAllTransactionFinished(consumer, kafka, withRepartitioning);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
            return;
        }

        System.out.println("ALL-RECORDS-DELIVERED");
    }

    /**
     * Returns the output topics to verify. Single source of truth so that
     * {@link #verify(String, boolean)} and
     * {@link #verifyAllTransactionFinished(KafkaConsumer, String, boolean)} cannot drift apart.
     */
    private static String[] outputTopics(boolean withRepartitioning) {
        if (withRepartitioning) {
            return new String[]{"echo", "min", "sum", "repartition", "max", "cnt"};
        }
        return new String[]{"echo", "min", "sum"};
    }

    /**
     * Looks up the per-partition records of {@code topic}, failing with a descriptive
     * message (instead of a later NullPointerException) when nothing was read from it.
     */
    private static Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> recordsForTopic(
            Map<String, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>>> recordsPerTopic,
            String topic) {
        Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> records = recordsPerTopic.get(topic);
        if (records == null) {
            throw new RuntimeException("Result verification failed: no records were read from topic " + topic);
        }
        return records;
    }

    /**
     * Blocks until consumer group {@code EosTest} reports no members, polling once per second.
     *
     * @throws RuntimeException if the group is still non-empty after {@code MAX_IDLE_TIME_MS}
     */
    private static void ensureStreamsApplicationDown(String kafka) {
        // Plain try/finally: the client is created inside the try and closed explicitly.
        AdminClient adminClient = null;
        try {
            adminClient = AdminClient.createSimplePlaintext(kafka);
            long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
            while (!((List<?>) adminClient.describeConsumerGroup(APP_ID, 10000L).consumers().get()).isEmpty()) {
                if (System.currentTimeMillis() > maxWaitTime) {
                    throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
                }
                sleep(1000L);
            }
        } finally {
            if (adminClient != null) {
                adminClient.close();
            }
        }
    }

    /**
     * Reads the current position of group {@code EosTest} for every partition of the input
     * topics ({@code data}, plus {@code repartition} when requested).
     *
     * @return position per input partition, as reported by {@code KafkaConsumer#position}
     */
    private static Map<TopicPartition, Long> getCommittedOffsets(String kafka, boolean withRepartitioning) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafka);
        props.put("group.id", APP_ID);
        props.put("client.id", "OffsetsClient");
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);

        Map<TopicPartition, Long> committedOffsets = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            HashSet<String> topics = new HashSet<>();
            topics.add("data");
            if (withRepartitioning) {
                topics.add("repartition");
            }
            consumer.subscribe(topics);
            // NOTE(review): presumably this poll is what joins the group so that position()
            // below is answered for the subscribed partitions - confirm against the client version.
            consumer.poll(0L);

            HashSet<TopicPartition> partitions = new HashSet<>();
            for (String topic : topics) {
                for (PartitionInfo partition : consumer.partitionsFor(topic)) {
                    partitions.add(new TopicPartition(partition.topic(), partition.partition()));
                }
            }
            for (TopicPartition tp : partitions) {
                committedOffsets.put(tp, consumer.position(tp));
            }
        }
        return committedOffsets;
    }

    /**
     * Polls {@code consumer} until every partition in {@code readEndOffsets} has been read up
     * to its end offset, grouping the records by topic and partition.
     *
     * <p>The idle deadline is pushed forward whenever a record arrives. A partition is paused
     * once the consumer's position reaches its end offset; reading is complete when the number
     * of paused partitions equals the number of entries in {@code readEndOffsets}.
     * NOTE(review): a partition is only paused while handling one of its records, so a
     * partition with no record to deliver is never paused and the method runs into the timeout.
     *
     * @param readEndOffsets exclusive end offset per partition
     * @param isInputTopic   when {@code false}, a record at or beyond the end offset is an error;
     *                       when {@code true}, such records are silently skipped
     * @throws RuntimeException on timeout, on an unexpected extra output record, or on a record
     *                          from a partition or topic that is not expected
     */
    private static Map<String, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>>> getRecords(KafkaConsumer<byte[], byte[]> consumer, Map<TopicPartition, Long> readEndOffsets, boolean withRepartitioning, boolean isInputTopic) {
        Map<String, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        boolean allRecordsReceived = false;

        while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
            ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500L);
            for (ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                Long endOffset = readEndOffsets.get(tp);
                if (endOffset == null) {
                    throw new RuntimeException("FAIL: received record from partition without known end offset: " + tp);
                }
                long readEndOffset = endOffset;

                if (record.offset() < readEndOffset) {
                    addRecord(record, recordPerTopicPerPartition, withRepartitioning);
                } else if (!isInputTopic) {
                    throw new RuntimeException("FAIL: did receive more records than expected for " + tp + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset() + ")");
                }
                if (consumer.position(tp) >= readEndOffset) {
                    consumer.pause(Collections.singletonList(tp));
                }
            }
            allRecordsReceived = consumer.paused().size() == readEndOffsets.keySet().size();
        }

        if (!allRecordsReceived) {
            throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
        }
        return recordPerTopicPerPartition;
    }

    /**
     * Appends {@code record} to the list kept for its topic and partition, creating the
     * intermediate map and list on first use.
     *
     * @throws RuntimeException if the record's topic is not one of the expected topics
     */
    private static void addRecord(ConsumerRecord<byte[], byte[]> record, Map<String, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition, boolean withRepartitioning) {
        String topic = record.topic();
        if (!verifyTopic(topic, withRepartitioning)) {
            throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
        }

        Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition = recordPerTopicPerPartition.get(topic);
        if (topicRecordsPerPartition == null) {
            topicRecordsPerPartition = new HashMap<>();
            recordPerTopicPerPartition.put(topic, topicRecordsPerPartition);
        }

        TopicPartition partition = new TopicPartition(topic, record.partition());
        java.util.List<ConsumerRecord<byte[], byte[]>> records = topicRecordsPerPartition.get(partition);
        if (records == null) {
            records = new ArrayList<>();
            topicRecordsPerPartition.put(partition, records);
        }
        records.add(record);
    }

    /** Returns whether {@code topic} is one of the topics this driver expects to read. */
    private static boolean verifyTopic(String topic, boolean withRepartitioning) {
        boolean validTopic = "data".equals(topic) || "echo".equals(topic) || "min".equals(topic) || "sum".equals(topic);
        if (withRepartitioning) {
            return validTopic || "repartition".equals(topic) || "max".equals(topic) || "cnt".equals(topic);
        }
        return validTopic;
    }

    /**
     * Checks that each received partition repeats, in order, the key/value pairs of the
     * {@code data} partition with the same partition number.
     *
     * @throws RuntimeException if the partition counts differ, a partition has no input,
     *                          more records were received than expected, or a pair differs
     */
    private static void verifyReceivedAllRecords(Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> expectedRecords, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> receivedRecords) {
        // The maps are keyed by partition, so this compares partition counts, not record counts.
        if (expectedRecords.size() != receivedRecords.size()) {
            throw new RuntimeException("Result verification failed. Received " + receivedRecords.size() + " partitions but expected " + expectedRecords.size());
        }

        StringDeserializer stringDeserializer = new StringDeserializer();
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        for (Map.Entry<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
            TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
            Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord =
                inputFor(expectedRecords, inputTopicPartition, partitionRecords.getKey()).iterator();

            for (ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
                if (!expectedRecord.hasNext()) {
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " since there is no more expected record in " + inputTopicPartition);
                }
                ConsumerRecord<byte[], byte[]> expected = expectedRecord.next();

                String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
                int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());

                if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
                    throw mismatch(receivedRecord, expectedKey, expectedValue, receivedKey, receivedValue);
                }
            }
        }
    }

    /**
     * Checks that the i-th record of each {@code min} partition carries the key of the i-th
     * {@code data} record of the same partition number and the minimum value seen so far
     * for that key.
     */
    private static void verifyMin(Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
        StringDeserializer stringDeserializer = new StringDeserializer();
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        Map<String, Integer> currentMinPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
            TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionInput = inputFor(inputPerTopicPerPartition, inputTopicPartition, partitionRecords.getKey());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionMin = partitionRecords.getValue();
            requireSameSize(partitionInput, partitionMin, partitionRecords.getKey());

            Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
            for (ConsumerRecord<byte[], byte[]> receivedRecord : partitionMin) {
                ConsumerRecord<byte[], byte[]> input = inputRecords.next();
                String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                String key = stringDeserializer.deserialize(input.topic(), input.key());
                int value = integerDeserializer.deserialize(input.topic(), input.value());

                Integer previous = currentMinPerKey.get(key);
                int min = previous == null ? value : Math.min(previous, value);
                currentMinPerKey.put(key, min);

                if (!receivedKey.equals(key) || receivedValue != min) {
                    throw mismatch(receivedRecord, key, min, receivedKey, receivedValue);
                }
            }
        }
    }

    /**
     * Checks that the i-th record of each {@code sum} partition carries the key of the i-th
     * {@code data} record of the same partition number and the running sum (as a long)
     * of the values seen so far for that key.
     */
    private static void verifySum(Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> sumPerTopicPerPartition) {
        StringDeserializer stringDeserializer = new StringDeserializer();
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        LongDeserializer longDeserializer = new LongDeserializer();
        Map<String, Long> currentSumPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> partitionRecords : sumPerTopicPerPartition.entrySet()) {
            TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionInput = inputFor(inputPerTopicPerPartition, inputTopicPartition, partitionRecords.getKey());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionSum = partitionRecords.getValue();
            requireSameSize(partitionInput, partitionSum, partitionRecords.getKey());

            Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
            for (ConsumerRecord<byte[], byte[]> receivedRecord : partitionSum) {
                ConsumerRecord<byte[], byte[]> input = inputRecords.next();
                String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                String key = stringDeserializer.deserialize(input.topic(), input.key());
                int value = integerDeserializer.deserialize(input.topic(), input.value());

                Long previous = currentSumPerKey.get(key);
                long sum = previous == null ? (long) value : previous + (long) value;
                currentSumPerKey.put(key, sum);

                if (!receivedKey.equals(key) || receivedValue != sum) {
                    throw mismatch(receivedRecord, key, sum, receivedKey, receivedValue);
                }
            }
        }
    }

    /**
     * Checks that the i-th record of each {@code max} partition carries the key of the i-th
     * {@code repartition} record of the same partition number and the maximum value seen
     * so far for that key.
     */
    private static void verifyMax(Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> maxPerTopicPerPartition) {
        StringDeserializer stringDeserializer = new StringDeserializer();
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        Map<String, Integer> currentMaxPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> partitionRecords : maxPerTopicPerPartition.entrySet()) {
            TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionInput = inputFor(inputPerTopicPerPartition, inputTopicPartition, partitionRecords.getKey());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionMax = partitionRecords.getValue();
            requireSameSize(partitionInput, partitionMax, partitionRecords.getKey());

            Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
            for (ConsumerRecord<byte[], byte[]> receivedRecord : partitionMax) {
                ConsumerRecord<byte[], byte[]> input = inputRecords.next();
                String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                String key = stringDeserializer.deserialize(input.topic(), input.key());
                int value = integerDeserializer.deserialize(input.topic(), input.value());

                Integer previous = currentMaxPerKey.get(key);
                int max = previous == null ? value : Math.max(previous, value);
                currentMaxPerKey.put(key, max);

                if (!receivedKey.equals(key) || receivedValue != max) {
                    throw mismatch(receivedRecord, key, max, receivedKey, receivedValue);
                }
            }
        }
    }

    /**
     * Checks that the i-th record of each {@code cnt} partition carries the key of the i-th
     * {@code repartition} record of the same partition number and the number of records
     * seen so far for that key (as a long).
     */
    private static void verifyCnt(Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
        StringDeserializer stringDeserializer = new StringDeserializer();
        LongDeserializer longDeserializer = new LongDeserializer();
        Map<String, Long> currentCntPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> partitionRecords : cntPerTopicPerPartition.entrySet()) {
            TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionInput = inputFor(inputPerTopicPerPartition, inputTopicPartition, partitionRecords.getKey());
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionCnt = partitionRecords.getValue();
            requireSameSize(partitionInput, partitionCnt, partitionRecords.getKey());

            Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
            for (ConsumerRecord<byte[], byte[]> receivedRecord : partitionCnt) {
                ConsumerRecord<byte[], byte[]> input = inputRecords.next();
                String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                String key = stringDeserializer.deserialize(input.topic(), input.key());

                Long previous = currentCntPerKey.get(key);
                long cnt = (previous == null ? 0L : previous) + 1L;
                currentCntPerKey.put(key, cnt);

                if (!receivedKey.equals(key) || receivedValue != cnt) {
                    throw mismatch(receivedRecord, key, cnt, receivedKey, receivedValue);
                }
            }
        }
    }

    /**
     * Returns the input records of {@code inputPartition}, failing with a descriptive message
     * (instead of a NullPointerException at the call site) when that partition has none.
     */
    private static java.util.List<ConsumerRecord<byte[], byte[]>> inputFor(
            Map<TopicPartition, java.util.List<ConsumerRecord<byte[], byte[]>>> inputPerPartition,
            TopicPartition inputPartition,
            TopicPartition outputPartition) {
        java.util.List<ConsumerRecord<byte[], byte[]>> input = inputPerPartition.get(inputPartition);
        if (input == null) {
            throw new RuntimeException("Result verification failed: no input records in " + inputPartition + " to compare " + outputPartition + " against");
        }
        return input;
    }

    /** Fails unless the input and output record lists of one partition have the same length. */
    private static void requireSameSize(
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionInput,
            java.util.List<ConsumerRecord<byte[], byte[]>> partitionOutput,
            TopicPartition outputPartition) {
        if (partitionInput.size() != partitionOutput.size()) {
            throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " + outputPartition + " but received " + partitionOutput.size());
        }
    }

    /** Builds the exception reported when a received key/value pair differs from the expected one. */
    private static RuntimeException mismatch(ConsumerRecord<byte[], byte[]> receivedRecord, String expectedKey, Object expectedValue, String receivedKey, Object receivedValue) {
        return new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
    }

    /**
     * Checks that no output partition is blocked for a {@code read_committed} reader.
     *
     * <p>Seeks {@code consumer} to the end of every output partition, writes one
     * {@code <"key","value">} marker record to each of them with a separate idempotent
     * producer, and then polls until the marker has been read from every partition.
     * NOTE(review): presumably a still-open transaction would keep the marker invisible
     * to the read-committed consumer, which is what the timeout detects - confirm.
     *
     * @throws RuntimeException if an unexpected record is read, or not all markers are read
     *                          within {@code MAX_IDLE_TIME_MS}
     */
    private static void verifyAllTransactionFinished(KafkaConsumer<byte[], byte[]> consumer, String kafka, boolean withRepartitioning) {
        // Uses the same topic list as verify(), so "cnt" is covered and "min" is not listed twice.
        String[] topics = outputTopics(withRepartitioning);
        java.util.List<TopicPartition> partitions = getAllPartitions(consumer, topics);
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);
        consumer.poll(0L);

        Properties producerProps = new Properties();
        producerProps.put("client.id", "VerifyProducer");
        producerProps.put("bootstrap.servers", kafka);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", StringSerializer.class);
        producerProps.put("enable.idempotence", true);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (TopicPartition tp : partitions) {
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(tp.topic(), Integer.valueOf(tp.partition()), "key", "value");
                producer.send(record, EXIT_ON_SEND_FAILURE);
            }
        }

        StringDeserializer stringDeserializer = new StringDeserializer();
        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        // A partition is removed from the list once its marker has been read.
        while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                String topic = record.topic();
                TopicPartition tp = new TopicPartition(topic, record.partition());
                try {
                    String key = stringDeserializer.deserialize(topic, record.key());
                    String value = stringDeserializer.deserialize(topic, record.value());
                    boolean isExpectedMarker = "key".equals(key) && "value".equals(value) && partitions.remove(tp);
                    if (!isExpectedMarker) {
                        throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: Expected record <'key','value'> from one of " + partitions + " but got" + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
                    }
                } catch (SerializationException e) {
                    throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: Expected record <'key','value'> from one of " + partitions + " but got " + record, e);
                }
            }
        }

        if (!partitions.isEmpty()) {
            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
        }
    }

    /**
     * Collects every partition of the given topics, as reported by
     * {@code consumer.partitionsFor(topic)}, into a new mutable list.
     */
    private static java.util.List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        ArrayList<TopicPartition> partitions = new ArrayList<>();
        for (String topic : topics) {
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitions;
    }
}

