/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.producer;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.EncodeException;
import io.vertx.core.json.Json;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractKafkaBasedMessageSender<V>
implements MessagingClient,
ServiceClient,
Lifecycle {
    private static final String DEFAULT_SPAN_NAME = "send message";
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final Tracer tracer;
    protected final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final KafkaProducerConfigProperties config;
    private final KafkaProducerFactory<String, V> producerFactory;
    private final String producerName;

    public AbstractKafkaBasedMessageSender(KafkaProducerFactory<String, V> producerFactory, String producerName, KafkaProducerConfigProperties config, Tracer tracer) {
        Objects.requireNonNull(producerFactory);
        Objects.requireNonNull(producerName);
        Objects.requireNonNull(config);
        Objects.requireNonNull(tracer);
        this.producerFactory = producerFactory;
        this.producerName = producerName;
        this.config = config;
        this.tracer = tracer;
    }

    public final MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    public final void addOnKafkaProducerReadyHandler(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.lifecycleStatus.addOnStartedHandler(handler);
        }
    }

    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        readinessHandler.register("%s-kafka-producer-creation-%s".formatted(this.producerName, UUID.randomUUID()), status -> status.tryComplete((Object)new Status().setOk(this.lifecycleStatus.isStarted())));
    }

    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
    }

    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture((Throwable)new IllegalStateException("sender is already started/stopping"));
        }
        this.producerFactory.getOrCreateProducerWithRetries(this.producerName, this.config, () -> ((LifecycleStatus)this.lifecycleStatus).isStarting(), KafkaClientFactory.UNLIMITED_RETRIES_DURATION).onSuccess(producer -> this.lifecycleStatus.setStarted());
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        return this.lifecycleStatus.runStopAttempt(this::stopProducer);
    }

    protected final Future<Void> sendAndWaitForOutcome(String topic, String tenantId, String deviceId, V payload, Map<String, Object> properties, Span currentSpan) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(properties);
        Objects.requireNonNull(currentSpan);
        List<KafkaHeader> headers = this.encodePropertiesAsKafkaHeaders(properties, currentSpan);
        return this.sendAndWaitForOutcome(topic, tenantId, deviceId, payload, headers, currentSpan);
    }

    protected final Future<Void> sendAndWaitForOutcome(String topic, String tenantId, String deviceId, V payload, List<KafkaHeader> headers, Span currentSpan) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(headers);
        Objects.requireNonNull(currentSpan);
        if (!this.lifecycleStatus.isStarted()) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, "sender not started"));
        }
        KafkaProducerRecord record = KafkaProducerRecord.create((String)topic, (Object)deviceId, payload);
        this.log.trace("sending message to Kafka [topic: {}, tenantId: {}, deviceId: {}]", new Object[]{topic, tenantId, deviceId});
        record.addHeaders(headers);
        KafkaTracingHelper.injectSpanContext(this.tracer, record, currentSpan.context());
        this.logProducerRecord(currentSpan, record);
        return this.getOrCreateProducer().send(record).onSuccess(recordMetadata -> this.logRecordMetadata(currentSpan, deviceId, (RecordMetadata)recordMetadata)).otherwise(t -> {
            this.logError(currentSpan, topic, tenantId, deviceId, (Throwable)t);
            throw new ServerErrorException(tenantId, this.getErrorCode((Throwable)t), t);
        }).mapEmpty();
    }

    protected final KafkaProducer<String, V> getOrCreateProducer() {
        return this.producerFactory.getOrCreateProducer(this.producerName, this.config);
    }

    protected final Future<Void> stopProducer() {
        return this.producerFactory.closeProducer(this.producerName);
    }

    private List<KafkaHeader> encodePropertiesAsKafkaHeaders(Map<String, Object> properties, Span span) {
        ArrayList<KafkaHeader> headers = new ArrayList<KafkaHeader>();
        properties.forEach((k, v) -> {
            try {
                headers.add(KafkaRecordHelper.createKafkaHeader(k, v));
            }
            catch (EncodeException e) {
                this.log.info("failed to serialize property with key [{}] to Kafka header", k);
                span.log("failed to create Kafka header from property: " + k);
            }
        });
        if (!properties.containsKey("creation-time")) {
            headers.add(KafkaRecordHelper.createKafkaHeader("creation-time", Json.encode((Object)Instant.now().toEpochMilli())));
        }
        return headers;
    }

    protected Span startChildSpan(String operationName, String topic, String tenantId, String deviceId, SpanContext context) {
        return this.startSpan(operationName, topic, tenantId, deviceId, "child_of", context);
    }

    protected Span startSpan(String operationName, String topic, String tenantId, String deviceId, String referenceType, SpanContext context) {
        String operationNameToUse = Strings.isNullOrEmpty((Object)operationName) ? DEFAULT_SPAN_NAME : operationName;
        return KafkaTracingHelper.newProducerSpan(this.tracer, operationNameToUse, topic, referenceType, context).setTag(TracingHelper.TAG_TENANT_ID.getKey(), tenantId).setTag(TracingHelper.TAG_DEVICE_ID.getKey(), deviceId);
    }

    private void logProducerRecord(Span span, KafkaProducerRecord<String, V> record) {
        String headersAsString = record.headers().stream().map(header -> header.key() + "=" + header.value()).collect(Collectors.joining(",", "{", "}"));
        this.log.trace("producing message [topic: {}, key: {}, partition: {}, timestamp: {}, headers: {}]", new Object[]{record.topic(), record.key(), record.partition(), record.timestamp(), headersAsString});
        span.log("producing message with headers: " + headersAsString);
    }

    private void logRecordMetadata(Span span, String recordKey, RecordMetadata metadata) {
        this.log.trace("message produced to Kafka [topic: {}, key: {}, partition: {}, offset: {}, timestamp: {}]", new Object[]{metadata.getTopic(), recordKey, metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp()});
        span.log("message produced to Kafka");
        KafkaTracingHelper.setRecordMetadataTags(span, metadata);
        Tags.HTTP_STATUS.set(span, Integer.valueOf(202));
    }

    private void logError(Span span, String topic, String tenantId, String deviceId, Throwable cause) {
        this.log.debug("sending message failed [topic: {}, key: {}, tenantId: {}, deviceId: {}]", new Object[]{topic, deviceId, tenantId, deviceId, cause});
        Tags.HTTP_STATUS.set(span, Integer.valueOf(this.getErrorCode(cause)));
        TracingHelper.logError((Span)span, (Throwable)cause);
    }

    private int getErrorCode(Throwable t) {
        return 503;
    }
}

