/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.amqp;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.amqp.AbstractAmqpEndpoint;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.util.AmqpErrorException;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.EventBusMessage;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Base class for AMQP endpoints that follow a request/response pattern.
 * <p>
 * Request messages arrive over a receiver link, are formally verified and authorized
 * and are then handed to {@link #processRequest(Message, ResourceIdentifier, HonoUser)}.
 * Replies are picked up from the Vert.x event bus (using the client's reply-to address
 * as the event bus address) and are forwarded to the client over a sender link.
 *
 * @param <T> The type of configuration properties this endpoint uses.
 */
public abstract class RequestResponseEndpoint<T extends ServiceConfigProperties> extends AbstractAmqpEndpoint<T> {

    /** Status code put into a reply when filtering the response fails with an unspecific error. */
    private static final int STATUS_INTERNAL_ERROR = 500;

    private AuthorizationService authorizationService = new ClaimsBasedAuthorizationService();

    /**
     * Creates an endpoint for a Vert.x instance.
     *
     * @param vertx The Vert.x instance to use.
     * @throws NullPointerException if vertx is {@code null}.
     */
    protected RequestResponseEndpoint(final Vertx vertx) {
        super(Objects.requireNonNull(vertx));
    }

    /**
     * Processes a request message that has passed formal verification and authorization.
     *
     * @param requestMessage The AMQP request message.
     * @param targetAddress The address the message has been sent to.
     * @param clientPrincipal The principal of the client that sent the message.
     */
    public abstract void processRequest(Message requestMessage, ResourceIdentifier targetAddress, HonoUser clientPrincipal);

    /**
     * Creates the AMQP message to send to the client for a response received via the event bus.
     *
     * @param response The (filtered) response.
     * @return The AMQP reply message.
     */
    protected abstract Message getAmqpReply(EventBusMessage response);

    /**
     * Gets the service used for authorizing requests.
     *
     * @return The service; a {@link ClaimsBasedAuthorizationService} unless another one has been set.
     */
    public final AuthorizationService getAuthorizationService() {
        return authorizationService;
    }

    /**
     * Sets the service to use for authorizing requests.
     *
     * @param authService The service to use.
     * @throws NullPointerException if the service is {@code null}.
     */
    @Autowired(required = false)
    public final void setAuthorizationService(final AuthorizationService authService) {
        // fail fast: a null service would otherwise only surface as an NPE in isAuthorized()
        this.authorizationService = Objects.requireNonNull(authService, "authService");
    }

    /**
     * Handles a client's request to establish a link for sending request messages.
     * <p>
     * Links requesting <em>AT MOST ONCE</em> delivery are closed with a
     * <em>precondition failed</em> error. All other links are opened with
     * <em>AT LEAST ONCE</em> QoS and every received message is passed to
     * {@link #handleMessage(ProtonConnection, ProtonReceiver, ResourceIdentifier, ProtonDelivery, Message)}.
     *
     * @param con The connection the link belongs to.
     * @param receiver The link to establish.
     * @param targetAddress The address the client wants to send messages to.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonReceiver receiver,
            final ResourceIdentifier targetAddress) {

        if (ProtonQoS.AT_MOST_ONCE.equals(receiver.getRemoteQoS())) {
            logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...",
                    getName());
            receiver.setCondition(ProtonHelper.condition(
                    AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            receiver.close();
            return;
        }

        logger.debug("establishing link for receiving messages from client [{}]", receiver.getName());
        receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
        // NOTE(review): auto-accept is enabled although handleMessage() settles deliveries
        // explicitly; confirm the two do not conflict when the authorization check
        // completes asynchronously.
        receiver.setAutoAccept(true);
        receiver.setPrefetch(config.getReceiverLinkCredit());
        receiver.handler((delivery, message) -> handleMessage(con, receiver, targetAddress, delivery, message));
        HonoProtonHelper.setCloseHandler(receiver, clientDetached -> onLinkDetach(receiver));
        receiver.open();
    }

    /**
     * Handles a request message received from a client.
     * <p>
     * The message is first checked using {@code passesFormalVerification}, then the client's
     * authority is checked using {@link #isAuthorized(HonoUser, ResourceIdentifier, Message)}.
     * If both checks succeed, the message is passed to
     * {@link #processRequest(Message, ResourceIdentifier, HonoUser)} and the delivery is accepted.
     * Otherwise the delivery is rejected:
     * <ul>
     * <li>with a <em>decode error</em> if formal verification fails or processing throws
     * a {@code DecodeException},</li>
     * <li>with an <em>unauthorized access</em> error if the client is not authorized,</li>
     * <li>with an <em>internal error</em> for any other failure.</li>
     * </ul>
     *
     * @param con The connection the message has been received on.
     * @param receiver The link the message has been received on (not used by this implementation).
     * @param targetAddress The address the message has been sent to.
     * @param delivery The message's delivery handle.
     * @param message The request message.
     */
    protected final void handleMessage(final ProtonConnection con, final ProtonReceiver receiver,
            final ResourceIdentifier targetAddress, final ProtonDelivery delivery, final Message message) {

        final Future<Void> formalCheck = Future.future();
        if (passesFormalVerification(targetAddress, message)) {
            formalCheck.complete();
        } else {
            formalCheck.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
        }

        final HonoUser clientPrincipal = Constants.getClientPrincipal(con);
        formalCheck
            .compose(ok -> isAuthorized(clientPrincipal, targetAddress, message))
            .compose(authorized -> dispatchRequest(authorized, clientPrincipal, targetAddress, delivery, message))
            .otherwise(t -> {
                if (t instanceof AmqpErrorException) {
                    MessageHelper.rejected(delivery, ((AmqpErrorException) t).asErrorCondition());
                } else {
                    logger.debug("error processing request [resource: {}, op: {}]: {}",
                            targetAddress, message.getSubject(), t.getMessage());
                    MessageHelper.rejected(delivery, ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "internal error"));
                }
                return null;
            });
    }

    /**
     * Passes an authorized request on for processing and accepts its delivery,
     * or fails if the client is not authorized.
     */
    private Future<Void> dispatchRequest(final boolean authorized, final HonoUser clientPrincipal,
            final ResourceIdentifier targetAddress, final ProtonDelivery delivery, final Message message) {

        logger.debug("client [{}] is {}authorized to {}:{}",
                clientPrincipal.getName(), authorized ? "" : "not ", targetAddress, message.getSubject());
        if (!authorized) {
            return Future.failedFuture(new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, "unauthorized"));
        }
        try {
            processRequest(message, targetAddress, clientPrincipal);
            ProtonHelper.accepted(delivery, true);
            return Future.succeededFuture();
        } catch (final DecodeException e) {
            // keep a trace of the root cause, the client only gets to see a generic error
            logger.debug("cannot decode request [resource: {}, op: {}]: {}",
                    targetAddress, message.getSubject(), e.getMessage());
            return Future.failedFuture(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
        }
    }

    /**
     * Checks if a client is authorized to invoke an operation.
     * <p>
     * This default implementation asks the configured authorization service whether the client
     * may perform the operation given in the message's <em>subject</em> on the resource.
     *
     * @param clientPrincipal The client.
     * @param resource The resource the operation belongs to.
     * @param message The message for which the authorization shall be checked.
     * @return A future indicating the outcome of the check.
     * @throws NullPointerException if message is {@code null}.
     */
    protected Future<Boolean> isAuthorized(final HonoUser clientPrincipal, final ResourceIdentifier resource,
            final Message message) {
        Objects.requireNonNull(message);
        return getAuthorizationService().isAuthorized(clientPrincipal, resource, message.getSubject());
    }

    /**
     * Filters a response before it is sent to the client.
     * <p>
     * This default implementation returns a succeeded future containing the unaltered response.
     * If an overriding implementation returns a failed future, a response carrying an error
     * status is sent to the client instead.
     *
     * @param clientPrincipal The client that the response is about to be sent to.
     * @param response The response to filter.
     * @return A future containing the response to send.
     * @throws NullPointerException if response is {@code null}.
     */
    protected Future<EventBusMessage> filterResponse(final HonoUser clientPrincipal, final EventBusMessage response) {
        return Future.succeededFuture(Objects.requireNonNull(response));
    }

    /**
     * Handles a client's request to establish a link for receiving replies.
     * <p>
     * If the reply-to address is valid, an event bus consumer is registered for that address.
     * Every message received on it is converted to an {@code EventBusMessage}, filtered using
     * {@link #filterResponse(HonoUser, EventBusMessage)} and sent to the client. The consumer
     * is unregistered again when the client closes the link.
     * <p>
     * Otherwise the link is closed with an <em>invalid field</em> error.
     *
     * @param con The connection the link belongs to.
     * @param sender The link to establish.
     * @param replyToAddress The address the client wants to receive replies on.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonSender sender,
            final ResourceIdentifier replyToAddress) {

        if (!isValidReplyToAddress(replyToAddress)) {
            logger.debug("client [{}] provided invalid reply-to address", sender.getName());
            sender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD,
                    String.format("reply-to address must have the following format %s/<tenant>/<reply-address>", getName())));
            sender.close();
            return;
        }

        logger.debug("establishing sender link with client [{}]", sender.getName());
        final MessageConsumer<JsonObject> replyConsumer = vertx.eventBus().consumer(replyToAddress.toString(), message -> {
            if (logger.isTraceEnabled()) {
                logger.trace("forwarding reply to client [{}]: {}", sender.getName(), message.body().encodePrettily());
            }
            final EventBusMessage response = EventBusMessage.fromJson(message.body());
            filterResponse(Constants.getClientPrincipal(con), response)
                // a failed filter is turned into an error response instead of dropping the reply
                .recover(t -> Future.succeededFuture(response.getResponse(toStatusCode(t))))
                .<Void>map(filteredResponse -> {
                    sender.send(getAmqpReply(filteredResponse));
                    return null;
                })
                .otherwise(t -> {
                    // do not silently swallow problems creating or sending the reply
                    logger.debug("cannot forward reply to client [{}]", sender.getName(), t);
                    return null;
                });
        });

        sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
        HonoProtonHelper.setCloseHandler(sender, senderClosed -> {
            logger.debug("client [{}] closed sender link, removing associated event bus consumer [{}]",
                    sender.getName(), replyConsumer.address());
            replyConsumer.unregister();
            if (senderClosed.succeeded()) {
                senderClosed.result().close();
            }
        });
        sender.open();
    }

    /**
     * Determines the status code to report to the client for a failed attempt to filter a response.
     *
     * @return The error code of a {@code ServiceInvocationException}, 500 for any other error.
     */
    private static int toStatusCode(final Throwable error) {
        return error instanceof ServiceInvocationException
                ? ((ServiceInvocationException) error).getErrorCode()
                : STATUS_INTERNAL_ERROR;
    }

    /**
     * Checks if a client's reply-to address can be used for sending replies.
     * <p>
     * This default implementation requires the address to be non-{@code null} and
     * its resource path to consist of at least three segments.
     *
     * @param replyToAddress The address to check.
     * @return {@code true} if the address is valid.
     */
    protected boolean isValidReplyToAddress(final ResourceIdentifier replyToAddress) {
        return replyToAddress != null && replyToAddress.getResourcePath().length >= 3;
    }
}

