/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.TenantDisabledOrNotRegisteredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Kafka based consumer of command records published on the internal command topic
 * of one adapter instance.
 * <p>
 * On {@link #start()} the consumer creates a Kafka admin client, makes sure that the
 * topic named after {@code HonoTopic.Type.COMMAND_INTERNAL} and the adapter instance id
 * exists (retrying periodically if the creation fails), then creates a Kafka consumer
 * and subscribes it to that topic. Received records are turned into
 * {@link KafkaBasedCommand}s and passed to the matching {@link CommandHandlerWrapper}.
 * <p>
 * The mutable state of this class is not synchronized; the code relies on
 * {@link #handleCommandMessage(KafkaConsumerRecord)} and the start/stop callbacks being
 * invoked on the same Vert.x context (NOTE(review): confirm for callers of {@code stop()}).
 */
public class KafkaBasedInternalCommandConsumer implements InternalCommandConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedInternalCommandConsumer.class);
    /** Number of partitions requested when creating the internal command topic. */
    private static final int NUM_PARTITIONS = 1;
    /** Interval in milliseconds between attempts to create the internal command topic. */
    private static final long CREATE_TOPIC_RETRY_INTERVAL = 1000L;
    /**
     * Marker for "no retry timer has been scheduled". Vert.x timer ids are plain longs,
     * so the field default of 0 could denote a real timer owned by somebody else.
     */
    private static final long NO_TIMER_ID = -1L;

    private final Vertx vertx;
    private final Supplier<Future<KafkaConsumer<String, Buffer>>> consumerCreator;
    private final Supplier<Future<Admin>> kafkaAdminClientCreator;
    private final String adapterInstanceId;
    private final String clientId;
    private final CommandHandlers commandHandlers;
    private final Tracer tracer;
    private final CommandResponseSender commandResponseSender;
    private final TenantClient tenantClient;
    /** Set once the internal command topic is known to exist; drives the readiness check. */
    private final AtomicBoolean isTopicCreated = new AtomicBoolean(false);
    /** {@code true} while the periodic retry timer is allowed to trigger another creation attempt. */
    private final AtomicBoolean retryCreateTopic = new AtomicBoolean(true);
    /** Key is the tenant id, value maps the original partition index to the last handled original offset. */
    private final Map<String, Map<Integer, Long>> lastHandledPartitionOffsetsPerTenant = new HashMap<>();

    private KafkaConsumer<String, Buffer> consumer;
    private Admin adminClient;
    private Context context;
    private KafkaClientMetricsSupport metricsSupport;
    private long retryCreateTopicTimerId = NO_TIMER_ID;

    /**
     * Creates a consumer that builds its Kafka clients lazily on {@link #start()}.
     * <p>
     * The Kafka consumer is configured with the adapter instance id as {@code group.id},
     * with auto commit enabled and with {@code auto.offset.reset} set to {@code earliest}.
     *
     * @param vertx The Vert.x instance to use.
     * @param adminClientConfigProperties The Kafka admin client configuration.
     * @param consumerConfigProperties The Kafka consumer configuration.
     * @param tenantClient The client used for looking up the tenant of a received command.
     * @param commandResponseSender The sender handed to the created command contexts.
     * @param adapterInstanceId The id of the adapter instance whose internal topic is consumed.
     * @param commandHandlers The registry of command handlers to dispatch commands to.
     * @param tracer The OpenTracing tracer.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public KafkaBasedInternalCommandConsumer(Vertx vertx, KafkaAdminClientConfigProperties adminClientConfigProperties, MessagingKafkaConsumerConfigProperties consumerConfigProperties, TenantClient tenantClient, CommandResponseSender commandResponseSender, String adapterInstanceId, CommandHandlers commandHandlers, Tracer tracer) {
        this.vertx = Objects.requireNonNull(vertx);
        Objects.requireNonNull(adminClientConfigProperties);
        Objects.requireNonNull(consumerConfigProperties);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tracer = Objects.requireNonNull(tracer);

        final Map<String, String> adminClientConfig = adminClientConfigProperties.getAdminClientConfig("internal-cmd-admin");
        final String bootstrapServersConfig = adminClientConfig.get("bootstrap.servers");
        final KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(vertx);
        // each creation attempt gets its own copy of the configuration
        this.kafkaAdminClientCreator = () -> kafkaClientFactory.createClientWithRetries(
                () -> Admin.create(new HashMap<>(adminClientConfig)),
                bootstrapServersConfig,
                KafkaClientFactory.UNLIMITED_RETRIES_DURATION);

        final Map<String, String> consumerConfig = consumerConfigProperties.getConsumerConfig("internal-cmd");
        consumerConfig.put("group.id", adapterInstanceId);
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        this.clientId = consumerConfig.get("client.id");
        this.consumerCreator = () -> kafkaClientFactory.createKafkaConsumerWithRetries(
                consumerConfig,
                String.class,
                Buffer.class,
                KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
    }

    /**
     * Creates a consumer around already existing Kafka clients.
     * <p>
     * The given admin client only becomes the active one once {@link #start()} has been invoked.
     *
     * @param context The Vert.x context to run on.
     * @param kafkaAdminClient The Kafka admin client to use.
     * @param kafkaConsumer The Kafka consumer to use.
     * @param clientId The client id, used in log output only.
     * @param tenantClient The client used for looking up the tenant of a received command.
     * @param commandResponseSender The sender handed to the created command contexts.
     * @param adapterInstanceId The id of the adapter instance whose internal topic is consumed.
     * @param commandHandlers The registry of command handlers to dispatch commands to.
     * @param tracer The OpenTracing tracer.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    KafkaBasedInternalCommandConsumer(Context context, Admin kafkaAdminClient, KafkaConsumer<String, Buffer> kafkaConsumer, String clientId, TenantClient tenantClient, CommandResponseSender commandResponseSender, String adapterInstanceId, CommandHandlers commandHandlers, Tracer tracer) {
        this.context = Objects.requireNonNull(context);
        Objects.requireNonNull(kafkaAdminClient);
        this.consumer = Objects.requireNonNull(kafkaConsumer);
        this.clientId = Objects.requireNonNull(clientId);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tracer = Objects.requireNonNull(tracer);
        this.vertx = context.owner();
        this.consumerCreator = () -> Future.succeededFuture(kafkaConsumer);
        this.kafkaAdminClientCreator = () -> Future.succeededFuture(kafkaAdminClient);
    }

    /**
     * Sets the component at which the Kafka consumer gets registered on start and unregistered on stop.
     *
     * @param metricsSupport The metrics support, may be {@code null} to disable the registration.
     * @return This object, for command chaining.
     */
    public final KafkaBasedInternalCommandConsumer setMetricsSupport(KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
        return this;
    }

    /**
     * Creates the admin client, ensures that the internal command topic exists, creates the
     * Kafka consumer and subscribes it to the topic.
     * <p>
     * A failed topic creation is retried periodically until it succeeds or {@link #stop()}
     * is invoked. A failure to create the admin client fails the returned future.
     *
     * @return A future that succeeds once the consumer is subscribed and has been assigned
     *         partitions. It fails with an {@link IllegalStateException} if no Vert.x context
     *         is available.
     */
    public Future<Void> start() {
        if (context == null) {
            context = Vertx.currentContext();
            if (context == null) {
                return Future.failedFuture(new IllegalStateException("Consumer must be started in a Vert.x context"));
            }
        }
        return kafkaAdminClientCreator.get()
                .onFailure(thr -> LOG.error("admin client creation failed", thr))
                .compose(client -> {
                    adminClient = client;
                    // Only a failed topic creation is retried. Recovering on the outer chain would
                    // also swallow an admin client creation failure and then retry with a null client.
                    return createTopic().recover(e -> retryCreateTopic());
                })
                .compose(v -> {
                    isTopicCreated.set(true);
                    return consumerCreator.get()
                            .onFailure(thr -> LOG.error("consumer creation failed", thr));
                })
                .compose(client -> {
                    consumer = client;
                    Optional.ofNullable(metricsSupport)
                            .ifPresent(ms -> ms.registerKafkaConsumer(consumer.unwrap()));
                    return subscribeToTopic();
                });
    }

    /**
     * Registers a readiness check that reports OK once the internal command topic has been created.
     *
     * @param readinessHandler The handler to register the check with.
     */
    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        LOG.trace("registering readiness check using kafka based internal command consumer [adapter instance id: {}]", adapterInstanceId);
        final String checkName = String.format("internal-command-consumer[%s]-readiness", adapterInstanceId);
        readinessHandler.register(checkName, status -> {
            if (isTopicCreated.get()) {
                status.tryComplete(Status.OK());
            } else {
                LOG.debug("readiness check failed [internal command topic is not created]");
                status.tryComplete(Status.KO());
            }
        });
    }

    /**
     * Does nothing, this component registers no liveness checks.
     *
     * @param livenessHandler The handler (unused).
     */
    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
        // no liveness checks to be added
    }

    /**
     * Requests creation of the internal command topic via the admin client.
     * <p>
     * A {@link TopicExistsException} reported by the admin client is treated as success.
     * The result is delivered on this consumer's Vert.x context.
     *
     * @return A future indicating the outcome of the topic creation.
     */
    private Future<Void> createTopic() {
        final Promise<Void> promise = Promise.promise();
        final String topicName = getTopicName();
        // the replication factor is left unspecified
        final NewTopic newTopic = new NewTopic(topicName, Optional.of(NUM_PARTITIONS), Optional.empty());
        adminClient.createTopics(List.of(newTopic))
                .all()
                .whenComplete((v, ex) -> context.runOnContext(v1 -> Optional.ofNullable(ex)
                        .filter(e -> !(e instanceof TopicExistsException))
                        .ifPresentOrElse(promise::fail, promise::complete)));
        return promise.future()
                .onSuccess(v -> LOG.debug("created topic [{}]", topicName))
                .onFailure(thr -> LOG.error("error creating topic [{}]", topicName, thr));
    }

    /**
     * Starts a periodic timer that tries to create the internal command topic until it succeeds.
     * <p>
     * At most one creation attempt is in flight at any time. The returned future stays
     * uncompleted if {@link #stop()} cancels the timer before the topic could be created.
     *
     * @return A future that succeeds once the topic has been created.
     */
    private Future<Void> retryCreateTopic() {
        final Promise<Void> createTopicRetryPromise = Promise.promise();
        retryCreateTopicTimerId = vertx.setPeriodic(CREATE_TOPIC_RETRY_INTERVAL, id -> {
            // the flag is cleared while an attempt is running and set again only if it failed
            if (retryCreateTopic.compareAndSet(true, false)) {
                createTopic()
                        .onSuccess(ok -> {
                            vertx.cancelTimer(id);
                            retryCreateTopicTimerId = NO_TIMER_ID;
                            createTopicRetryPromise.complete();
                        })
                        .onFailure(e -> retryCreateTopic.set(true));
            }
        });
        return createTopicRetryPromise.future();
    }

    /**
     * Installs the record, error and rebalance handlers on the consumer and subscribes it
     * to the internal command topic.
     *
     * @return A future that succeeds once the subscription is established and the first
     *         partition assignment has been received.
     */
    private Future<Void> subscribeToTopic() {
        consumer.handler(this::handleCommandMessage);
        consumer.exceptionHandler(thr -> LOG.error("consumer error occurred [adapterInstanceId: {}, clientId: {}]",
                adapterInstanceId, clientId, thr));
        consumer.partitionsRevokedHandler(this::onPartitionsRevoked);

        // temporary handler used for detecting the first partition assignment
        final Promise<Void> partitionAssignedPromise = Promise.promise();
        consumer.partitionsAssignedHandler(partitionsSet -> {
            LOG.debug("partitions assigned: {}", partitionsSet);
            partitionAssignedPromise.tryComplete();
        });

        final String topicName = getTopicName();
        final Promise<Void> subscribedPromise = Promise.promise();
        consumer.subscribe(topicName, subscribedPromise);

        return CompositeFuture.all(subscribedPromise.future(), partitionAssignedPromise.future())
                .<Void>mapEmpty()
                // replace the temporary handler, regardless of the outcome
                .onComplete(ar -> consumer.partitionsAssignedHandler(this::onPartitionsAssigned))
                .onSuccess(v -> LOG.debug("subscribed and got partition assignment for topic [{}]", topicName))
                .onFailure(thr -> LOG.error("error subscribing to topic [{}]", topicName, thr));
    }

    /**
     * Logs the partitions that have been assigned to the consumer.
     *
     * @param partitionsSet The assigned partitions.
     */
    private void onPartitionsAssigned(Set<TopicPartition> partitionsSet) {
        LOG.debug("partitions assigned: {}", partitionsSet);
    }

    /**
     * Logs the partitions that have been revoked from the consumer.
     *
     * @param partitionsSet The revoked partitions.
     */
    private void onPartitionsRevoked(Set<TopicPartition> partitionsSet) {
        LOG.debug("partitions revoked: {}", partitionsSet);
    }

    /**
     * Gets the name of the internal command topic of this adapter instance.
     *
     * @return The topic name.
     */
    private String getTopicName() {
        return new HonoTopic(HonoTopic.Type.COMMAND_INTERNAL, adapterInstanceId).toString();
    }

    /**
     * Stops retrying the topic creation and closes the admin client and the consumer.
     *
     * @return A future indicating the outcome of closing the clients. The future fails with
     *         the message "not started" if no consumer has been created yet; an admin client
     *         that already exists is closed in that case as well.
     */
    public Future<Void> stop() {
        retryCreateTopic.set(false);
        // never cancel a timer id that was not handed out to this instance
        if (retryCreateTopicTimerId != NO_TIMER_ID) {
            vertx.cancelTimer(retryCreateTopicTimerId);
            retryCreateTopicTimerId = NO_TIMER_ID;
        }
        if (consumer == null) {
            // start() may already have created the admin client (e.g. while the topic
            // creation is being retried); release it before reporting the failure
            final Promise<Void> notStartedPromise = Promise.promise();
            closeAdminClient().onComplete(ar -> notStartedPromise.fail("not started"));
            return notStartedPromise.future();
        }
        return CompositeFuture.all(closeAdminClient(), closeConsumer()).mapEmpty();
    }

    /**
     * Closes the admin client, if one has been created, using a blocking-code executor
     * of the Vert.x context.
     *
     * @return A future indicating the outcome; succeeded if there is no admin client.
     */
    private Future<Void> closeAdminClient() {
        if (adminClient == null) {
            return Future.succeededFuture();
        }
        final Promise<Void> adminClientClosePromise = Promise.promise();
        LOG.debug("stop: close admin client");
        context.executeBlocking(future -> {
            adminClient.close();
            LOG.debug("admin client closed");
            future.complete();
        }, adminClientClosePromise);
        return adminClientClosePromise.future();
    }

    /**
     * Closes the Kafka consumer and afterwards unregisters it from the metrics support, if set.
     *
     * @return A future indicating the outcome of closing the consumer.
     */
    private Future<Void> closeConsumer() {
        final Promise<Void> consumerClosePromise = Promise.promise();
        LOG.debug("stop: close consumer");
        consumer.close(consumerClosePromise);
        consumerClosePromise.future().onComplete(ar -> {
            LOG.debug("consumer closed");
            Optional.ofNullable(metricsSupport)
                    .ifPresent(ms -> ms.unregisterKafkaConsumer(consumer.unwrap()));
        });
        return consumerClosePromise.future();
    }

    /**
     * Handles a record received on the internal command topic.
     * <p>
     * Records lacking the original partition/offset headers or failing to be parsed as a
     * command are logged and dropped. A record is ignored if its original offset is not
     * greater than the last offset handled for the same tenant and original partition.
     * Otherwise the tenant is looked up and the command is passed to the registered command
     * handler; the command gets released if no handler exists or the tenant lookup fails,
     * and rejected if the tenant lookup reports status 404.
     *
     * @param record The received record.
     */
    void handleCommandMessage(KafkaConsumerRecord<String, Buffer> record) {
        final Integer commandPartition = KafkaRecordHelper.getOriginalPartitionHeader(record.headers()).orElse(null);
        final Long commandOffset = KafkaRecordHelper.getOriginalOffsetHeader(record.headers()).orElse(null);
        if (commandPartition == null || commandOffset == null) {
            LOG.warn("command record is invalid - missing required original partition/offset headers");
            return;
        }

        final KafkaBasedCommand command;
        try {
            command = KafkaBasedCommand.fromRoutedCommandRecord(record);
        } catch (IllegalArgumentException e) {
            LOG.warn("command record is invalid [tenant-id: {}, device-id: {}]",
                    KafkaRecordHelper.getTenantId(record.headers()).orElse(null),
                    KafkaRecordHelper.getDeviceId(record.headers()).orElse(null),
                    e);
            return;
        }

        final Map<Integer, Long> lastHandledPartitionOffsets = lastHandledPartitionOffsetsPerTenant
                .computeIfAbsent(command.getTenant(), k -> new HashMap<>());
        final Long lastHandledOffset = lastHandledPartitionOffsets.get(commandPartition);
        if (lastHandledOffset != null && commandOffset <= lastHandledOffset) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ignoring command - record partition offset {} <= last handled offset {} [{}]",
                        commandOffset, lastHandledOffset, command);
            }
            return;
        }
        lastHandledPartitionOffsets.put(commandPartition, commandOffset);

        final CommandHandlerWrapper commandHandler = commandHandlers.getCommandHandler(
                command.getTenant(), command.getGatewayOrDeviceId());
        if (commandHandler != null && commandHandler.getGatewayId() != null) {
            // take over the gateway id the handler has been registered with
            command.setGatewayId(commandHandler.getGatewayId());
        }

        final SpanContext spanContext = KafkaTracingHelper.extractSpanContext(tracer, record);
        final SpanContext followsFromSpanContext = commandHandler != null
                ? commandHandler.getConsumerCreationSpanContext()
                : null;
        final Span currentSpan = CommandContext.createSpan(tracer, command, spanContext, followsFromSpanContext);
        currentSpan.setTag("adapter_instance_id", adapterInstanceId);
        KafkaTracingHelper.TAG_OFFSET.set(currentSpan, record.offset());

        final KafkaBasedCommandContext commandContext = new KafkaBasedCommandContext(
                command, commandResponseSender, currentSpan);

        tenantClient.get(command.getTenant(), spanContext)
                .onFailure(t -> {
                    if (ServiceInvocationException.extractStatusCode(t) == 404) {
                        commandContext.reject(new TenantDisabledOrNotRegisteredException(command.getTenant(), 404));
                    } else {
                        commandContext.release(new ServerErrorException(
                                command.getTenant(), 503, "error retrieving tenant configuration", t));
                    }
                })
                .onSuccess(tenantConfig -> {
                    commandContext.put("tenant-config", tenantConfig);
                    if (commandHandler != null) {
                        LOG.trace("using [{}] for received command [{}]", commandHandler, command);
                        commandHandler.handleCommand(commandContext);
                    } else {
                        LOG.info("no command handler found for command [{}]", command);
                        commandContext.release(new NoConsumerException("no command handler found for command"));
                    }
                });
    }
}

