/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.amqp.AmqpApplicationClient;
import org.eclipse.hono.application.client.amqp.AmqpMessageContext;
import org.eclipse.hono.application.client.amqp.ProtonBasedCommandSender;
import org.eclipse.hono.application.client.amqp.ProtonBasedDownstreamMessage;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.amqp.GenericReceiverLink;
import org.eclipse.hono.util.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtonBasedApplicationClient
extends ProtonBasedCommandSender
implements AmqpApplicationClient {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedApplicationClient.class);
    private final HonoConnection connection;

    public ProtonBasedApplicationClient(HonoConnection connection) {
        super(connection, SendMessageSampler.Factory.noop());
        this.connection = connection;
    }

    public final Future<HonoConnection> connect() {
        LOG.info("connecting to Hono endpoint");
        return this.connection.connect();
    }

    public final void disconnect() {
        this.disconnect((Handler<AsyncResult<Void>>)Promise.promise());
    }

    public final void disconnect(Handler<AsyncResult<Void>> completionHandler) {
        LOG.info("disconnecting from Hono endpoint");
        this.connection.disconnect(completionHandler);
    }

    public final Future<MessageConsumer> createTelemetryConsumer(String tenantId, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        String sourceAddress = String.format("%s/%s", "telemetry", tenantId);
        return this.createConsumer(sourceAddress, messageHandler, closeHandler);
    }

    public final Future<MessageConsumer> createEventConsumer(String tenantId, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        String sourceAddress = String.format("%s/%s", "event", tenantId);
        return this.createConsumer(sourceAddress, messageHandler, closeHandler);
    }

    public Future<MessageConsumer> createCommandResponseConsumer(String tenantId, String replyId, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(replyId);
        Objects.requireNonNull(messageHandler);
        String sourceAddress = String.format("%s/%s/%s", "command_response", tenantId, replyId);
        return this.createConsumer(sourceAddress, messageHandler, closeHandler);
    }

    @Override
    public final Future<MessageConsumer> createTelemetryConsumer(String tenantId, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        String sourceAddress = String.format("%s/%s", "telemetry", tenantId);
        return this.createAsyncConsumer(sourceAddress, messageHandler, closeHandler);
    }

    @Override
    public final Future<MessageConsumer> createEventConsumer(String tenantId, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        String sourceAddress = String.format("%s/%s", "event", tenantId);
        return this.createAsyncConsumer(sourceAddress, messageHandler, closeHandler);
    }

    private Future<MessageConsumer> createConsumer(String sourceAddress, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        return GenericReceiverLink.create((HonoConnection)this.connection, (String)sourceAddress, (delivery, message) -> {
            try {
                ProtonBasedDownstreamMessage msg = ProtonBasedDownstreamMessage.from(message, delivery);
                messageHandler.handle((Object)msg);
                if (!delivery.isSettled()) {
                    LOG.debug("client provided message handler did not settle message, auto-accepting ...");
                    ProtonHelper.accepted((ProtonDelivery)delivery, (boolean)true);
                }
            }
            catch (Throwable t) {
                this.handleException(t, (ProtonDelivery)delivery);
            }
        }, (boolean)false, s -> Optional.ofNullable(closeHandler).ifPresent(h -> h.handle(null))).map(recv -> new MessageConsumer((GenericReceiverLink)recv){
            final /* synthetic */ GenericReceiverLink val$recv;
            {
                this.val$recv = genericReceiverLink;
            }

            public Future<Void> close() {
                return this.val$recv.close();
            }
        });
    }

    private Future<MessageConsumer> createAsyncConsumer(String sourceAddress, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler, Handler<Throwable> closeHandler) {
        return GenericReceiverLink.create((HonoConnection)this.connection, (String)sourceAddress, (delivery, message) -> {
            try {
                ProtonBasedDownstreamMessage msg = ProtonBasedDownstreamMessage.from(message, delivery);
                ((Future)messageHandler.apply(msg)).onSuccess(ok -> {
                    if (!delivery.isSettled()) {
                        LOG.debug("client provided message handler did not settle message, auto-accepting ...");
                        ProtonHelper.accepted((ProtonDelivery)delivery, (boolean)true);
                    }
                }).onFailure(t -> this.handleException((Throwable)t, (ProtonDelivery)delivery));
            }
            catch (Throwable t2) {
                this.handleException(t2, (ProtonDelivery)delivery);
            }
        }, (boolean)false, s -> Optional.ofNullable(closeHandler).ifPresent(h -> h.handle(null))).map(recv -> new MessageConsumer((GenericReceiverLink)recv){
            final /* synthetic */ GenericReceiverLink val$recv;
            {
                this.val$recv = genericReceiverLink;
            }

            public Future<Void> close() {
                return this.val$recv.close();
            }
        });
    }

    private void handleException(Throwable error, ProtonDelivery delivery) {
        LOG.debug("client provided message handler threw exception [local state: {}, settled: {}]", new Object[]{Optional.ofNullable(delivery.getLocalState()).map(s -> s.getType().name()).orElse(null), delivery.isSettled(), error});
        if (!delivery.isSettled()) {
            DeliveryState localState = this.getDeliveryState(error);
            LOG.debug("settling transfer [local state: {}]", (Object)localState.getType().name());
            delivery.disposition(localState, true);
        }
    }

    private DeliveryState getDeliveryState(Throwable t) {
        if (t instanceof ClientErrorException) {
            Rejected rejected = new Rejected();
            rejected.setError(ProtonHelper.condition((Symbol)Constants.AMQP_BAD_REQUEST, (String)t.getMessage()));
            return rejected;
        }
        return new Released();
    }
}

