/*
 * Decompiled with CFR 0.152.
 */
package net.christophschubert.cp.testcontainers;

import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import net.christophschubert.cp.testcontainers.CPTestContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.images.builder.dockerfile.DockerfileBuilder;
import org.testcontainers.utility.DockerImageName;

public class KafkaConnectContainer
extends CPTestContainer<KafkaConnectContainer> {
    static final int defaultPort = 8083;
    private static final String PROPERTY_PREFIX = "CONNECT";
    private final String configsPostfix = "-configs";
    private final String offsetsPostfix = "-offsets";
    private final String statusPostfix = "-status";
    private String clusterId = "connect";

    public Set<String> getInternalTopics() {
        return Set.of(this.clusterId + "-configs", this.clusterId + "-offsets", this.clusterId + "-status");
    }

    public KafkaConnectContainer withReplicationFactors(int rf) {
        this.withProperty("replication.factor", rf);
        this.withProperty("confluent.topic.replication.factor", rf);
        this.withProperty("config.storage.replication.factor", rf);
        this.withProperty("offset.storage.replication.factor", rf);
        this.withProperty("status.storage.replication.factor", rf);
        return this;
    }

    public KafkaConnectContainer withClusterId(String clusterId) {
        this.withProperty("group.id", clusterId);
        this.withProperty("config.storage.topic", clusterId + "-configs");
        this.withProperty("offset.storage.topic", clusterId + "-offsets");
        this.withProperty("status.storage.topic", clusterId + "-status");
        return this;
    }

    public String getClusterId() {
        return this.clusterId;
    }

    private void _configure(KafkaContainer bootstrap) {
        this.waitingFor((WaitStrategy)Wait.forHttp((String)"/connectors").forStatusCode(200).forStatusCode(401));
        this.withStartupTimeout(Duration.ofMinutes(5L));
        this.withProperty("bootstrap.servers", KafkaConnectContainer.getInternalBootstrap(bootstrap));
        this.withProperty("rest.port", "" + this.httpPort);
        this.withClusterId(this.clusterId);
        this.withReplicationFactors(1);
        this.withProperty("rest.advertised.host.name", "localhost");
        this.withProperty("connector.client.config.override.policy", "All");
        this.withProperty("listeners", this.getHttpPortListener());
        this.withProperty("plugin.path", "/usr/share/java");
        this.withProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.withProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter");
    }

    KafkaConnectContainer(DockerImageName dockerImageName, KafkaContainer bootstrap, Network network) {
        super(dockerImageName, bootstrap, network, 8083, PROPERTY_PREFIX);
        this._configure(bootstrap);
    }

    protected KafkaConnectContainer(ImageFromDockerfile image, KafkaContainer bootstrap, Network network) {
        super(image, bootstrap, network, 8083, PROPERTY_PREFIX);
        this._configure(bootstrap);
    }

    public static ImageFromDockerfile customImage(Collection<String> connectorNames, String baseImageName) {
        String commandPrefix = "confluent-hub install --no-prompt ";
        String command = connectorNames.stream().map(s -> "confluent-hub install --no-prompt " + s).collect(Collectors.joining(" && "));
        return (ImageFromDockerfile)new ImageFromDockerfile().withDockerfileFromBuilder(builder -> ((DockerfileBuilder)((DockerfileBuilder)builder.from(baseImageName)).run(command)).build());
    }

    @Override
    public CPTestContainer<KafkaConnectContainer> withLogLevel(String logLevel) {
        this.withEnv("CONNECT_LOG4J_ROOT_LOGLEVEL", logLevel);
        return this;
    }
}

