/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka.embedded;

import io.atleon.kafka.embedded.EmbeddedKafkaConfig;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import org.apache.kafka.common.utils.Time;

public final class EmbeddedKafka {
    public static final int DEFAULT_PORT = 9092;
    private static EmbeddedKafkaConfig config;

    private EmbeddedKafka() {
    }

    public static String startAndGetBootstrapServersConnect() {
        return EmbeddedKafka.start(9092).getConnect();
    }

    public static String startAndGetBootstrapServersConnect(int port) {
        return EmbeddedKafka.start(port).getConnect();
    }

    public static EmbeddedKafkaConfig start() {
        return EmbeddedKafka.start(9092);
    }

    public static synchronized EmbeddedKafkaConfig start(int port) {
        return config == null ? (config = EmbeddedKafka.initializeKafka(port)) : config;
    }

    private static EmbeddedKafkaConfig initializeKafka(int port) {
        KafkaConfig kafkaConfig = new KafkaConfig(EmbeddedKafka.createKafkaBrokerConfig(port), true);
        EmbeddedKafka.startLocalKafka(kafkaConfig);
        return EmbeddedKafkaConfig.fromKafkaConfig(kafkaConfig);
    }

    private static Map<String, Object> createKafkaBrokerConfig(int port) {
        int controllerPort = port + 1;
        HashMap<String, Object> kafkaBrokerConfig = new HashMap<String, Object>();
        kafkaBrokerConfig.put("offsets.topic.replication.factor", "1");
        kafkaBrokerConfig.put("transaction.state.log.replication.factor", "1");
        kafkaBrokerConfig.put("transaction.state.log.min.isr", "1");
        kafkaBrokerConfig.put("advertised.listeners", "PLAINTEXT://localhost:" + port);
        kafkaBrokerConfig.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        kafkaBrokerConfig.put("listeners", "PLAINTEXT://localhost:" + port + ",CONTROLLER://localhost:" + controllerPort);
        kafkaBrokerConfig.put("controller.quorum.voters", "1@localhost:" + controllerPort);
        kafkaBrokerConfig.put("controller.listener.names", "CONTROLLER");
        kafkaBrokerConfig.put("node.id", 1);
        kafkaBrokerConfig.put("process.roles", "broker,controller");
        kafkaBrokerConfig.put("inter.broker.listener.name", "PLAINTEXT");
        kafkaBrokerConfig.put("log.dir", EmbeddedKafka.createLogDirectory().toString());
        kafkaBrokerConfig.put("num.partitions", 10);
        return kafkaBrokerConfig;
    }

    private static Path createLogDirectory() {
        try {
            Path path = Files.createTempDirectory(EmbeddedKafka.class.getSimpleName() + "_" + System.currentTimeMillis(), new FileAttribute[0]);
            EmbeddedKafka.writeMetaProperties(path);
            return path;
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not create temporary log directory", e);
        }
    }

    private static void writeMetaProperties(Path path) {
        try (BufferedWriter writer = Files.newBufferedWriter(path.resolve("meta.properties"), StandardCharsets.UTF_8, new OpenOption[0]);){
            Properties properties = new Properties();
            properties.put("broker.id", "1");
            properties.put("cluster.id", "YCVE88x9RCiCfSDkQ0v9nQ");
            properties.store(writer, "KRaft meta properties");
        }
        catch (IOException e) {
            throw new IllegalStateException("Could not write meta.properties to log directory", e);
        }
    }

    private static void startLocalKafka(KafkaConfig kafkaConfig) {
        try {
            new KafkaRaftServer(kafkaConfig, Time.SYSTEM).startup();
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not start local Kafka Server", e);
        }
    }
}

