/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tools.kafka;

import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.WaitConditions;
import io.debezium.testing.system.tools.YAML;
import io.debezium.testing.system.tools.kafka.KafkaController;
import io.debezium.testing.system.tools.kafka.StrimziOperatorController;
import io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaList;
import io.strimzi.api.kafka.KafkaTopicList;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.status.ListenerAddress;
import io.strimzi.api.kafka.model.status.ListenerStatus;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Base64;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaController} implementation that manages a Strimzi {@link Kafka} resource
 * through an {@link OpenShiftClient}.
 *
 * <p>The controller keeps a reference to the {@link Kafka} resource it was created with and
 * replaces it with the resource returned by the wait call in {@link #waitForCluster()}.
 */
public class OcpKafkaController implements KafkaController {
    private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaController.class);
    private static final String EXTERNAL_LISTENER_TYPE = "external";

    private final OpenShiftClient ocp;
    private final String project;
    private final String name;
    private final StrimziOperatorController operatorController;
    private Kafka kafka;

    /**
     * Creates a controller for the given Kafka resource.
     *
     * @param kafka the Kafka resource; its metadata supplies the cluster name and namespace
     * @param operatorController controller of the Strimzi operator associated with this cluster
     * @param ocp client used for all cluster API calls
     */
    public OcpKafkaController(Kafka kafka, StrimziOperatorController operatorController, OpenShiftClient ocp) {
        this.kafka = kafka;
        this.name = kafka.getMetadata().getName();
        this.ocp = ocp;
        this.project = kafka.getMetadata().getNamespace();
        this.operatorController = operatorController;
    }

    /**
     * Returns the bootstrap address to be used by the test process.
     *
     * <p>When {@link OpenShiftUtils#isRunningFromOcp()} is true this is the same as
     * {@link #getBootstrapAddress()}; otherwise it is the first address of the listener whose
     * type is {@code external} in the Kafka resource status.
     *
     * @return {@code host:port} of the bootstrap endpoint
     * @throws IllegalStateException if the resource has no status/listeners yet, no external
     *         listener exists, or the external listener reports no address
     */
    @Override
    public String getPublicBootstrapAddress() {
        if (OpenShiftUtils.isRunningFromOcp()) {
            return getBootstrapAddress();
        }
        if (kafka.getStatus() == null || kafka.getStatus().getListeners() == null) {
            throw new IllegalStateException("No listener status available for Kafka cluster " + name);
        }
        List<ListenerStatus> listeners = kafka.getStatus().getListeners();
        ListenerStatus listener = listeners.stream()
                // constant-first comparison so a listener without a type is skipped instead of throwing
                .filter(l -> EXTERNAL_LISTENER_TYPE.equalsIgnoreCase(l.getType()))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        "No external listener found for Kafka cluster " + kafka.getMetadata().getName()));
        List<ListenerAddress> addresses = listener.getAddresses();
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalStateException("External listener of Kafka cluster " + name + " has no address");
        }
        ListenerAddress address = addresses.get(0);
        return address.getHost() + ":" + address.getPort();
    }

    /**
     * @return the fully qualified in-cluster service address of the bootstrap service, port 9092
     */
    @Override
    public String getBootstrapAddress() {
        return name + "-kafka-bootstrap." + project + ".svc.cluster.local:9092";
    }

    /**
     * @return the short (namespace-relative) bootstrap service address, port 9093
     */
    public String getLocalBootstrapAddress() {
        return name + "-kafka-bootstrap:9093";
    }

    /**
     * Creates or replaces a Kafka topic described by a YAML resource and waits until it is ready.
     *
     * @param yamlPath path of the YAML resource describing the {@link KafkaTopic}
     * @return the topic as returned by the readiness wait
     * @throws InterruptedException if the wait is interrupted
     */
    public KafkaTopic deployTopic(String yamlPath) throws InterruptedException {
        LOGGER.info("Deploying Kafka topic from {}", yamlPath);
        KafkaTopic topic = topicOperation().createOrReplace(YAML.fromResource(yamlPath, KafkaTopic.class));
        return waitForKafkaTopic(topic.getMetadata().getName());
    }

    /**
     * Deletes the Kafka resource held by this controller.
     *
     * @return the result reported by the client's delete call
     */
    @Override
    public boolean undeploy() {
        return Crds.kafkaOperation(ocp).delete(kafka);
    }

    /**
     * Blocks until the Kafka resource satisfies {@link WaitConditions#kafkaReadyCondition} and
     * stores the resulting resource in this controller. The timeout is
     * {@code WaitConditions.scaled(7)} minutes.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    @Override
    public void waitForCluster() throws InterruptedException {
        LOGGER.info("Waiting for Kafka cluster '{}'", name);
        kafka = kafkaOperation()
                .withName(name)
                .waitUntilCondition(WaitConditions::kafkaReadyCondition, WaitConditions.scaled(7L), TimeUnit.MINUTES);
    }

    /**
     * Blocks until the named topic satisfies {@link WaitConditions#kafkaReadyCondition}; the
     * timeout is {@code WaitConditions.scaled(5)} minutes.
     */
    private KafkaTopic waitForKafkaTopic(String name) throws InterruptedException {
        return topicOperation()
                .withName(name)
                .waitUntilCondition(WaitConditions::kafkaReadyCondition, WaitConditions.scaled(5L), TimeUnit.MINUTES);
    }

    /** Topic operations scoped to this controller's namespace. */
    private NonNamespaceOperation<KafkaTopic, KafkaTopicList, Resource<KafkaTopic>> topicOperation() {
        return Crds.topicOperation(ocp).inNamespace(project);
    }

    /** Kafka operations scoped to this controller's namespace. */
    private NonNamespaceOperation<Kafka, KafkaList, Resource<Kafka>> kafkaOperation() {
        return Crds.kafkaOperation(ocp).inNamespace(project);
    }

    /**
     * Builds consumer properties pointing at {@link #getPublicBootstrapAddress()} with SSL
     * enabled and a PEM truststore containing the cluster certificate read from the
     * {@code FabricKafkaConnectBuilder.KAFKA_CERT_SECRET} secret.
     *
     * @return a new, mutable {@link Properties} instance
     * @throws RuntimeException if the certificate file cannot be written
     * @throws IllegalStateException if the certificate secret or its entry is missing
     */
    @Override
    public Properties getDefaultConsumerProperties() {
        Properties kafkaConsumerProps = new Properties();
        try {
            kafkaConsumerProps.put("ssl.truststore.location", getKafkaCaCertificate().getAbsolutePath());
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to store Kafka cluster certificate in a temporary file", e);
        }
        kafkaConsumerProps.put("bootstrap.servers", getPublicBootstrapAddress());
        kafkaConsumerProps.put("group.id", "DEBEZIUM_IT_01");
        kafkaConsumerProps.put("auto.offset.reset", "earliest");
        kafkaConsumerProps.put("enable.auto.commit", false);
        kafkaConsumerProps.put("security.protocol", "SSL");
        kafkaConsumerProps.put("ssl.truststore.type", "PEM");
        return kafkaConsumerProps;
    }

    /**
     * Reads the cluster certificate from the certificate secret, Base64-decodes it and writes it
     * to a fresh temporary file that is removed when the JVM exits.
     *
     * @return the temporary file holding the decoded certificate
     * @throws IOException if the temporary file cannot be created or written
     * @throws IllegalStateException if the secret or the certificate entry does not exist
     */
    private File getKafkaCaCertificate() throws IOException {
        Secret secret = ocp.secrets()
                .inNamespace(project)
                .withName(FabricKafkaConnectBuilder.KAFKA_CERT_SECRET)
                .get();
        if (secret == null) {
            throw new IllegalStateException("Kafka cluster certificate secret not found");
        }
        String encodedCert = secret.getData() == null
                ? null
                : secret.getData().get(FabricKafkaConnectBuilder.KAFKA_CERT_FILENAME);
        if (encodedCert == null) {
            throw new IllegalStateException("Kafka cluster certificate secret does not contain "
                    + FabricKafkaConnectBuilder.KAFKA_CERT_FILENAME);
        }
        byte[] decodedBytes = Base64.getDecoder().decode(encodedCert);
        File crtFile = Files.createTempFile("kafka-cert-", null).toFile();
        // A new file is created on every call; make sure they do not outlive the test JVM.
        crtFile.deleteOnExit();
        // Write the decoded bytes as-is to avoid a platform-charset String round trip.
        Files.write(crtFile.toPath(), decodedBytes);
        return crtFile;
    }
}

