/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import scala.Option;

public class IntegrationTestUtils {
    public static final long DEFAULT_TIMEOUT = 30000L;
    public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";

    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
        File node;
        String tmpDir = TestUtils.IO_TMP_DIR.getPath();
        String path = streamsConfiguration.getProperty("state.dir");
        if (path != null && (node = Paths.get(path, new String[0]).normalize().toFile()).getAbsolutePath().startsWith(tmpDir)) {
            Utils.delete((File)new File(node.getAbsolutePath()));
        }
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Time time) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Time time) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Time time, boolean enableTransactions) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Time time, boolean enableTransactions) throws ExecutionException, InterruptedException {
        for (KeyValue<K, V> record : records) {
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, Collections.singleton(record), producerConfig, headers, time.milliseconds(), enableTransactions);
            time.sleep(1L);
        }
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Long timestamp) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false);
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp, boolean enableTransactions) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Long timestamp, boolean enabledTransactions) throws ExecutionException, InterruptedException {
        try (KafkaProducer producer = new KafkaProducer(producerConfig);){
            if (enabledTransactions) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            for (KeyValue<K, V> record : records) {
                Future f = producer.send(new ProducerRecord(topic, null, timestamp, record.key, record.value, (Iterable)headers));
                f.get();
            }
            if (enabledTransactions) {
                producer.commitTransaction();
            }
            producer.flush();
        }
    }

    public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp) throws ExecutionException, InterruptedException {
        try (KafkaProducer producer = new KafkaProducer(producerConfig);){
            producer.initTransactions();
            for (KeyValue<K, V> record : records) {
                producer.beginTransaction();
                Future f = producer.send(new ProducerRecord(topic, null, timestamp, record.key, record.value));
                f.get();
                producer.abortTransaction();
            }
        }
    }

    public static <V> void produceValuesSynchronously(String topic, Collection<V> records, Properties producerConfig, Time time) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
    }

    public static <V> void produceValuesSynchronously(String topic, Collection<V> records, Properties producerConfig, Time time, boolean enableTransactions) throws ExecutionException, InterruptedException {
        ArrayList keyedRecords = new ArrayList();
        for (V value : records) {
            KeyValue kv = new KeyValue(null, value);
            keyedRecords.add(kv);
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
    }

    public static void waitForCompletion(KafkaStreams streams, int expectedPartitions, int timeoutMilliseconds) {
        double totalLag;
        int lagMetrics;
        long start = System.currentTimeMillis();
        do {
            lagMetrics = 0;
            totalLag = 0.0;
            for (Metric metric : streams.metrics().values()) {
                if (!metric.metricName().name().equals("records-lag")) continue;
                ++lagMetrics;
                totalLag += ((Number)metric.metricValue()).doubleValue();
            }
            if (lagMetrics < expectedPartitions || totalLag != 0.0) continue;
            return;
        } while (System.currentTimeMillis() - start < (long)timeoutMilliseconds);
        throw new RuntimeException(String.format("Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", lagMetrics, expectedPartitions, totalLag));
    }

    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, 30000L);
    }

    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, 30000L);
    }

    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, final String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList<KeyValue<K, V>> accumData = new ArrayList<KeyValue<K, V>>();
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = new TestCondition((Consumer)consumer, waitTime, expectedNumRecords, accumData){
                final /* synthetic */ Consumer val$consumer;
                final /* synthetic */ long val$waitTime;
                final /* synthetic */ int val$expectedNumRecords;
                final /* synthetic */ List val$accumData;
                {
                    this.val$consumer = consumer;
                    this.val$waitTime = l;
                    this.val$expectedNumRecords = n;
                    this.val$accumData = list;
                }

                public boolean conditionMet() {
                    List readData = IntegrationTestUtils.readKeyValues(topic, this.val$consumer, this.val$waitTime, this.val$expectedNumRecords);
                    this.val$accumData.addAll(readData);
                    return this.val$accumData.size() >= this.val$expectedNumRecords;
                }
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(Properties consumerConfig, final String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList<ConsumerRecord<K, V>> accumData = new ArrayList<ConsumerRecord<K, V>>();
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = new TestCondition((Consumer)consumer, waitTime, expectedNumRecords, accumData){
                final /* synthetic */ Consumer val$consumer;
                final /* synthetic */ long val$waitTime;
                final /* synthetic */ int val$expectedNumRecords;
                final /* synthetic */ List val$accumData;
                {
                    this.val$consumer = consumer;
                    this.val$waitTime = l;
                    this.val$expectedNumRecords = n;
                    this.val$accumData = list;
                }

                public boolean conditionMet() {
                    List readData = IntegrationTestUtils.readRecords(topic, this.val$consumer, this.val$waitTime, this.val$expectedNumRecords);
                    this.val$accumData.addAll(readData);
                    return this.val$accumData.size() >= this.val$expectedNumRecords;
                }
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, 30000L);
    }

    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, final String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList accumData = new ArrayList();
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = new TestCondition((Consumer)consumer, waitTime, expectedNumRecords, accumData){
                final /* synthetic */ Consumer val$consumer;
                final /* synthetic */ long val$waitTime;
                final /* synthetic */ int val$expectedNumRecords;
                final /* synthetic */ List val$accumData;
                {
                    this.val$consumer = consumer;
                    this.val$waitTime = l;
                    this.val$expectedNumRecords = n;
                    this.val$accumData = list;
                }

                public boolean conditionMet() {
                    List readData = IntegrationTestUtils.readValues(topic, this.val$consumer, this.val$waitTime, this.val$expectedNumRecords);
                    this.val$accumData.addAll(readData);
                    return this.val$accumData.size() >= this.val$expectedNumRecords;
                }
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static void waitForTopicPartitions(List<KafkaServer> servers, List<TopicPartition> partitions, long timeout) throws InterruptedException {
        long end = System.currentTimeMillis() + timeout;
        for (TopicPartition partition : partitions) {
            long remaining = end - System.currentTimeMillis();
            if (remaining <= 0L) {
                throw new AssertionError((Object)("timed out while waiting for partitions to become available. Timeout=" + timeout));
            }
            IntegrationTestUtils.waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining);
        }
    }

    public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers, final String topic, final int partition, long timeout) throws InterruptedException {
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                for (KafkaServer server : servers) {
                    MetadataCache metadataCache = server.apis().metadataCache();
                    Option partitionInfo = metadataCache.getPartitionInfo(topic, partition);
                    if (partitionInfo.isEmpty()) {
                        return false;
                    }
                    UpdateMetadataRequest.PartitionState metadataPartitionState = (UpdateMetadataRequest.PartitionState)partitionInfo.get();
                    if (Request.isValidBrokerId((int)metadataPartitionState.basePartitionState.leader)) continue;
                    return false;
                }
                return true;
            }
        }, (long)timeout, (String)("metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers"));
    }

    public static <V> List<V> readValues(String topic, Properties consumerConfig, long waitTime, int maxMessages) {
        List<V> returnList;
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            returnList = IntegrationTestUtils.readValues(topic, consumer, waitTime, maxMessages);
        }
        return returnList;
    }

    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, long waitTime, int maxMessages) {
        List<KeyValue<K, V>> consumedValues;
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            consumedValues = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, maxMessages);
        }
        return consumedValues;
    }

    private static <V> List<V> readValues(String topic, Consumer<Object, V> consumer, long waitTime, int maxMessages) {
        ArrayList<Object> returnList = new ArrayList<Object>();
        List<KeyValue<Object, V>> kvs = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, maxMessages);
        for (KeyValue<Object, V> kv : kvs) {
            returnList.add(kv.value);
        }
        return returnList;
    }

    private static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        ArrayList<KeyValue<K, V>> consumedValues = new ArrayList<KeyValue<K, V>>();
        List<ConsumerRecord<K, V>> records = IntegrationTestUtils.readRecords(topic, consumer, waitTime, maxMessages);
        for (ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue(record.key(), record.value()));
        }
        return consumedValues;
    }

    private static <K, V> List<ConsumerRecord<K, V>> readRecords(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        consumer.subscribe(Collections.singletonList(topic));
        int pollIntervalMs = 100;
        ArrayList<ConsumerRecord<K, V>> consumerRecords = new ArrayList<ConsumerRecord<K, V>>();
        int totalPollTimeMs = 0;
        while ((long)totalPollTimeMs < waitTime && IntegrationTestUtils.continueConsuming(consumerRecords.size(), maxMessages)) {
            totalPollTimeMs += 100;
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
            for (ConsumerRecord record : records) {
                consumerRecords.add(record);
            }
        }
        return consumerRecords;
    }

    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
        return maxMessages <= 0 || messagesConsumed < maxMessages;
    }

    private static <K, V> KafkaConsumer<K, V> createConsumer(Properties consumerConfig) {
        Properties filtered = new Properties();
        filtered.putAll((Map<?, ?>)consumerConfig);
        filtered.setProperty("auto.offset.reset", "earliest");
        filtered.setProperty("enable.auto.commit", "true");
        return new KafkaConsumer(filtered);
    }
}

