/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaClientFactory {
    public static final Duration UNLIMITED_RETRIES_DURATION = Duration.ofSeconds(-1L);
    public static final int CLIENT_CREATION_RETRY_DELAY_MILLIS = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientFactory.class);
    private final Vertx vertx;
    private Clock clock = Clock.systemUTC();

    public KafkaClientFactory(Vertx vertx) {
        this.vertx = Objects.requireNonNull(vertx);
    }

    void setClock(Clock clock) {
        this.clock = Objects.requireNonNull(clock);
    }

    public <T> Future<T> createClientWithRetries(Supplier<T> clientSupplier, String bootstrapServersConfig, Duration retriesTimeout) {
        return this.createClientWithRetries(clientSupplier, () -> true, bootstrapServersConfig, retriesTimeout);
    }

    public <T> Future<T> createClientWithRetries(Supplier<T> clientSupplier, BooleanSupplier keepTrying, String bootstrapServersConfig, Duration retriesTimeout) {
        Objects.requireNonNull(clientSupplier);
        Objects.requireNonNull(keepTrying);
        Promise resultPromise = Promise.promise();
        this.createClientWithRetries(clientSupplier, keepTrying, this.getRetriesTimeLimit(retriesTimeout), () -> KafkaClientFactory.containsValidServerEntries(bootstrapServersConfig), resultPromise);
        return resultPromise.future();
    }

    public Future<KafkaAdminClient> createKafkaAdminClientWithRetries(Map<String, String> clientConfig, BooleanSupplier keepTrying, Duration retriesTimeout) {
        Objects.requireNonNull(clientConfig);
        Objects.requireNonNull(keepTrying);
        Promise resultPromise = Promise.promise();
        this.createClientWithRetries(() -> KafkaAdminClient.create((Vertx)this.vertx, (Map)clientConfig), keepTrying, this.getRetriesTimeLimit(retriesTimeout), () -> KafkaClientFactory.containsValidServerEntries((String)clientConfig.get("bootstrap.servers")), resultPromise);
        return resultPromise.future();
    }

    private Instant getRetriesTimeLimit(Duration retriesTimeout) {
        if (retriesTimeout == null || retriesTimeout.isNegative()) {
            return Instant.MAX;
        }
        if (retriesTimeout.isZero()) {
            return Instant.MIN;
        }
        return Instant.now(this.clock).plus(retriesTimeout);
    }

    private <T> void createClientWithRetries(Supplier<T> clientSupplier, BooleanSupplier keepTrying, Instant retriesTimeLimit, BooleanSupplier serverEntriesValid, Promise<T> resultPromise) {
        if (!keepTrying.getAsBoolean()) {
            resultPromise.fail("client code has canceled further attempts to create Kafka client");
            return;
        }
        try {
            T client = clientSupplier.get();
            LOG.debug("successfully created client [type: {}]", (Object)client.getClass().getName());
            resultPromise.complete(client);
        }
        catch (Exception e) {
            if (!retriesTimeLimit.equals(Instant.MIN) && e instanceof KafkaException && KafkaClientFactory.isBootstrapServersConfigException(e.getCause()) && serverEntriesValid.getAsBoolean()) {
                if (!keepTrying.getAsBoolean()) {
                    LOG.debug("client code has canceled further attempts to create Kafka client");
                    resultPromise.fail((Throwable)e);
                } else if (Instant.now(this.clock).isBefore(retriesTimeLimit)) {
                    LOG.debug("error creating Kafka client, will retry in {}ms: {}", (Object)1000, (Object)e.getCause().getMessage());
                    this.vertx.setTimer(1000L, tid -> this.createClientWithRetries(clientSupplier, keepTrying, retriesTimeLimit, () -> true, resultPromise));
                } else {
                    LOG.warn("error creating Kafka client (no further attempts will be done, timeout for retries reached): {}", (Object)e.getCause().getMessage());
                    resultPromise.fail((Throwable)e);
                }
            }
            LOG.warn("failed to create client due to terminal error (won't retry)", (Throwable)e);
            resultPromise.fail((Throwable)e);
        }
    }

    public static boolean isRetriableClientCreationError(Throwable exception, String bootstrapServersConfig) {
        return exception instanceof KafkaException && KafkaClientFactory.isBootstrapServersConfigException(exception.getCause()) && KafkaClientFactory.containsValidServerEntries(bootstrapServersConfig);
    }

    private static boolean isBootstrapServersConfigException(Throwable ex) {
        return ex instanceof ConfigException && ex.getMessage() != null && ex.getMessage().contains("bootstrap.servers");
    }

    private static boolean containsValidServerEntries(String bootstrapServersConfig) {
        List urlList = Optional.ofNullable(bootstrapServersConfig).map(String::trim).map(serversString -> {
            if (serversString.isEmpty()) {
                return List.of();
            }
            return Arrays.asList(serversString.split(","));
        }).orElseGet(List::of);
        return !urlList.isEmpty() && urlList.stream().allMatch(KafkaClientFactory::containsHostAndPort);
    }

    private static boolean containsHostAndPort(String url) {
        try {
            String trimmedUrl = url.trim();
            return Utils.getHost((String)trimmedUrl) != null && Utils.getPort((String)trimmedUrl) != null;
        }
        catch (IllegalArgumentException e) {
            return false;
        }
    }
}

