/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.application.client.amqp.AmqpApplicationClient;
import org.eclipse.hono.application.client.amqp.AmqpMessageContext;
import org.eclipse.hono.application.client.amqp.ProtonBasedCommandSender;
import org.eclipse.hono.application.client.amqp.ProtonBasedDownstreamMessage;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.amqp.GenericReceiverLink;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.util.LifecycleStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An application client that consumes downstream messages over a {@link HonoConnection}.
 * <p>
 * The client opens receiver links (via {@link GenericReceiverLink}) on the addresses
 * {@code telemetry/<tenantId>}, {@code event/<tenantId>} and
 * {@code command_response/<tenantId>/<replyId>} and hands every received message to a
 * handler supplied by the application. Sending of commands is inherited from
 * {@link ProtonBasedCommandSender}.
 * <p>
 * Message settlement: if the application's handler completes without having settled the
 * delivery, the delivery is accepted and settled on the handler's behalf. If the handler
 * fails, an unsettled delivery is settled with <em>rejected</em> (for a
 * {@link ClientErrorException}) or <em>released</em> (for any other error).
 * <p>
 * NOTE(review): {@code consumerCloseHandlers} is a plain {@code ArrayList}; this presumably relies
 * on all access happening on the connection's Vert.x context - confirm.
 */
public class ProtonBasedApplicationClient
extends ProtonBasedCommandSender
implements AmqpApplicationClient {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedApplicationClient.class);

    /**
     * Tracks whether this client is starting, started or stopping.
     */
    protected final LifecycleStatus lifecycleStatus = new LifecycleStatus();

    /**
     * The (wrapped) close handlers of all consumers that are currently open.
     * They are notified when the connection is lost unexpectedly.
     */
    private final List<Handler<Throwable>> consumerCloseHandlers = new ArrayList<>();

    /**
     * Creates a client for an existing connection.
     * <p>
     * The client is created with a no-op {@link SendMessageSampler} factory.
     *
     * @param connection The connection to use for sending and receiving messages.
     */
    public ProtonBasedApplicationClient(HonoConnection connection) {
        super(connection, SendMessageSampler.Factory.noop());
    }

    /**
     * Builds the address {@code <endpointName>/<tenantId>}.
     *
     * @param endpointName The name of the endpoint.
     * @param tenantId The tenant identifier.
     * @return The link address.
     */
    private String getTenantScopedLinkAddress(String endpointName, String tenantId) {
        return String.format("%s/%s", endpointName, tenantId);
    }

    /**
     * Registers a handler with this client's lifecycle status to be notified once the client has started.
     *
     * @param handler The handler to register. A {@code null} handler is silently ignored.
     */
    public void addOnClientReadyHandler(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            lifecycleStatus.addOnStartedHandler(handler);
        }
    }

    /**
     * Starts this client by establishing the connection.
     *
     * @return A succeeded future if a start attempt is already in progress, a future failed with an
     *         {@link IllegalStateException} if the client cannot be moved to the <em>starting</em>
     *         state, or otherwise the outcome of the connection attempt.
     */
    public Future<Void> start() {
        if (lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!lifecycleStatus.setStarting()) {
            return Future.failedFuture(new IllegalStateException("client is already started/stopping"));
        }
        return connectOnStart().onSuccess(v -> lifecycleStatus.setStarted());
    }

    /**
     * Stops this client by running the disconnect logic as a stop attempt of the lifecycle status.
     *
     * @return The outcome of the stop attempt.
     */
    public Future<Void> stop() {
        return lifecycleStatus.runStopAttempt(() -> disconnectOnStop());
    }

    /**
     * Notifies the close handlers of all open consumers that the connection has been lost.
     * <p>
     * Handlers are only notified if the connection has not been shut down deliberately.
     * Each handler is invoked with a "disconnected" error and is then forgotten.
     */
    @Override
    protected void onDisconnect() {
        if (!connection.isShutdown()) {
            final Throwable error = new NoStackTraceThrowable("disconnected");
            // Work on a snapshot: a close handler may call MessageConsumer.close(), which removes
            // an entry from consumerCloseHandlers. Iterating the live list would then fail with a
            // ConcurrentModificationException and skip the remaining handlers as well as
            // super.onDisconnect().
            final List<Handler<Throwable>> handlersToNotify = new ArrayList<>(consumerCloseHandlers);
            consumerCloseHandlers.clear();
            handlersToNotify.forEach(handler -> handler.handle(error));
        }
        super.onDisconnect();
    }

    /**
     * Connects to the Hono endpoint using the underlying connection.
     *
     * @return The outcome of the connection attempt.
     */
    public final Future<HonoConnection> connect() {
        LOG.info("connecting to Hono endpoint");
        return connection.connect();
    }

    /**
     * Disconnects from the Hono endpoint without reporting the outcome to the caller.
     */
    public final void disconnect() {
        // the promise merely serves as a completion handler whose outcome nobody observes
        disconnect(Promise.<Void>promise());
    }

    /**
     * Disconnects from the Hono endpoint.
     * <p>
     * All registered consumer close handlers are discarded before the connection is closed,
     * i.e. they are not notified about this deliberate disconnect.
     *
     * @param completionHandler The handler that is passed on to the connection's disconnect operation.
     */
    public final void disconnect(Handler<AsyncResult<Void>> completionHandler) {
        LOG.info("disconnecting from Hono endpoint");
        consumerCloseHandlers.clear();
        connection.disconnect(completionHandler);
    }

    /**
     * Creates a consumer for the telemetry messages of a tenant (address {@code telemetry/<tenantId>}).
     *
     * @param tenantId The tenant to consume messages for.
     * @param messageHandler The handler to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer's link is closed by the callback
     *                     registered with the receiver link (invoked with {@code null}) or when the
     *                     connection is lost (invoked with an error), may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     * @throws NullPointerException if tenant ID or message handler are {@code null}.
     */
    public final Future<MessageConsumer> createTelemetryConsumer(String tenantId, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        return createConsumer(getTenantScopedLinkAddress("telemetry", tenantId), messageHandler, closeHandler);
    }

    /**
     * Creates a consumer for the events of a tenant (address {@code event/<tenantId>}).
     *
     * @param tenantId The tenant to consume messages for.
     * @param messageHandler The handler to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer's link is closed by the callback
     *                     registered with the receiver link (invoked with {@code null}) or when the
     *                     connection is lost (invoked with an error), may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     * @throws NullPointerException if tenant ID or message handler are {@code null}.
     */
    public final Future<MessageConsumer> createEventConsumer(String tenantId, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        return createConsumer(getTenantScopedLinkAddress("event", tenantId), messageHandler, closeHandler);
    }

    /**
     * Creates a consumer for command responses (address {@code command_response/<tenantId>/<replyId>}).
     *
     * @param tenantId The tenant to consume command responses for.
     * @param replyId The reply identifier that scopes the responses to consume.
     * @param messageHandler The handler to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer's link is closed by the callback
     *                     registered with the receiver link (invoked with {@code null}) or when the
     *                     connection is lost (invoked with an error), may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     * @throws NullPointerException if tenant ID, reply ID or message handler are {@code null}.
     */
    public Future<MessageConsumer> createCommandResponseConsumer(String tenantId, String replyId, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(replyId);
        Objects.requireNonNull(messageHandler);
        final String sourceAddress = String.format("%s/%s/%s", "command_response", tenantId, replyId);
        return createConsumer(sourceAddress, messageHandler, closeHandler);
    }

    /**
     * Creates a consumer for the telemetry messages of a tenant (address {@code telemetry/<tenantId>})
     * whose handler processes messages asynchronously.
     * <p>
     * A message is only auto-accepted once the future returned by the handler has succeeded.
     *
     * @param tenantId The tenant to consume messages for.
     * @param messageHandler The function to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer's link is closed by the callback
     *                     registered with the receiver link (invoked with {@code null}) or when the
     *                     connection is lost (invoked with an error), may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     * @throws NullPointerException if tenant ID or message handler are {@code null}.
     */
    @Override
    public final Future<MessageConsumer> createTelemetryConsumer(String tenantId, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        return createAsyncConsumer(getTenantScopedLinkAddress("telemetry", tenantId), messageHandler, closeHandler);
    }

    /**
     * Creates a consumer for the events of a tenant (address {@code event/<tenantId>})
     * whose handler processes messages asynchronously.
     * <p>
     * A message is only auto-accepted once the future returned by the handler has succeeded.
     *
     * @param tenantId The tenant to consume messages for.
     * @param messageHandler The function to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer's link is closed by the callback
     *                     registered with the receiver link (invoked with {@code null}) or when the
     *                     connection is lost (invoked with an error), may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     * @throws NullPointerException if tenant ID or message handler are {@code null}.
     */
    @Override
    public final Future<MessageConsumer> createEventConsumer(String tenantId, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler, Handler<Throwable> closeHandler) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(messageHandler);
        return createAsyncConsumer(getTenantScopedLinkAddress("event", tenantId), messageHandler, closeHandler);
    }

    /**
     * Creates a consumer that passes each message to a synchronous handler.
     * <p>
     * A delivery that the handler leaves unsettled is accepted; an exception thrown by the
     * handler results in the delivery being rejected or released.
     *
     * @param sourceAddress The address to receive messages from.
     * @param messageHandler The handler to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer gets closed, may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     */
    private Future<MessageConsumer> createConsumer(String sourceAddress, Handler<DownstreamMessage<AmqpMessageContext>> messageHandler, Handler<Throwable> closeHandler) {
        return createConsumer(sourceAddress, (ProtonDelivery delivery, Message message) -> {
            try {
                final DownstreamMessage<AmqpMessageContext> msg = ProtonBasedDownstreamMessage.from(message, delivery);
                messageHandler.handle(msg);
                acceptIfNotSettled(delivery);
            } catch (final Throwable t) {
                handleMessageHandlerError(t, delivery);
            }
        }, closeHandler);
    }

    /**
     * Creates a receiver link for an address and wraps it into a {@link MessageConsumer}.
     * <p>
     * The link is only created after the connection has been verified to be established
     * (within the default connection check timeout). Once the link exists, the close handler
     * is registered so that it gets notified when the connection is lost. Closing the
     * returned consumer unregisters the close handler and closes the link.
     *
     * @param sourceAddress The address to receive messages from.
     * @param messageConsumer The consumer to invoke with every delivery and message received.
     * @param closeHandler The handler to notify when the consumer gets closed, may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     */
    private Future<MessageConsumer> createConsumer(String sourceAddress, BiConsumer<ProtonDelivery, Message> messageConsumer, Handler<Throwable> closeHandler) {
        // Wrap the handler in a distinct object so that registering the same client handler for
        // several consumers yields separate entries that can be removed individually.
        final Handler<Throwable> wrappedCloseHandler = closeHandler != null ? thr -> closeHandler.handle(thr) : null;
        return connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(v -> GenericReceiverLink.create(connection, sourceAddress, messageConsumer, false, s -> {
                    // callback handed to the receiver link; the client's handler is invoked without an error
                    if (closeHandler != null) {
                        consumerCloseHandlers.remove(wrappedCloseHandler);
                        closeHandler.handle(null);
                    }
                }))
                .onSuccess(v -> {
                    if (wrappedCloseHandler != null) {
                        consumerCloseHandlers.add(wrappedCloseHandler);
                    }
                })
                .map(receiverLink -> new MessageConsumer() {

                    @Override
                    public Future<Void> close() {
                        // a consumer closed by the application must not be notified about a later disconnect
                        if (wrappedCloseHandler != null) {
                            consumerCloseHandlers.remove(wrappedCloseHandler);
                        }
                        return receiverLink.close();
                    }
                });
    }

    /**
     * Creates a consumer that passes each message to an asynchronous handler.
     * <p>
     * A delivery that is still unsettled when the handler's future succeeds is accepted;
     * a failed future or an exception thrown by the handler results in the delivery being
     * rejected or released.
     *
     * @param sourceAddress The address to receive messages from.
     * @param messageHandler The function to invoke with every message received.
     * @param closeHandler The handler to notify when the consumer gets closed, may be {@code null}.
     * @return A future that completes with the consumer once the receiver link has been created.
     */
    private Future<MessageConsumer> createAsyncConsumer(String sourceAddress, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler, Handler<Throwable> closeHandler) {
        return createConsumer(sourceAddress, (ProtonDelivery delivery, Message message) -> {
            try {
                final DownstreamMessage<AmqpMessageContext> msg = ProtonBasedDownstreamMessage.from(message, delivery);
                messageHandler.apply(msg)
                        .onSuccess(ok -> acceptIfNotSettled(delivery))
                        .onFailure(t -> handleMessageHandlerError(t, delivery));
            } catch (final Throwable t) {
                // covers handlers that throw (or return null) instead of returning a failed future
                handleMessageHandlerError(t, delivery);
            }
        }, closeHandler);
    }

    /**
     * Accepts and settles a delivery that the client's message handler has left unsettled.
     *
     * @param delivery The delivery to check.
     */
    private void acceptIfNotSettled(ProtonDelivery delivery) {
        if (!delivery.isSettled()) {
            LOG.debug("client provided message handler did not settle message, auto-accepting ...");
            ProtonHelper.accepted(delivery, true);
        }
    }

    /**
     * Settles a delivery after the client's message handler has failed.
     * <p>
     * A delivery that has already been settled is left untouched.
     *
     * @param error The error reported by the message handler.
     * @param delivery The delivery that was being processed.
     */
    private void handleMessageHandlerError(Throwable error, ProtonDelivery delivery) {
        final String currentState = Optional.ofNullable(delivery.getLocalState())
                .map(s -> s.getType().name())
                .orElse(null);
        // the trailing throwable has no placeholder, so SLF4J logs it as the exception
        LOG.debug("client provided message handler threw exception [local state: {}, settled: {}]",
                currentState, delivery.isSettled(), error);
        if (!delivery.isSettled()) {
            final DeliveryState localState = getDeliveryState(error);
            LOG.debug("settling transfer [local state: {}]", localState.getType().name());
            delivery.disposition(localState, true);
        }
    }

    /**
     * Determines the outcome to settle a delivery with after message processing has failed.
     *
     * @param t The error that occurred while processing the message.
     * @return A {@link Rejected} outcome carrying a bad-request error condition with the error's
     *         message if the error is a {@link ClientErrorException}, a {@link Released} outcome
     *         otherwise.
     */
    private DeliveryState getDeliveryState(Throwable t) {
        if (t instanceof ClientErrorException) {
            final Rejected rejected = new Rejected();
            rejected.setError(ProtonHelper.condition(AmqpUtils.AMQP_BAD_REQUEST, t.getMessage()));
            return rejected;
        }
        return new Released();
    }
}

