/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.device.amqp.impl;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.amqp.GenericSenderLink;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycleWrapper;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClient;
import org.eclipse.hono.client.device.amqp.impl.AmqpAdapterClientCommandConsumer;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.client.util.ClientFactory;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.ResourceIdentifier;

/**
 * An {@link AmqpAdapterClient} implementation that sends telemetry, event and
 * command response messages and creates command consumers using a single
 * {@link HonoConnection}.
 * <p>
 * Sender links are created lazily and cached per message kind. A cached link is
 * dropped from its cache when the link's close hook fires, and all sender
 * factories are notified when the connection reports a disconnect.
 */
public final class ProtonBasedAmqpAdapterClient
        extends ConnectionLifecycleWrapper<HonoConnection>
        implements AmqpAdapterClient {

    private final HonoConnection connection;
    private final CachingClientFactory<GenericSenderLink> telemetrySenderClientFactory;
    private final CachingClientFactory<GenericSenderLink> eventSenderClientFactory;
    private final CachingClientFactory<GenericSenderLink> commandResponseSenderClientFactory;
    private final ClientFactory<CommandConsumer> commandConsumerFactory;

    /**
     * Creates a client for an existing connection.
     *
     * @param connection The connection to use for all links created by this client.
     * @throws NullPointerException if connection is {@code null}.
     */
    public ProtonBasedAmqpAdapterClient(HonoConnection connection) {
        super(connection);
        this.connection = Objects.requireNonNull(connection);
        this.connection.addDisconnectListener(con -> this.onDisconnect());
        this.telemetrySenderClientFactory = new CachingClientFactory<>(connection.getVertx(), GenericSenderLink::isOpen);
        this.eventSenderClientFactory = new CachingClientFactory<>(connection.getVertx(), GenericSenderLink::isOpen);
        this.commandResponseSenderClientFactory = new CachingClientFactory<>(connection.getVertx(), GenericSenderLink::isOpen);
        this.commandConsumerFactory = new ClientFactory<>();
    }

    /**
     * Propagates a connection loss to the three sender factories.
     */
    private void onDisconnect() {
        // NOTE(review): the command consumer factory is not notified here; confirm
        // whether that is intentional before adding it.
        this.telemetrySenderClientFactory.onDisconnect();
        this.eventSenderClientFactory.onDisconnect();
        this.commandResponseSenderClientFactory.onDisconnect();
    }

    /**
     * Gets the timeout used when checking that the connection is established.
     *
     * @return The link establishment timeout taken from the connection's configuration.
     */
    private long getDefaultConnectionCheckTimeout() {
        return this.connection.getConfig().getLinkEstablishmentTimeout();
    }

    /**
     * Gets the cached sender link for the given key from the given factory or
     * creates a new one, after checking that the connection is established.
     * <p>
     * The close hook of a newly created link removes the link from the very
     * factory that cached it, so a closed link can never linger in one cache
     * while the removal is issued against another.
     *
     * @param factory The factory that caches the link.
     * @param cacheKey The key under which the link is cached.
     * @return A future with the outcome of the lookup/creation.
     */
    private Future<GenericSenderLink> getOrCreateSender(
            CachingClientFactory<GenericSenderLink> factory,
            String cacheKey) {

        return this.connection.isConnected(this.getDefaultConnectionCheckTimeout())
                .compose(v -> this.connection.executeOnContext(result -> factory.getOrCreateClient(
                        cacheKey,
                        () -> GenericSenderLink.create(
                                this.connection,
                                onSenderClosed -> factory.removeClient(cacheKey)),
                        result)));
    }

    /**
     * Gets or creates the sender link used for telemetry messages.
     *
     * @return A future with the outcome of the lookup/creation.
     */
    private Future<GenericSenderLink> getOrCreateGenericTelemetrySender() {
        final String cacheKey = "telemetry";
        return this.getOrCreateSender(this.telemetrySenderClientFactory, cacheKey);
    }

    /**
     * Gets or creates the sender link used for events.
     *
     * @return A future with the outcome of the lookup/creation.
     */
    private Future<GenericSenderLink> getOrCreateGenericEventSender() {
        final String cacheKey = "event";
        // the link is cached in (and, on close, removed from) the event factory
        return this.getOrCreateSender(this.eventSenderClientFactory, cacheKey);
    }

    /**
     * Gets or creates the sender link used for command responses.
     *
     * @return A future with the outcome of the lookup/creation.
     */
    private Future<GenericSenderLink> getOrCreateCommandResponseSender() {
        final String cacheKey = "command_response";
        return this.getOrCreateSender(this.commandResponseSenderClientFactory, cacheKey);
    }

    /**
     * Starts a new span for an operation, tagged with the peer's host, port and
     * container ID as well as the given device identity.
     *
     * @param operationName The name of the operation that the span covers.
     * @param tenantId The tenant to tag the span with (passed on as is, may be {@code null}).
     * @param deviceId The device to tag the span with (passed on as is, may be {@code null}).
     * @param context The context to create the span as a child of.
     * @return The started span.
     * @throws NullPointerException if operation name is {@code null}.
     */
    private Span createSpan(String operationName, String tenantId, String deviceId, SpanContext context) {
        Objects.requireNonNull(operationName);
        final Span span = TracingHelper
                .buildChildSpan(this.connection.getTracer(), context, operationName, this.getClass().getSimpleName())
                .ignoreActiveSpan()
                .withTag(Tags.PEER_HOSTNAME.getKey(), this.connection.getConfig().getHost())
                .withTag(Tags.PEER_PORT.getKey(), this.connection.getConfig().getPort())
                .withTag(TracingHelper.TAG_PEER_CONTAINER.getKey(), this.connection.getRemoteContainerId())
                .start();
        TracingHelper.setDeviceTags(span, tenantId, deviceId);
        return span;
    }

    /**
     * Creates an AMQP message addressed to the given target.
     * <p>
     * Content type and body are only set if the corresponding argument is not
     * {@code null}; a payload is wrapped into a {@link Data} section.
     *
     * @param payload The payload of the message or {@code null}.
     * @param contentType The content type of the payload or {@code null}.
     * @param targetAddress The address to set on the message.
     * @return The message.
     */
    private Message createAmqpMessage(Buffer payload, String contentType, String targetAddress) {
        final Message msg = ProtonHelper.message();
        msg.setAddress(targetAddress);
        AmqpUtils.setCreationTime(msg);
        Optional.ofNullable(contentType).ifPresent(msg::setContentType);
        Optional.ofNullable(payload)
                .map(Buffer::getBytes)
                .map(Binary::new)
                .map(Data::new)
                .ifPresent(msg::setBody);
        return msg;
    }

    /**
     * Verifies that a device ID is given whenever a tenant ID is given.
     *
     * @param tenantId The tenant identifier or {@code null}.
     * @param deviceId The device identifier or {@code null}.
     * @throws IllegalArgumentException if tenant ID is not {@code null} but device ID is {@code null}.
     */
    private void checkDeviceSpec(String tenantId, String deviceId) {
        if (tenantId != null && deviceId == null) {
            throw new IllegalArgumentException("device ID is required if tenant ID is not null");
        }
    }

    /**
     * Sends a telemetry message.
     * <p>
     * With {@link QoS#AT_MOST_ONCE} the message is handed to the sender's
     * {@code send} method, with any other QoS to {@code sendAndWaitForOutcome}.
     *
     * @param qos The delivery semantics to use.
     * @param payload The payload of the message or {@code null}.
     * @param contentType The content type of the payload or {@code null}.
     * @param tenantId The tenant that the device belongs to or {@code null}.
     * @param deviceId The device that the data originates from; required if tenant ID is given.
     * @param context The tracing context to create the operation's span in.
     * @return A future with the outcome of the send operation.
     * @throws NullPointerException if qos is {@code null}.
     * @throws IllegalArgumentException if tenant ID is not {@code null} but device ID is {@code null}.
     */
    @Override
    public Future<ProtonDelivery> sendTelemetry(QoS qos, Buffer payload, String contentType, String tenantId, String deviceId, SpanContext context) {
        Objects.requireNonNull(qos);
        this.checkDeviceSpec(tenantId, deviceId);
        return this.getOrCreateGenericTelemetrySender().compose(sender -> {
            final Span currentSpan = this.createSpan("send telemetry", tenantId, deviceId, context);
            final String address = ResourceIdentifier.fromPath(new String[]{"telemetry", tenantId, deviceId}).toString();
            final Message message = this.createAmqpMessage(payload, contentType, address);
            if (qos == QoS.AT_MOST_ONCE) {
                return sender.send(message, currentSpan);
            }
            return sender.sendAndWaitForOutcome(message, currentSpan);
        });
    }

    /**
     * Sends an event using the sender's {@code sendAndWaitForOutcome} method.
     *
     * @param payload The payload of the message or {@code null}.
     * @param contentType The content type of the payload or {@code null}.
     * @param tenantId The tenant that the device belongs to or {@code null}.
     * @param deviceId The device that the event originates from; required if tenant ID is given.
     * @param context The tracing context to create the operation's span in.
     * @return A future with the outcome of the send operation.
     * @throws IllegalArgumentException if tenant ID is not {@code null} but device ID is {@code null}.
     */
    @Override
    public Future<ProtonDelivery> sendEvent(Buffer payload, String contentType, String tenantId, String deviceId, SpanContext context) {
        this.checkDeviceSpec(tenantId, deviceId);
        return this.getOrCreateGenericEventSender().compose(sender -> {
            final Span currentSpan = this.createSpan("send event", tenantId, deviceId, context);
            final String address = ResourceIdentifier.fromPath(new String[]{"event", tenantId, deviceId}).toString();
            final Message message = this.createAmqpMessage(payload, contentType, address);
            return sender.sendAndWaitForOutcome(message, currentSpan);
        });
    }

    /**
     * Creates a consumer for commands targeted at a specific device.
     *
     * @param tenantId The tenant that the device belongs to (passed on as is).
     * @param deviceId The device to consume commands for.
     * @param messageHandler The handler to invoke with each received message.
     * @return A future with the outcome of the creation attempt.
     * @throws NullPointerException if device ID or message handler is {@code null}.
     */
    @Override
    public Future<CommandConsumer> createDeviceSpecificCommandConsumer(String tenantId, String deviceId, Consumer<Message> messageHandler) {
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(messageHandler);
        return this.connection.executeOnContext(result -> this.commandConsumerFactory.createClient(
                () -> AmqpAdapterClientCommandConsumer.create(
                        this.connection,
                        tenantId,
                        deviceId,
                        (delivery, message) -> messageHandler.accept(message)),
                result));
    }

    /**
     * Creates a command consumer that is not scoped to a particular device.
     *
     * @param messageHandler The handler to invoke with each received message.
     * @return A future with the outcome of the creation attempt.
     * @throws NullPointerException if message handler is {@code null}.
     */
    @Override
    public Future<CommandConsumer> createCommandConsumer(Consumer<Message> messageHandler) {
        Objects.requireNonNull(messageHandler);
        return this.connection.executeOnContext(result -> this.commandConsumerFactory.createClient(
                () -> AmqpAdapterClientCommandConsumer.create(
                        this.connection,
                        (delivery, message) -> messageHandler.accept(message)),
                result));
    }

    /**
     * Sends a response to a command using the sender's {@code sendAndWaitForOutcome} method.
     * <p>
     * The correlation ID and status are set on the message before sending.
     *
     * @param targetAddress The address to send the response to.
     * @param correlationId The correlation ID to set on the response.
     * @param status The status to add to the response.
     * @param payload The payload of the response or {@code null}.
     * @param contentType The content type of the payload or {@code null}.
     * @param context The tracing context to create the operation's span in.
     * @return A future with the outcome of the send operation.
     * @throws NullPointerException if target address or correlation ID is {@code null}.
     */
    @Override
    public Future<ProtonDelivery> sendCommandResponse(String targetAddress, String correlationId, int status, Buffer payload, String contentType, SpanContext context) {
        Objects.requireNonNull(targetAddress);
        Objects.requireNonNull(correlationId);
        return this.getOrCreateCommandResponseSender().compose(sender -> {
            final Span currentSpan = this.createSpan("send command response", null, null, context);
            final Message message = this.createAmqpMessage(payload, contentType, targetAddress);
            message.setCorrelationId(correlationId);
            AmqpUtils.addStatus(message, status);
            return sender.sendAndWaitForOutcome(message, currentSpan);
        });
    }
}

