/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.pubsub;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.EncodeException;
import io.vertx.core.json.Json;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.hono.util.MessagingClient;
import org.eclipse.hono.util.MessagingType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractPubSubBasedMessageSender
implements MessagingClient,
ServiceClient,
Lifecycle {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final String projectId;
    protected final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final PubSubPublisherFactory publisherFactory;
    private final String topic;
    private final Tracer tracer;

    protected AbstractPubSubBasedMessageSender(PubSubPublisherFactory publisherFactory, String topic, String projectId, Tracer tracer) {
        Objects.requireNonNull(publisherFactory);
        Objects.requireNonNull(topic);
        Objects.requireNonNull(projectId);
        Objects.requireNonNull(tracer);
        this.publisherFactory = publisherFactory;
        this.topic = topic;
        this.projectId = projectId;
        this.tracer = tracer;
    }

    private Span newSpan(String operationName, String referenceType, SpanContext parent) {
        return TracingHelper.buildSpan((Tracer)this.tracer, (SpanContext)parent, (String)operationName, (String)referenceType).ignoreActiveSpan().withTag(Tags.COMPONENT.getKey(), "hono-client-pubsub").withTag(Tags.SPAN_KIND.getKey(), "producer").withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), this.topic).withTag(Tags.PEER_SERVICE.getKey(), "pubsub").start();
    }

    public final MessagingType getMessagingType() {
        return MessagingType.pubsub;
    }

    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        readinessHandler.register("%s-pub-sub-publisher-creation-%s".formatted(this.topic, UUID.randomUUID()), status -> status.tryComplete((Object)new Status().setOk(this.lifecycleStatus.isStarted())));
    }

    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture((Throwable)new IllegalStateException("sender is already started/stopping"));
        }
        this.lifecycleStatus.setStarted();
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        if (this.lifecycleStatus.isStopping()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStopping()) {
            return Future.failedFuture((Throwable)new IllegalStateException("sender is already stopping"));
        }
        this.lifecycleStatus.setStopped();
        return this.publisherFactory.closeAllPublisher();
    }

    protected final Future<Void> sendAndWaitForOutcome(String topic, String tenantId, String deviceId, Buffer payload, Map<String, Object> properties, Span currentSpan) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(properties);
        Objects.requireNonNull(currentSpan);
        if (!this.lifecycleStatus.isStarted()) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, "sender not started"));
        }
        Map<String, String> pubSubAttributes = this.encodePropertiesAsPubSubAttributes(properties, currentSpan);
        PubsubMessage.Builder builder = PubsubMessage.newBuilder().putAllAttributes(pubSubAttributes).setOrderingKey(deviceId);
        Optional.ofNullable(payload).map(Buffer::getBytes).map(ByteString::copyFrom).ifPresent(arg_0 -> ((PubsubMessage.Builder)builder).setData(arg_0));
        PubsubMessage pubsubMessage = builder.build();
        this.log.debug("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", new Object[]{topic, tenantId, deviceId});
        this.logPubSubMessage(currentSpan, pubsubMessage, topic, tenantId);
        return this.getOrCreatePublisher(topic).publish(pubsubMessage).onSuccess(recordMessage -> this.logPubSubMessageId(currentSpan, topic, (String)recordMessage)).recover(t -> this.retrySendToFallbackTopic(topic, currentSpan, tenantId, deviceId, (Throwable)t, pubsubMessage)).mapEmpty();
    }

    private Future<String> retrySendToFallbackTopic(String topic, Span currentSpan, String tenantId, String deviceId, Throwable t, PubsubMessage pubsubMessage) {
        this.log.debug("Failed to publish to topic {}", (Object)topic);
        String fallback = PubSubMessageHelper.getTopicEndpointFromTopic(topic, tenantId);
        if (fallback == null) {
            this.logError(currentSpan, topic, tenantId, deviceId, t);
            throw new ServerErrorException(tenantId, 503, t);
        }
        this.publisherFactory.closePublisher(topic);
        String fallbackTopic = PubSubMessageHelper.getTopicName(fallback, tenantId);
        this.log.debug("Retry sending message to Pub/Sub using the fallback topic [{}]", (Object)fallbackTopic);
        return this.getOrCreatePublisher(fallbackTopic).publish(pubsubMessage).onSuccess(recordMessage -> this.logPubSubMessageId(currentSpan, fallbackTopic, (String)recordMessage)).onFailure(thr -> {
            this.logError(currentSpan, fallbackTopic, tenantId, deviceId, (Throwable)thr);
            throw new ServerErrorException(tenantId, 503, thr);
        }).mapEmpty();
    }

    protected Span startSpan(String operationName, String tenantId, String deviceId, String referenceType, SpanContext context) {
        Objects.requireNonNull(operationName);
        Objects.requireNonNull(referenceType);
        return this.newSpan(operationName, referenceType, context).setTag(TracingHelper.TAG_TENANT_ID.getKey(), tenantId).setTag(TracingHelper.TAG_DEVICE_ID.getKey(), deviceId);
    }

    protected PubSubPublisherClient getOrCreatePublisher(String topic) {
        return this.publisherFactory.getOrCreatePublisher(topic);
    }

    private void logPubSubMessageId(Span span, String topic, String messageId) {
        this.log.debug("message published to Pub/Sub [topic: {}, id: {}]", (Object)topic, (Object)messageId);
        span.log("message published to Pub/Sub");
        Tags.HTTP_STATUS.set(span, Integer.valueOf(202));
    }

    private void logPubSubMessage(Span span, PubsubMessage message, String topic, String tenantId) {
        String attributesAsString = message.getAttributesMap().entrySet().stream().map(entry -> (String)entry.getKey() + "=" + (String)entry.getValue()).collect(Collectors.joining(",", "{", "}"));
        this.log.trace("producing message [topic: {}, tenant: {}, key: {}, timestamp: {}, attributes: {}]", new Object[]{topic, tenantId, message.getOrderingKey(), message.getPublishTime(), attributesAsString});
        span.log("publishing message with headers: " + attributesAsString);
    }

    private void logError(Span span, String topic, String tenantId, String deviceId, Throwable cause) {
        this.log.debug("sending message failed [topic: {}, key: {}, tenantId: {}, deviceId: {}]", new Object[]{topic, deviceId, tenantId, deviceId, cause});
        Tags.HTTP_STATUS.set(span, Integer.valueOf(503));
        TracingHelper.logError((Span)span, (Throwable)cause);
    }

    private Map<String, String> encodePropertiesAsPubSubAttributes(Map<String, Object> properties, Span span) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        properties.forEach((key, value) -> {
            try {
                attributes.put((String)key, this.getStringEncodedValue(value));
            }
            catch (EncodeException e) {
                this.log.info("failed to serialize property with key [{}] to Pub/Sub attribute", key);
                span.log("failed to create Pub/Sub attributes from property: " + key);
            }
        });
        return attributes;
    }

    private String getStringEncodedValue(Object value) {
        if (value instanceof String) {
            String val = (String)value;
            return val;
        }
        return Json.encode((Object)value);
    }
}

