/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tools.kafka;

import io.debezium.testing.system.tools.HttpUtils;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.WaitConditions;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.StrimziOperatorController;
import io.debezium.testing.system.tools.kafka.connectors.ConnectorDeployer;
import io.debezium.testing.system.tools.kafka.connectors.ConnectorMetricsReader;
import io.debezium.testing.system.tools.kafka.connectors.CustomResourceConnectorDeployer;
import io.debezium.testing.system.tools.kafka.connectors.JsonConnectorDeployer;
import io.debezium.testing.system.tools.kafka.connectors.RestPrometheusMetricReader;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPort;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPortBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.KafkaConnect;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaConnectController} implementation that drives a Strimzi {@code KafkaConnect}
 * custom resource through an {@link OpenShiftClient}.
 *
 * <p>The controller scales the Connect cluster, waits for it to become ready, creates the
 * network policy / service / routes used to reach the REST API and the metrics endpoint, and
 * deploys or removes connectors.
 *
 * <p>{@link #getApiURL()} and {@link #getMetricsURL()} are only usable after
 * {@link #exposeApi()} and {@link #exposeMetrics()} respectively have been called.
 */
public class OcpKafkaConnectController implements KafkaConnectController {
    private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaConnectController.class);
    /** Port used for the metrics service created by {@link #exposeMetrics()}. */
    private static final int METRICS_PORT = 9404;
    private final OpenShiftClient ocp;
    private final OkHttpClient http;
    private final String project;
    private final StrimziOperatorController operatorController;
    private final OpenShiftUtils ocpUtils;
    private final HttpUtils httpUtils;
    private final String name;
    private KafkaConnect kafkaConnect;
    private Route apiRoute;
    private Route metricsRoute;
    private Service metricsService;

    /**
     * Creates a controller for the given KafkaConnect resource.
     *
     * @param kafkaConnect the resource to control; its metadata supplies the name and namespace
     * @param operatorController controller of the Strimzi operator
     * @param ocp client used for all cluster operations
     * @param http HTTP client used for API availability checks and JSON connector deployment
     */
    public OcpKafkaConnectController(KafkaConnect kafkaConnect, StrimziOperatorController operatorController, OpenShiftClient ocp, OkHttpClient http) {
        this.kafkaConnect = kafkaConnect;
        this.name = kafkaConnect.getMetadata().getName();
        this.operatorController = operatorController;
        this.ocp = ocp;
        this.http = http;
        this.project = kafkaConnect.getMetadata().getNamespace();
        this.ocpUtils = new OpenShiftUtils(ocp);
        this.httpUtils = new HttpUtils(http);
    }

    /**
     * Scales the {@code <name>-connect} deployment to zero replicas and waits until no pod
     * labelled {@code strimzi.io/kind=KafkaConnect} is left in the namespace.
     */
    @Override
    public void disable() {
        LOGGER.info("Disabling KafkaConnect deployment (scaling to ZERO).");
        ((RollableScalableResource)((NonNamespaceOperation)this.ocp.apps().deployments().inNamespace(this.project)).withName(this.name + "-connect")).scale(0);
        this.awaitNoConnectPods();
    }

    /**
     * Sets the replica count of the KafkaConnect resource to zero and waits until no pod
     * labelled {@code strimzi.io/kind=KafkaConnect} is left in the namespace.
     *
     * @throws IllegalStateException if the KafkaConnect resource cannot be found
     */
    @Override
    public void destroy() {
        LOGGER.info("Force deleting all KafkaConnect pods.");
        this.updateReplicas(0);
        this.awaitNoConnectPods();
    }

    /**
     * Sets the replica count of the KafkaConnect resource to one and waits for the cluster.
     *
     * @throws InterruptedException if interrupted while waiting for the cluster
     * @throws IllegalStateException if the KafkaConnect resource cannot be found
     */
    @Override
    public void restore() throws InterruptedException {
        this.updateReplicas(1);
        this.waitForCluster();
    }

    /**
     * Waits until the KafkaConnect resource satisfies {@code WaitConditions::kafkaReadyCondition}
     * and stores the resource returned by the wait as the controller's current state.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void waitForCluster() throws InterruptedException {
        LOGGER.info("Waiting for Kafka Connect cluster '{}'", this.name);
        this.kafkaConnect = (KafkaConnect)((Resource)((NonNamespaceOperation)Crds.kafkaConnectOperation((KubernetesClient)this.ocp).inNamespace(this.project)).withName(this.name)).waitUntilCondition(WaitConditions::kafkaReadyCondition, WaitConditions.scaled(5L), TimeUnit.MINUTES);
    }

    /**
     * Creates a network policy named {@code <name>-allowed} for the Connect pods covering
     * TCP ports 8083, 8404 and the metrics port.
     *
     * @return the created network policy
     */
    public NetworkPolicy allowServiceAccess() {
        String clusterName = this.kafkaConnect.getMetadata().getName();
        LOGGER.info("Creating NetworkPolicy allowing public access to {}'s services", clusterName);
        Map<String, String> podLabels = new HashMap<>();
        podLabels.put("strimzi.io/cluster", clusterName);
        podLabels.put("strimzi.io/kind", "KafkaConnect");
        podLabels.put("strimzi.io/name", clusterName + "-connect");
        List<NetworkPolicyPort> ports = Stream.of(8083, 8404, METRICS_PORT).map(IntOrString::new).map(p -> ((NetworkPolicyPortBuilder)((NetworkPolicyPortBuilder)new NetworkPolicyPortBuilder().withProtocol("TCP")).withPort(p)).build()).collect(Collectors.toList());
        return this.ocpUtils.createNetworkPolicy(this.project, clusterName + "-allowed", podLabels, ports);
    }

    /**
     * Creates a route to the {@code rest-api} port of the {@code <name>-connect-api} service
     * and waits for the API to respond.
     *
     * @return the created route
     * @throws IllegalStateException if the API service does not exist
     */
    public Route exposeApi() {
        LOGGER.info("Exposing KafkaConnect API");
        String apiName = this.kafkaConnect.getMetadata().getName() + "-connect-api";
        Service service = this.requireService(apiName);
        this.apiRoute = this.ocpUtils.createRoute(this.project, apiName, apiName, "rest-api", service.getMetadata().getLabels());
        this.httpUtils.awaitApi(this.getApiURL());
        return this.apiRoute;
    }

    /**
     * Creates a {@code <name>-connect-metrics} service (reusing the selector and labels of the
     * API service) together with a route to it, and waits for the endpoint to respond.
     *
     * @return the created metrics route
     * @throws IllegalStateException if the API service does not exist
     */
    public Route exposeMetrics() {
        LOGGER.info("Exposing KafkaConnect metrics");
        String portName = "tcp-prometheus";
        String clusterName = this.kafkaConnect.getMetadata().getName();
        String metricsName = clusterName + "-connect-metrics";
        Service apiService = this.requireService(clusterName + "-connect-api");
        Map<String, String> selector = apiService.getSpec().getSelector();
        Map<String, String> labels = apiService.getMetadata().getLabels();
        this.metricsService = this.ocpUtils.createService(this.project, metricsName, portName, METRICS_PORT, selector, labels);
        this.metricsRoute = this.ocpUtils.createRoute(this.project, metricsName, metricsName, portName, labels);
        this.httpUtils.awaitApi(this.getMetricsURL());
        return this.metricsRoute;
    }

    /**
     * Deploys a connector using the deployer selected by {@link #getConnectorDeployer()}.
     *
     * @param config configuration of the connector to deploy
     */
    @Override
    public void deployConnector(ConnectorConfigBuilder config) throws IOException, InterruptedException {
        LOGGER.info("Deploying connector {}", config.getConnectorName());
        this.getConnectorDeployer().deploy(config);
    }

    /**
     * Tells whether the KafkaConnect resource carries the annotation
     * {@code strimzi.io/use-connector-resources} with the exact value {@code "true"}.
     * A resource without any annotations is treated as not enabled.
     */
    private boolean hasConnectorResourcesEnabled() {
        Map<String, String> annotations = this.kafkaConnect.getMetadata().getAnnotations();
        // The annotations map may be absent entirely; that simply means "not enabled".
        return annotations != null && "true".equals(annotations.get("strimzi.io/use-connector-resources"));
    }

    /**
     * Picks the custom-resource based deployer when connector resources are enabled,
     * otherwise the JSON deployer that talks to the exposed REST API.
     */
    private ConnectorDeployer getConnectorDeployer() {
        if (this.hasConnectorResourcesEnabled()) {
            return new CustomResourceConnectorDeployer(this.kafkaConnect, this.ocp);
        }
        return new JsonConnectorDeployer(this.getApiURL(), this.http);
    }

    /**
     * Removes a connector using the deployer selected by {@link #getConnectorDeployer()}.
     *
     * @param name name of the connector to remove
     */
    @Override
    public void undeployConnector(String name) throws IOException {
        LOGGER.info("Undeploying kafka connector {}", name);
        this.getConnectorDeployer().undeploy(name);
    }

    /**
     * @return plain-HTTP URL built from the host of the API route
     * @throws IllegalStateException if {@link #exposeApi()} has not created the route yet
     */
    @Override
    public HttpUrl getApiURL() {
        if (this.apiRoute == null) {
            throw new IllegalStateException("KafkaConnect API was not exposed");
        }
        return new HttpUrl.Builder().scheme("http").host(this.apiRoute.getSpec().getHost()).build();
    }

    /**
     * @return plain-HTTP URL built from the host of the metrics route
     * @throws IllegalStateException if {@link #exposeMetrics()} has not created the route yet
     */
    public HttpUrl getMetricsURL() {
        // Mirror getApiURL(): fail with a clear message instead of a bare NullPointerException.
        if (this.metricsRoute == null) {
            throw new IllegalStateException("KafkaConnect metrics were not exposed");
        }
        return new HttpUrl.Builder().scheme("http").host(this.metricsRoute.getSpec().getHost()).build();
    }

    /**
     * Deletes the KafkaConnect resource currently held by this controller.
     *
     * @return the result reported by the client's delete operation
     */
    @Override
    public boolean undeploy() {
        return Crds.kafkaConnectOperation((KubernetesClient)this.ocp).delete((Object[])new KafkaConnect[]{this.kafkaConnect});
    }

    /**
     * @return a metrics reader bound to {@link #getMetricsURL()}
     * @throws IllegalStateException if metrics have not been exposed yet
     */
    @Override
    public ConnectorMetricsReader getMetricsReader() {
        return new RestPrometheusMetricReader(this.getMetricsURL());
    }

    /**
     * Blocks until the namespace contains no pod labelled {@code strimzi.io/kind=KafkaConnect}.
     * Polling starts after 5 seconds, repeats every 3 seconds and gives up after the scaled
     * 30 second limit.
     */
    private void awaitNoConnectPods() {
        Awaitility.await().atMost(WaitConditions.scaled(30L), TimeUnit.SECONDS).pollDelay(5L, TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> ((PodList)((FilterWatchListDeletable)((NonNamespaceOperation)this.ocp.pods().inNamespace(this.project)).withLabel("strimzi.io/kind", "KafkaConnect")).list()).getItems().isEmpty());
    }

    /**
     * Fetches the current KafkaConnect resource from the cluster, changes its replica count
     * and writes it back.
     *
     * @param replicas desired replica count
     * @throws IllegalStateException if the resource cannot be found
     */
    private void updateReplicas(int replicas) {
        KafkaConnect current = (KafkaConnect)((Resource)((NonNamespaceOperation)Crds.kafkaConnectOperation((KubernetesClient)this.ocp).inNamespace(this.project)).withName(this.name)).get();
        if (current == null) {
            throw new IllegalStateException("KafkaConnect '" + this.name + "' was not found in namespace '" + this.project + "'");
        }
        current.getSpec().setReplicas(Integer.valueOf(replicas));
        ((NonNamespaceOperation)Crds.kafkaConnectOperation((KubernetesClient)this.ocp).inNamespace(this.project)).createOrReplace((Object[])new KafkaConnect[]{current});
    }

    /**
     * Looks up a service by name in the controller's namespace.
     *
     * @param serviceName name of the service
     * @return the service, never {@code null}
     * @throws IllegalStateException if no such service exists
     */
    private Service requireService(String serviceName) {
        Service service = (Service)((ServiceResource)((NonNamespaceOperation)this.ocp.services().inNamespace(this.project)).withName(serviceName)).get();
        if (service == null) {
            throw new IllegalStateException("Service '" + serviceName + "' was not found in namespace '" + this.project + "'");
        }
        return service;
    }
}

