/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestUtil;
import org.apache.kafka.test.TestUtils;

public class SmokeTestDriver
extends SmokeTestUtil {
    public static final int MAX_RECORD_EMPTY_RETRIES = 60;

    /**
     * Runs the smoke test against a broker at {@code localhost:9092}.
     *
     * <p>A background thread generates the input and then verifies the output, while this
     * thread starts and stops four {@link SmokeTestClient} instances at staggered times.
     * Each client gets its own state directory under a fresh temp directory.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting for the driver thread
     */
    public static void main(String[] args) throws InterruptedException {
        final String kafka = "localhost:9092";
        final File stateDir = TestUtils.tempDirectory();
        final int numKeys = 20;
        final int maxRecordsPerKey = 1000;

        final Thread driver = new Thread() {
            @Override
            public void run() {
                try {
                    final Map<String, Set<Integer>> produced =
                        SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
                    SmokeTestDriver.verify(kafka, produced, maxRecordsPerKey);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };

        // The same Properties object is reused; only "state.dir" changes between clients.
        final Properties props = new Properties();
        props.put("bootstrap.servers", kafka);
        final SmokeTestClient[] clients = new SmokeTestClient[4];
        for (int i = 0; i < clients.length; i++) {
            final File clientDir = SmokeTestDriver.createDir(stateDir, String.valueOf(i + 1));
            props.put("state.dir", clientDir.getAbsolutePath());
            clients[i] = new SmokeTestClient(props);
        }

        System.out.println("starting the driver");
        driver.start();

        System.out.println("starting the first and second client");
        clients[0].start();
        clients[1].start();
        SmokeTestDriver.sleep(10000L);

        System.out.println("starting the third client");
        clients[2].start();

        System.out.println("closing the first client");
        clients[0].close();
        System.out.println("closed the first client");
        SmokeTestDriver.sleep(10000L);

        System.out.println("starting the forth client");
        clients[3].start();

        driver.join();
        System.out.println("driver stopped");

        // The first client was closed above; shut down the remaining three in start order.
        for (int i = 1; i < clients.length; i++) {
            clients[i].close();
        }
        System.out.println("shutdown");
    }

    /**
     * Convenience overload of {@link #generate(String, int, int, boolean)} that always
     * passes {@code true} for {@code autoTerminate}.
     *
     * @param kafka            bootstrap servers string handed to the producer
     * @param numKeys          number of distinct keys to generate
     * @param maxRecordsPerKey number of values generated per key
     * @return an unmodifiable map from key to the set of values sent for it
     */
    public static Map<String, Set<Integer>> generate(String kafka, int numKeys, int maxRecordsPerKey) {
        final boolean autoTerminate = true;
        return generate(kafka, numKeys, maxRecordsPerKey, autoTerminate);
    }

    /**
     * Produces integer records to the {@code data} topic and returns everything that was sent.
     *
     * <p>Key {@code i} (for {@code i} in {@code [0, numKeys)}) is named
     * {@code "i-(i + maxRecordsPerKey - 1)"} and carries the values of that inclusive range in a
     * locally shuffled order. Keys are picked at random for each send, with a 2 ms pause between
     * sends.
     *
     * <p>When {@code autoTerminate} is {@code true}, a key is dropped once its values are
     * exhausted and the method returns after all keys are drained. When it is {@code false},
     * the send loop has no exit condition and exhausted keys yield {@code -1} as their value.
     *
     * <p>Sends that fail with a {@link TimeoutException} are collected by {@link TestCallback}
     * and re-sent for up to five rounds; if records are still pending after the last round the
     * process exits with status 1.
     *
     * @param kafka            bootstrap servers string handed to the producer
     * @param numKeys          number of distinct keys to generate
     * @param maxRecordsPerKey number of values generated per key
     * @param autoTerminate    whether to stop once every key's values have been sent
     * @return an unmodifiable map from key to the set of values sent for it
     */
    public static Map<String, Set<Integer>> generate(String kafka, int numKeys, int maxRecordsPerKey, boolean autoTerminate) {
        Properties producerProps = new Properties();
        producerProps.put("client.id", "SmokeTest");
        producerProps.put("bootstrap.servers", kafka);
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        producerProps.put("retries", Integer.MAX_VALUE);
        producerProps.put("acks", "all");
        producerProps.put("request.timeout.ms", 80000);

        // Build the in-memory data before opening the producer so that a failure here
        // (e.g. a negative maxRecordsPerKey) cannot leak a client.
        Map<String, Set<Integer>> allData = new HashMap<>();
        ValueList[] data = new ValueList[numKeys];
        for (int i = 0; i < numKeys; ++i) {
            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
            allData.put(data[i].key, new HashSet<Integer>());
        }

        Random rand = new Random();
        int numRecordsProduced = 0;
        // In auto-terminate mode "remaining" counts the keys that still have values;
        // otherwise it stays at 1 so the loop never ends on its own.
        int remaining = autoTerminate ? data.length : 1;

        // try-with-resources guarantees the producer is closed even if a send throws.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
            while (remaining > 0) {
                int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
                String key = data[index].key;
                int value = data[index].next();
                if (autoTerminate && value < 0) {
                    // This key is drained: overwrite it with the last live key and shrink the range.
                    data[index] = data[--remaining];
                    continue;
                }
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    "data",
                    stringSerde.serializer().serialize("", key),
                    intSerde.serializer().serialize("", value));
                producer.send(record, new TestCallback(record, needRetry));
                allData.get(key).add(value);
                if (++numRecordsProduced % 100 == 0) {
                    System.out.println(numRecordsProduced + " records produced");
                }
                Utils.sleep(2L);
            }
            producer.flush();

            int remainingRetries = 5;
            while (!needRetry.isEmpty()) {
                // Callbacks of this round collect their failures into a fresh list.
                List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
                for (ProducerRecord<byte[], byte[]> record : needRetry) {
                    producer.send(record, new TestCallback(record, needRetry2));
                }
                producer.flush();
                needRetry = needRetry2;
                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
                    System.err.println("Failed to produce all records after multiple retries");
                    Exit.exit(1);
                }
            }
        }
        return Collections.unmodifiableMap(allData);
    }

    /**
     * Shuffles {@code data} in place with a bounded displacement.
     *
     * <p>Each position {@code i}, visited left to right, is swapped with a randomly chosen
     * position in {@code [i, i + windowSize)}, clipped to the end of the array.
     *
     * @param data       array to shuffle in place
     * @param windowSize upper bound (exclusive) on how far ahead the swap partner may be
     */
    private static void shuffle(int[] data, int windowSize) {
        final Random random = new Random();
        final int length = data.length;
        int pos = 0;
        while (pos < length) {
            final int span = Math.min(length - pos, windowSize);
            final int partner = pos + random.nextInt(span);
            final int held = data[pos];
            data[pos] = data[partner];
            data[partner] = held;
            pos++;
        }
    }

    /**
     * Consumes the smoke test's output topics from the beginning and checks them against the
     * generated input, printing a report that ends in {@code SUCCESS} or {@code FAILURE}.
     *
     * <p>All partitions of {@code echo}, {@code max}, {@code min}, {@code dif}, {@code sum},
     * {@code cnt}, {@code avg}, {@code wcnt} and {@code tagg} are polled for up to six minutes.
     * For the aggregate topics only the latest value per key is kept. Polling stops early once
     * at least as many echo records as were generated have been seen, a poll comes back empty,
     * and either every aggregate check passes or more than 60 such empty polls (plus one) have
     * been retried.
     *
     * <p>The {@code wcnt} values are collected but not verified.
     *
     * @param kafka            bootstrap servers string handed to the consumer
     * @param allData          expected values per key, as returned by {@code generate}
     * @param maxRecordsPerKey number of values generated per key; used to compute the expected
     *                         total record count
     */
    public static void verify(String kafka, Map<String, Set<Integer>> allData, int maxRecordsPerKey) {
        Properties props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);

        int recordsGenerated = allData.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        Map<String, Integer> max = new HashMap<>();
        Map<String, Integer> min = new HashMap<>();
        Map<String, Integer> dif = new HashMap<>();
        Map<String, Long> sum = new HashMap<>();
        Map<String, Long> cnt = new HashMap<>();
        Map<String, Double> avg = new HashMap<>();
        Map<String, Object> wcnt = new HashMap<>();
        Map<String, Long> tagg = new HashMap<>();
        Map<String, Set<Integer>> received = new HashMap<>();
        for (String key : allData.keySet()) {
            received.put(key, new HashSet<Integer>());
        }

        int retry = 0;
        long start;
        // try-with-resources closes the consumer even if polling or deserialization throws.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = SmokeTestDriver.getAllPartitions(
                consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6L)) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500L));
                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                    // All echo records are in; keep polling only while the aggregates are
                    // still catching up and the retry budget (60, the value of
                    // MAX_RECORD_EMPTY_RETRIES) is not used up.
                    boolean aggregatesVerified = SmokeTestDriver.verifyMin(min, allData, false)
                        && SmokeTestDriver.verifyMax(max, allData, false)
                        && SmokeTestDriver.verifyDif(dif, allData, false)
                        && SmokeTestDriver.verifySum(sum, allData, false)
                        && SmokeTestDriver.verifyCnt(cnt, allData, false)
                        && SmokeTestDriver.verifyAvg(avg, allData, false)
                        && SmokeTestDriver.verifyTAgg(tagg, allData, false);
                    if (aggregatesVerified || retry++ > 60) {
                        break;
                    }
                    continue;
                }
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    String key = (String) stringSerde.deserializer().deserialize("", record.key());
                    switch (record.topic()) {
                        case "echo": {
                            Integer value = (Integer) intSerde.deserializer().deserialize("", record.value());
                            if (++recordsProcessed % 100 == 0) {
                                System.out.println("Echo records processed = " + recordsProcessed);
                            }
                            // A key that was never generated is tracked too, so it shows up as a
                            // mismatch against allData instead of failing with an NPE here.
                            Set<Integer> seen = received.get(key);
                            if (seen == null) {
                                seen = new HashSet<>();
                                received.put(key, seen);
                            }
                            seen.add(value);
                            break;
                        }
                        case "min":
                            min.put(key, (Integer) intSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "max":
                            max.put(key, (Integer) intSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "dif":
                            dif.put(key, (Integer) intSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "sum":
                            sum.put(key, (Long) longSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "cnt":
                            cnt.put(key, (Long) longSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "avg":
                            avg.put(key, (Double) doubleSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "wcnt":
                            wcnt.put(key, longSerde.deserializer().deserialize("", record.value()));
                            break;
                        case "tagg":
                            tagg.put(key, (Long) longSerde.deserializer().deserialize("", record.value()));
                            break;
                        default:
                            System.out.println("unknown topic: " + record.topic());
                            break;
                    }
                }
            }
        }

        long finished = System.currentTimeMillis() - start;
        System.out.println("Verification time=" + finished);
        System.out.println("-------------------");
        System.out.println("Result Verification");
        System.out.println("-------------------");
        System.out.println("recordGenerated=" + recordsGenerated);
        System.out.println("recordProcessed=" + recordsProcessed);
        if (recordsProcessed > recordsGenerated) {
            System.out.println("PROCESSED-MORE-THAN-GENERATED");
        } else if (recordsProcessed < recordsGenerated) {
            System.out.println("PROCESSED-LESS-THAN-GENERATED");
        }

        boolean success = allData.equals(received);
        if (success) {
            System.out.println("ALL-RECORDS-DELIVERED");
        } else {
            // Count the expected values that never showed up on the echo topic.
            int missedCount = 0;
            for (Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
                Set<Integer> delivered = received.get(entry.getKey());
                for (Integer expectedValue : entry.getValue()) {
                    if (!delivered.contains(expectedValue)) {
                        missedCount++;
                    }
                }
            }
            System.out.println("missedRecords=" + missedCount);
        }

        // Non-short-circuit &= so every check runs and prints its own diagnostics.
        success &= SmokeTestDriver.verifyMin(min, allData, true);
        success &= SmokeTestDriver.verifyMax(max, allData, true);
        success &= SmokeTestDriver.verifyDif(dif, allData, true);
        success &= SmokeTestDriver.verifySum(sum, allData, true);
        success &= SmokeTestDriver.verifyCnt(cnt, allData, true);
        success &= SmokeTestDriver.verifyAvg(avg, allData, true);
        success &= SmokeTestDriver.verifyTAgg(tagg, allData, true);
        System.out.println(success ? "SUCCESS" : "FAILURE");
    }

    /**
     * Checks that {@code map} holds, for every generated key, the minimum encoded in that key.
     *
     * <p>The expected minimum is the number before the {@code '-'} in the key. A null value is
     * reported as a mismatch rather than causing a {@link NullPointerException} on unboxing,
     * matching how {@code verifyDif} and {@code verifyAvg} treat nulls.
     *
     * @param map     latest {@code min} value observed per key
     * @param allData generated data; only its size is used, as the expected number of keys
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty, has the expected size, and every value matches
     */
    private static boolean verifyMin(Map<String, Integer> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("min is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying min");
        }
        if (map.size() != allData.size()) {
            if (print) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
            }
            return false;
        }
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            int expected = SmokeTestDriver.getMin(entry.getKey());
            Integer actual = entry.getValue();
            if (actual != null && actual.intValue() == expected) {
                continue;
            }
            if (print) {
                System.out.println("fail: key=" + entry.getKey() + " min=" + actual + " expected=" + expected);
            }
            return false;
        }
        return true;
    }

    /**
     * Checks that {@code map} holds, for every generated key, the maximum encoded in that key.
     *
     * <p>The expected maximum is the number after the {@code '-'} in the key. A null value is
     * reported as a mismatch rather than causing a {@link NullPointerException} on unboxing,
     * matching how {@code verifyDif} and {@code verifyAvg} treat nulls.
     *
     * @param map     latest {@code max} value observed per key
     * @param allData generated data; only its size is used, as the expected number of keys
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty, has the expected size, and every value matches
     */
    private static boolean verifyMax(Map<String, Integer> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("max is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying max");
        }
        if (map.size() != allData.size()) {
            if (print) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
            }
            return false;
        }
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            int expected = SmokeTestDriver.getMax(entry.getKey());
            Integer actual = entry.getValue();
            if (actual != null && actual.intValue() == expected) {
                continue;
            }
            if (print) {
                System.out.println("fail: key=" + entry.getKey() + " max=" + actual + " expected=" + expected);
            }
            return false;
        }
        return true;
    }

    /**
     * Checks that {@code map} holds, for every generated key, the difference between the
     * maximum and minimum encoded in that key. A null value counts as a mismatch.
     *
     * @param map     latest {@code dif} value observed per key
     * @param allData generated data; only its size is used, as the expected number of keys
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty, has the expected size, and every value matches
     */
    private static boolean verifyDif(Map<String, Integer> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("dif is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying dif");
        }

        final int resultCount = map.size();
        final int expectedCount = allData.size();
        if (resultCount != expectedCount) {
            if (print) {
                System.out.println("fail: resultCount=" + resultCount + " expectedCount=" + expectedCount);
            }
            return false;
        }

        for (Map.Entry<String, Integer> candidate : map.entrySet()) {
            final String key = candidate.getKey();
            final int lowest = getMin(key);
            final int highest = getMax(key);
            final int expected = highest - lowest;
            final Integer observed = candidate.getValue();
            final boolean matches = observed != null && observed.intValue() == expected;
            if (!matches) {
                if (print) {
                    System.out.println("fail: key=" + key + " dif=" + observed + " expected=" + expected);
                }
                return false;
            }
        }
        return true;
    }

    /**
     * Checks that {@code map} holds, for every generated key, the number of values in the
     * inclusive range encoded in that key ({@code max - min + 1}).
     *
     * <p>A null value is reported as a mismatch rather than causing a
     * {@link NullPointerException} on unboxing, matching how {@code verifyDif} and
     * {@code verifyAvg} treat nulls.
     *
     * @param map     latest {@code cnt} value observed per key
     * @param allData generated data; only its size is used, as the expected number of keys
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty, has the expected size, and every value matches
     */
    private static boolean verifyCnt(Map<String, Long> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("cnt is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying cnt");
        }
        if (map.size() != allData.size()) {
            if (print) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
            }
            return false;
        }
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            int min = SmokeTestDriver.getMin(entry.getKey());
            int max = SmokeTestDriver.getMax(entry.getKey());
            long expected = (long) (max - min) + 1L;
            Long actual = entry.getValue();
            if (actual != null && actual.longValue() == expected) {
                continue;
            }
            if (print) {
                System.out.println("fail: key=" + entry.getKey() + " cnt=" + actual + " expected=" + expected);
            }
            return false;
        }
        return true;
    }

    /**
     * Checks that {@code map} holds, for every generated key, the sum of all integers in the
     * inclusive range encoded in that key, i.e. {@code (min + max) * (max - min + 1) / 2}.
     *
     * <p>A null value is reported as a mismatch rather than causing a
     * {@link NullPointerException} on unboxing, matching how {@code verifyDif} and
     * {@code verifyAvg} treat nulls.
     *
     * @param map     latest {@code sum} value observed per key
     * @param allData generated data; only its size is used, as the expected number of keys
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty, has the expected size, and every value matches
     */
    private static boolean verifySum(Map<String, Long> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("sum is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying sum");
        }
        if (map.size() != allData.size()) {
            if (print) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
            }
            return false;
        }
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            int min = SmokeTestDriver.getMin(entry.getKey());
            int max = SmokeTestDriver.getMax(entry.getKey());
            // Arithmetic-series sum, widened to long before multiplying.
            long expected = ((long) min + (long) max) * ((long) (max - min) + 1L) / 2L;
            Long actual = entry.getValue();
            if (actual != null && actual.longValue() == expected) {
                continue;
            }
            if (print) {
                System.out.println("fail: key=" + entry.getKey() + " sum=" + actual + " expected=" + expected);
            }
            return false;
        }
        return true;
    }

    /**
     * Checks that {@code map} holds, for every generated key, the midpoint of the range
     * encoded in that key ({@code (min + max) / 2.0}). A null value counts as a mismatch.
     *
     * @param map     latest {@code avg} value observed per key
     * @param allData generated data; only its size is used, as the expected number of keys
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty, has the expected size, and every value matches
     */
    private static boolean verifyAvg(Map<String, Double> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("avg is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying avg");
        }

        final int resultCount = map.size();
        final int expectedCount = allData.size();
        if (resultCount != expectedCount) {
            if (print) {
                System.out.println("fail: resultCount=" + resultCount + " expectedCount=" + expectedCount);
            }
            return false;
        }

        for (Map.Entry<String, Double> candidate : map.entrySet()) {
            final String key = candidate.getKey();
            final long lowest = getMin(key);
            final long highest = getMax(key);
            final double expected = (double) (lowest + highest) / 2.0;
            final Double observed = candidate.getValue();
            final boolean matches = observed != null && observed.doubleValue() == expected;
            if (!matches) {
                if (print) {
                    System.out.println("fail: key=" + key + " avg=" + observed + " expected=" + expected);
                }
                return false;
            }
        }
        return true;
    }

    /**
     * Checks the {@code tagg} results, which are keyed by a per-key record count (as a string)
     * and whose value is the number of generated keys that have that count.
     *
     * <p>The expected table is rebuilt from {@code allData}'s keys: each key contributes one to
     * the bucket named {@code max - min + 1}. Every entry in {@code map} must equal its expected
     * bucket, with a bucket that does not exist treated as an expected value of zero. Values are
     * compared numerically; a null value counts as a mismatch.
     *
     * <p>Expected buckets that are absent from {@code map} are not reported as failures.
     *
     * @param map     latest {@code tagg} value observed per count bucket
     * @param allData generated data; only its key set is used
     * @param print   whether to print progress and failure details
     * @return {@code true} if the map is non-empty and every entry matches its expected bucket
     */
    private static boolean verifyTAgg(Map<String, Long> map, Map<String, Set<Integer>> allData, boolean print) {
        if (map.isEmpty()) {
            if (print) {
                System.out.println("tagg is empty");
            }
            return false;
        }
        if (print) {
            System.out.println("verifying tagg");
        }

        Map<String, Long> expected = new HashMap<>();
        for (String dataKey : allData.keySet()) {
            int min = SmokeTestDriver.getMin(dataKey);
            int max = SmokeTestDriver.getMax(dataKey);
            String cnt = Long.toString((long) (max - min) + 1L);
            Long soFar = expected.get(cnt);
            expected.put(cnt, soFar == null ? 1L : soFar + 1L);
        }

        for (Map.Entry<String, Long> entry : map.entrySet()) {
            String key = entry.getKey();
            Long removed = expected.remove(key);
            long expectedCount = removed == null ? 0L : removed;
            Long actual = entry.getValue();
            // Compare primitive values: == on two boxed Longs would compare references.
            if (actual != null && actual.longValue() == expectedCount) {
                continue;
            }
            if (print) {
                System.out.println("fail: key=" + key + " tagg=" + actual + " expected=" + expectedCount);
            }
            return false;
        }
        return true;
    }

    /**
     * Parses the lower bound out of a {@code "min-max"} key.
     *
     * @param key key whose first {@code '-'}-separated field is an integer
     * @return that first field as an {@code int}
     */
    private static int getMin(String key) {
        final String[] bounds = key.split("-");
        final String lower = bounds[0];
        return Integer.parseInt(lower);
    }

    /**
     * Parses the upper bound out of a {@code "min-max"} key.
     *
     * @param key key whose second {@code '-'}-separated field is an integer
     * @return that second field as an {@code int}
     */
    private static int getMax(String key) {
        final String[] bounds = key.split("-");
        final String upper = bounds[1];
        return Integer.parseInt(upper);
    }

    /**
     * Extracts the lower bound from a key of the form {@code "min-max@suffix"}.
     *
     * @param key key whose part before the first {@code '@'} is a {@code "min-max"} key
     * @return the lower bound of that {@code "min-max"} part
     */
    private static int getMinFromWKey(String key) {
        final String rangePart = key.split("@")[0];
        return getMin(rangePart);
    }

    /**
     * Extracts the upper bound from a key of the form {@code "min-max@suffix"}.
     *
     * @param key key whose part before the first {@code '@'} is a {@code "min-max"} key
     * @return the upper bound of that {@code "min-max"} part
     */
    private static int getMaxFromWKey(String key) {
        final String rangePart = key.split("@")[0];
        return getMax(rangePart);
    }

    /**
     * Parses the numeric field that follows the first {@code '@'} in a key.
     *
     * @param key key whose second {@code '@'}-separated field is a long
     * @return that second field as a {@code long}
     */
    private static long getStartFromWKey(String key) {
        final String[] parts = key.split("@");
        final String startField = parts[1];
        return Long.parseLong(startField);
    }

    /**
     * Collects every partition of the given topics, in the order the topics are listed.
     *
     * @param consumer consumer used to look up partition metadata
     * @param topics   topic names to look up
     * @return a new list with one {@link TopicPartition} per partition reported by the consumer
     */
    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        final List<TopicPartition> result = new ArrayList<>();
        for (int t = 0; t < topics.length; t++) {
            final List<PartitionInfo> infos = consumer.partitionsFor(topics[t]);
            for (final PartitionInfo partitionInfo : infos) {
                final TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                result.add(tp);
            }
        }
        return result;
    }

    private static class TestCallback
    implements Callback {
        private final ProducerRecord<byte[], byte[]> originalRecord;
        private final List<ProducerRecord<byte[], byte[]>> needRetry;

        TestCallback(ProducerRecord<byte[], byte[]> originalRecord, List<ProducerRecord<byte[], byte[]>> needRetry) {
            this.originalRecord = originalRecord;
            this.needRetry = needRetry;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                if (exception instanceof TimeoutException) {
                    this.needRetry.add(this.originalRecord);
                } else {
                    exception.printStackTrace();
                    Exit.exit((int)1);
                }
            }
        }
    }

    /**
     * The values of one key: every integer in {@code [min, max]}, shuffled with a window of
     * 10 and handed out one at a time.
     */
    private static class ValueList {
        public final String key;
        private final int[] values;
        private int index;

        /**
         * @param min smallest value, also the first half of the key name
         * @param max largest value, also the second half of the key name
         */
        ValueList(int min, int max) {
            key = min + "-" + max;
            final int count = max - min + 1;
            values = new int[count];
            int slot = 0;
            while (slot < count) {
                values[slot] = min + slot;
                slot++;
            }
            SmokeTestDriver.shuffle(values, 10);
            index = 0;
        }

        /**
         * @return the next value, or {@code -1} once all values have been handed out
         */
        int next() {
            if (index >= values.length) {
                return -1;
            }
            final int value = values[index];
            index++;
            return value;
        }
    }
}

