/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.kafka.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.kafka.KafkaApplicationClient;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.application.client.kafka.impl.KafkaBasedCommandSender;
import org.eclipse.hono.application.client.kafka.impl.KafkaDownstreamMessage;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;

public class KafkaApplicationClientImpl
extends KafkaBasedCommandSender
implements KafkaApplicationClient {
    private final Vertx vertx;
    private final KafkaConsumerConfigProperties consumerConfig;
    private final List<MessageConsumer> consumersToCloseOnStop = new LinkedList<MessageConsumer>();
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;

    public KafkaApplicationClientImpl(Vertx vertx, KafkaConsumerConfigProperties consumerConfig, KafkaProducerFactory<String, Buffer> producerFactory, KafkaProducerConfigProperties producerConfig) {
        this(vertx, consumerConfig, producerFactory, producerConfig, (Tracer)NoopTracerFactory.create());
    }

    public KafkaApplicationClientImpl(Vertx vertx, KafkaConsumerConfigProperties consumerConfig, KafkaProducerFactory<String, Buffer> producerFactory, KafkaProducerConfigProperties producerConfig, Tracer tracer) {
        super(vertx, consumerConfig, producerFactory, producerConfig, tracer);
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(consumerConfig);
        if (!consumerConfig.isConfigured() || !producerConfig.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.vertx = vertx;
        this.consumerConfig = consumerConfig;
    }

    @Override
    public Future<Void> stop() {
        List closeKafkaClientsTracker = this.consumersToCloseOnStop.stream().map(MessageConsumer::close).collect(Collectors.toList());
        closeKafkaClientsTracker.add(super.stop());
        return CompositeFuture.join(closeKafkaClientsTracker).mapEmpty();
    }

    @Override
    public Future<MessageConsumer> createTelemetryConsumer(String tenantId, Handler<DownstreamMessage<KafkaMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        return this.createKafkaBasedDownstreamMessageConsumer(tenantId, HonoTopic.Type.TELEMETRY, messageHandler);
    }

    @Override
    public Future<MessageConsumer> createEventConsumer(String tenantId, Handler<DownstreamMessage<KafkaMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        return this.createKafkaBasedDownstreamMessageConsumer(tenantId, HonoTopic.Type.EVENT, messageHandler);
    }

    public Future<MessageConsumer> createCommandResponseConsumer(String tenantId, String replyId, Handler<DownstreamMessage<KafkaMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        return this.createKafkaBasedDownstreamMessageConsumer(tenantId, HonoTopic.Type.COMMAND_RESPONSE, messageHandler);
    }

    void setKafkaConsumerFactory(Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier) {
        this.kafkaConsumerSupplier = Objects.requireNonNull(kafkaConsumerSupplier);
    }

    private Future<MessageConsumer> createKafkaBasedDownstreamMessageConsumer(String tenantId, HonoTopic.Type type, Handler<DownstreamMessage<KafkaMessageContext>> messageHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(type);
        Objects.requireNonNull(messageHandler);
        String topic = new HonoTopic(type, tenantId).toString();
        Handler recordHandler = record -> messageHandler.handle((Object)new KafkaDownstreamMessage((KafkaConsumerRecord<String, Buffer>)record));
        final HonoKafkaConsumer consumer = new HonoKafkaConsumer(this.vertx, Set.of(topic), recordHandler, this.consumerConfig.getConsumerConfig(type.toString()));
        Optional.ofNullable(this.kafkaConsumerSupplier).ifPresent(arg_0 -> ((HonoKafkaConsumer)consumer).setKafkaConsumerSupplier(arg_0));
        return consumer.start().map(v -> new MessageConsumer(){

            public Future<Void> close() {
                return consumer.stop();
            }
        }).onSuccess(this.consumersToCloseOnStop::add);
    }
}

