/*
 * Decompiled with CFR 0.152.
 */
package io.trino.testing.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import io.airlift.log.Logger;
import io.trino.testing.kafka.JsonSerializer;
import io.trino.testing.kafka.NumberPartitioner;
import io.trino.testing.kafka.PrintingLogConsumer;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Stream;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.Policy;
import net.jodah.failsafe.RetryPolicy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

/**
 * Test fixture that runs a Confluent Platform Kafka broker in a Docker container
 * (via Testcontainers), optionally accompanied by a Schema Registry container
 * attached to the same Docker network.
 *
 * <p>Typical usage: obtain an instance through one of the static factories, call
 * {@link #start()}, create topics / send messages, and finally {@link #close()}.
 * Everything allocated by the constructor (network and both containers) is
 * registered with a Guava {@link Closer}, which releases resources in the reverse
 * order of registration.
 */
public final class TestingKafka
implements Closeable {
    private static final Logger log = Logger.get(TestingKafka.class);
    private static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "5.5.2";
    private static final int SCHEMA_REGISTRY_PORT = 8081;
    private static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
    private static final DockerImageName SCHEMA_REGISTRY_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-schema-registry");
    private final Network network;
    private final KafkaContainer kafka;
    private final GenericContainer<?> schemaRegistry;
    private final boolean withSchemaRegistry;
    private final Closer closer = Closer.create();

    /**
     * Creates a Kafka-only fixture using the default Confluent Platform version.
     *
     * @return a new, not yet started, fixture
     */
    public static TestingKafka create() {
        return TestingKafka.create(DEFAULT_CONFLUENT_PLATFORM_VERSION);
    }

    /**
     * Creates a Kafka-only fixture using the given Confluent Platform version.
     *
     * @param confluentPlatformVersions Docker image tag applied to the Confluent images
     * @return a new, not yet started, fixture
     */
    public static TestingKafka create(String confluentPlatformVersions) {
        return new TestingKafka(confluentPlatformVersions, false);
    }

    /**
     * Creates a fixture that also starts a Schema Registry, using the default
     * Confluent Platform version.
     *
     * @return a new, not yet started, fixture
     */
    public static TestingKafka createWithSchemaRegistry() {
        return new TestingKafka(DEFAULT_CONFLUENT_PLATFORM_VERSION, true);
    }

    private TestingKafka(String confluentPlatformVersion, boolean withSchemaRegistry) {
        this.withSchemaRegistry = withSchemaRegistry;
        this.network = Network.newNetwork();
        this.closer.register(this.network::close);
        this.kafka = new KafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
                .withNetwork(this.network)
                .withNetworkAliases("kafka");
        // The schema registry container is always configured, but only started
        // when withSchemaRegistry is set (see start()).
        this.schemaRegistry = new GenericContainer<>(SCHEMA_REGISTRY_IMAGE_NAME.withTag(confluentPlatformVersion))
                .withNetwork(this.network)
                .withNetworkAliases("schema-registry")
                .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
                .withEnv("SCHEMA_REGISTRY_HOST_NAME", "0.0.0.0")
                .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_PORT)
                .withEnv("SCHEMA_REGISTRY_HEAP_OPTS", "-Xmx1G")
                .withExposedPorts(SCHEMA_REGISTRY_PORT)
                .dependsOn(this.kafka);
        // Registration order matters: the Closer closes in reverse order, so the
        // schema registry is stopped first, then Kafka, and the network last.
        this.closer.register(this.kafka::stop);
        this.closer.register(this.schemaRegistry::stop);
        try {
            // Container output is written to the stdout file descriptor directly
            // instead of going through System.out (presumably so it is unaffected
            // by any redirection of System.out - confirm with the logging setup).
            // The stream is intentionally left open: closing it would close stdout.
            PrintStream out = new PrintStream(new FileOutputStream(FileDescriptor.out), true, Charset.defaultCharset().name());
            this.kafka.withLogConsumer(new PrintingLogConsumer(out, String.format("%-20s| ", "kafka")));
            this.schemaRegistry.withLogConsumer(new PrintingLogConsumer(out, String.format("%-20s| ", "schema-registry")));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Starts the Kafka container and, if this fixture was created with a schema
     * registry, the Schema Registry container as well.
     */
    public void start() {
        this.kafka.start();
        if (this.withSchemaRegistry) {
            this.schemaRegistry.start();
        }
    }

    /**
     * Stops the containers and closes the Docker network.
     *
     * @throws IOException if releasing any of the registered resources fails
     */
    @Override
    public void close() throws IOException {
        this.closer.close();
    }

    /**
     * Creates a topic with 2 partitions and a replication factor of 1.
     *
     * @param topic name of the topic to create
     */
    public void createTopic(String topic) {
        this.createTopic(2, 1, topic);
    }

    /**
     * Creates a topic with default topic-level configuration.
     *
     * <p>Delegates to {@link #createTopicWithConfig} so that both code paths issue
     * the same, complete {@code kafka-topics --create ... --zookeeper ...} command.
     */
    private void createTopic(int partitions, int replication, String topic) {
        this.createTopicWithConfig(partitions, replication, topic, false);
    }

    /**
     * Creates a topic by running the {@code kafka-topics} tool inside the Kafka
     * container.
     *
     * @param partitions number of partitions
     * @param replication replication factor
     * @param topic name of the topic to create
     * @param enableLogAppendTime when {@code true}, the topic is created with
     *        {@code message.timestamp.type=LogAppendTime}
     * @throws RuntimeException if executing the command fails or the calling thread
     *         is interrupted (the interrupt flag is restored in that case)
     */
    public void createTopicWithConfig(int partitions, int replication, String topic, boolean enableLogAppendTime) {
        ArrayList<String> command = new ArrayList<>();
        command.add("kafka-topics");
        command.add("--create");
        command.add("--topic");
        command.add(topic);
        command.add("--partitions");
        command.add(Integer.toString(partitions));
        command.add("--replication-factor");
        command.add(Integer.toString(replication));
        // The command runs inside the Kafka container, so "localhost" refers to it.
        command.add("--zookeeper");
        command.add("localhost:2181");
        if (enableLogAppendTime) {
            command.add("--config");
            command.add("message.timestamp.type=LogAppendTime");
        }
        try {
            // NOTE(review): the exec result (exit code / stderr) is not inspected,
            // so a failing kafka-topics invocation is not reported to the caller.
            this.kafka.execInContainer(command.toArray(new String[0]));
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends all records using a producer with default properties.
     *
     * @param recordStream records to send
     * @return metadata of the last record in the stream, or {@code null} if the
     *         stream was empty
     */
    public <K, V> RecordMetadata sendMessages(Stream<ProducerRecord<K, V>> recordStream) {
        return this.sendMessages(recordStream, ImmutableMap.of());
    }

    /**
     * Sends all records using a freshly created producer, flushes it, and waits
     * for the last record to be acknowledged. The producer is closed before
     * returning, whether or not sending succeeded.
     *
     * @param recordStream records to send
     * @param extraProducerProperties producer properties that take precedence over
     *        the defaults applied by this class
     * @return metadata of the last record in the stream, or {@code null} if the
     *         stream was empty
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *         {@link InterruptedException} raised while waiting for the send
     */
    public <K, V> RecordMetadata sendMessages(Stream<ProducerRecord<K, V>> recordStream, Map<String, String> extraProducerProperties) {
        try (KafkaProducer<K, V> producer = this.createProducer(extraProducerProperties)) {
            // Only the future of the last record is kept; an empty stream yields an
            // already-completed future holding null.
            Future<RecordMetadata> future = recordStream
                    .map(record -> this.send(producer, record))
                    .reduce((first, second) -> second)
                    .orElse(Futures.immediateFuture(null));
            producer.flush();
            return future.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Hands a single record to the producer, retrying the hand-off (up to 10
     * attempts, with backoff between 1 ms and 10 s) if {@code producer.send}
     * itself throws.
     */
    private <K, V> Future<RecordMetadata> send(KafkaProducer<K, V> producer, ProducerRecord<K, V> record) {
        return Failsafe.with(
                new RetryPolicy<>()
                        .onRetry(event -> log.warn(event.getLastFailure(), "Retrying message send"))
                        .withMaxAttempts(10)
                        .withBackoff(1L, 10000L, ChronoUnit.MILLIS))
                .get(() -> producer.send(record));
    }

    /**
     * Returns the {@code host:port} address at which the Kafka broker is reachable
     * from the host running the tests.
     */
    public String getConnectString() {
        return this.kafka.getContainerIpAddress() + ":" + this.kafka.getMappedPort(KafkaContainer.KAFKA_PORT);
    }

    /**
     * Builds a producer from the caller's properties, filling in defaults
     * (bootstrap servers, serializers, partitioner, acks) for any key the caller
     * did not set.
     */
    private <K, V> KafkaProducer<K, V> createProducer(Map<String, String> extraProperties) {
        Map<String, String> properties = new HashMap<>(extraProperties);
        properties.putIfAbsent("bootstrap.servers", this.getConnectString());
        properties.putIfAbsent("key.serializer", LongSerializer.class.getName());
        properties.putIfAbsent("value.serializer", JsonSerializer.class.getName());
        properties.putIfAbsent("partitioner.class", NumberPartitioner.class.getName());
        properties.putIfAbsent("acks", "1");
        return new KafkaProducer<>(TestingKafka.toProperties(properties));
    }

    /** Copies a string map into a {@link Properties} instance. */
    private static Properties toProperties(Map<String, String> map) {
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            properties.setProperty(entry.getKey(), entry.getValue());
        }
        return properties;
    }

    /**
     * Returns the HTTP URL at which the Schema Registry is reachable from the host
     * running the tests.
     *
     * <p>NOTE(review): the schema registry container is only started when the
     * fixture was created via {@link #createWithSchemaRegistry()}; calling this
     * on a Kafka-only fixture presumably fails when resolving the mapped port.
     */
    public String getSchemaRegistryConnectString() {
        return "http://" + this.schemaRegistry.getContainerIpAddress() + ":" + this.schemaRegistry.getMappedPort(SCHEMA_REGISTRY_PORT);
    }

    /** Returns the Docker network shared by the containers of this fixture. */
    public Network getNetwork() {
        return this.network;
    }
}

