/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.impl.Helper;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.consumer.AsyncHandlingAutoCommitKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.kafka.InternalKafkaTopicCleanupService;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaBasedMappingAndDelegatingCommandHandler;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaCommandProcessingQueue;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessagingType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBasedCommandConsumerFactoryImpl
implements CommandConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedCommandConsumerFactoryImpl.class);
    private static final Pattern COMMANDS_TOPIC_PATTERN = Pattern.compile(Pattern.quote(HonoTopic.Type.COMMAND.prefix) + ".*");
    private static final String DEFAULT_GROUP_ID = "cmd-router-group";
    private final Vertx vertx;
    private final TenantClient tenantClient;
    private final CommandTargetMapper commandTargetMapper;
    private final InternalKafkaTopicCleanupService internalKafkaTopicCleanupService;
    private final MessagingKafkaConsumerConfigProperties kafkaConsumerConfig;
    private final Tracer tracer;
    private final CommandRouterMetrics metrics;
    private final KafkaBasedInternalCommandSender internalCommandSender;
    private final KafkaBasedCommandResponseSender kafkaBasedCommandResponseSender;
    private final KafkaClientMetricsSupport kafkaClientMetricsSupport;
    private String groupId = "cmd-router-group";
    private KafkaBasedMappingAndDelegatingCommandHandler commandHandler;
    private AsyncHandlingAutoCommitKafkaConsumer kafkaConsumer;

    public KafkaBasedCommandConsumerFactoryImpl(Vertx vertx, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, KafkaProducerFactory<String, Buffer> kafkaProducerFactory, MessagingKafkaProducerConfigProperties internalCommandProducerConfig, MessagingKafkaProducerConfigProperties commandResponseProducerConfig, MessagingKafkaConsumerConfigProperties kafkaConsumerConfig, CommandRouterMetrics metrics, KafkaClientMetricsSupport kafkaClientMetricsSupport, Tracer tracer, InternalKafkaTopicCleanupService internalKafkaTopicCleanupService) {
        this.vertx = Objects.requireNonNull(vertx);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.commandTargetMapper = Objects.requireNonNull(commandTargetMapper);
        Objects.requireNonNull(kafkaProducerFactory);
        Objects.requireNonNull(internalCommandProducerConfig);
        Objects.requireNonNull(commandResponseProducerConfig);
        this.kafkaConsumerConfig = Objects.requireNonNull(kafkaConsumerConfig);
        this.metrics = Objects.requireNonNull(metrics);
        this.kafkaClientMetricsSupport = Objects.requireNonNull(kafkaClientMetricsSupport);
        this.tracer = Objects.requireNonNull(tracer);
        this.internalKafkaTopicCleanupService = internalKafkaTopicCleanupService;
        this.internalCommandSender = new KafkaBasedInternalCommandSender(kafkaProducerFactory, internalCommandProducerConfig, tracer);
        this.kafkaBasedCommandResponseSender = new KafkaBasedCommandResponseSender(vertx, kafkaProducerFactory, commandResponseProducerConfig, tracer);
    }

    public void setGroupId(String groupId) {
        Objects.requireNonNull(groupId);
        if (this.kafkaConsumer != null) {
            throw new IllegalStateException("must be invoked before start()");
        }
        this.groupId = groupId;
    }

    public MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    public Future<Void> start() {
        Context context = Vertx.currentContext();
        if (context == null) {
            return Future.failedFuture((Throwable)new IllegalStateException("factory must be started in a Vert.x context"));
        }
        KafkaCommandProcessingQueue commandQueue = new KafkaCommandProcessingQueue(context);
        this.commandHandler = new KafkaBasedMappingAndDelegatingCommandHandler(this.vertx, this.tenantClient, commandQueue, this.commandTargetMapper, this.internalCommandSender, this.kafkaBasedCommandResponseSender, this.metrics, this.tracer);
        Map consumerConfig = this.kafkaConsumerConfig.getConsumerConfig("command");
        consumerConfig.put("group.id", this.groupId);
        consumerConfig.putIfAbsent("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
        consumerConfig.putIfAbsent("auto.offset.reset", "earliest");
        this.kafkaConsumer = new AsyncHandlingAutoCommitKafkaConsumer(this.vertx, COMMANDS_TOPIC_PATTERN, this.commandHandler::mapAndDelegateIncomingCommandMessage, consumerConfig);
        this.kafkaConsumer.setPollTimeout(Duration.ofMillis(this.kafkaConsumerConfig.getPollTimeout()));
        this.kafkaConsumer.setConsumerCreationRetriesTimeout(KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        this.kafkaConsumer.setMetricsSupport(this.kafkaClientMetricsSupport);
        this.kafkaConsumer.setOnRebalanceDoneHandler(partitions -> commandQueue.setCurrentlyHandledPartitions(Helper.to((Set)partitions)));
        this.kafkaConsumer.setOnPartitionsLostHandler(partitions -> commandQueue.setRevokedPartitions(Helper.to((Set)partitions)));
        Future cleanupServiceStartFuture = Optional.ofNullable(this.internalKafkaTopicCleanupService).map(InternalKafkaTopicCleanupService::start).orElseGet(Future::succeededFuture);
        return CompositeFuture.all(this.commandHandler.start(), (Future)this.kafkaConsumer.start(), (Future)cleanupServiceStartFuture).mapEmpty();
    }

    public Future<Void> stop() {
        Future cleanupServiceStopFuture = Optional.ofNullable(this.internalKafkaTopicCleanupService).map(InternalKafkaTopicCleanupService::stop).orElseGet(Future::succeededFuture);
        return CompositeFuture.join((Future)this.kafkaConsumer.stop(), this.commandHandler.stop(), (Future)this.internalCommandSender.stop(), (Future)this.kafkaBasedCommandResponseSender.stop(), (Future)cleanupServiceStopFuture).mapEmpty();
    }

    @Override
    public Future<Void> createCommandConsumer(String tenantId, SpanContext context) {
        String topic = new HonoTopic(HonoTopic.Type.COMMAND, tenantId).toString();
        if (this.kafkaConsumer.isAmongKnownSubscribedTopics(topic)) {
            LOG.debug("createCommandConsumer: topic is already subscribed [{}]", (Object)topic);
            return Future.succeededFuture();
        }
        LOG.debug("createCommandConsumer: topic not subscribed; check for its existence, triggering auto-creation if enabled [{}]", (Object)topic);
        Span span = TracingHelper.buildServerChildSpan((Tracer)this.tracer, (SpanContext)context, (String)"wait for topic subscription update", (String)CommandConsumerFactory.class.getSimpleName()).start();
        TracingHelper.TAG_TENANT_ID.set(span, tenantId);
        Tags.MESSAGE_BUS_DESTINATION.set(span, topic);
        return this.kafkaConsumer.ensureTopicIsAmongSubscribedTopicPatternTopics(topic).onComplete(ar -> {
            if (ar.failed()) {
                TracingHelper.logError((Span)span, (Throwable)ar.cause());
            }
            span.finish();
        });
    }
}

