/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.service.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.amqp.AbstractAmqpEndpoint;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.EventBusMessage;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Base class for AMQP endpoints that implement a request/response pattern.
 * <p>
 * Request messages received on a receiver link are checked, authorized and then
 * forwarded as JSON to the address returned by {@link #getEventBusServiceAddress()}
 * on the Vert.x event bus. The reply received over the event bus is converted to an
 * AMQP message and sent to the client via the sender link that the client has
 * opened for the request's <em>reply-to</em> address.
 * <p>
 * NOTE(review): the reply-to registry is a plain {@link HashMap} without any
 * synchronization; presumably all handlers of an endpoint run on a single
 * Vert.x context - confirm.
 *
 * @param <T> The type of configuration properties this endpoint uses.
 */
public abstract class RequestResponseEndpoint<T extends ServiceConfigProperties>
        extends AbstractAmqpEndpoint<T> {

    /** Sender links for sending responses, keyed by the reply-to address they were opened for. */
    private final Map<String, ProtonSender> replyToSenderMap = new HashMap<>();
    private AuthorizationService authorizationService = new ClaimsBasedAuthorizationService();

    /**
     * Creates an endpoint for a Vert.x instance.
     *
     * @param vertx The Vert.x instance to use.
     * @throws NullPointerException if vertx is {@code null}.
     */
    protected RequestResponseEndpoint(final Vertx vertx) {
        super(Objects.requireNonNull(vertx));
    }

    /**
     * Gets the event bus address that request messages are forwarded to.
     *
     * @return The address.
     */
    protected abstract String getEventBusServiceAddress();

    /**
     * Creates the message to send over the event bus for an AMQP request message.
     *
     * @param requestMessage The AMQP request message received from the client.
     * @param targetAddress The target address of the link the request was received on.
     * @param clientPrincipal The principal of the client that sent the request.
     * @return A future holding the message to send over the event bus. A failed future
     *         causes the request not to be forwarded.
     */
    protected abstract Future<EventBusMessage> createEventBusRequestMessage(
            org.apache.qpid.proton.message.Message requestMessage,
            ResourceIdentifier targetAddress,
            HonoUser clientPrincipal);

    /**
     * Creates the AMQP message to send back to the client for a response.
     *
     * @param message The response to convert.
     * @return The AMQP message to send to the client.
     */
    protected abstract org.apache.qpid.proton.message.Message getAmqpReply(EventBusMessage message);

    /**
     * Gets the service used for authorizing requests.
     *
     * @return The service.
     */
    public final AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    /**
     * Sets the service to use for authorizing requests.
     * <p>
     * A {@link ClaimsBasedAuthorizationService} is used if this method is never invoked.
     *
     * @param authService The service.
     * @throws NullPointerException if authService is {@code null}.
     */
    @Autowired(required=false)
    public final void setAuthorizationService(final AuthorizationService authService) {
        // fail fast instead of deferring the NPE to the first authorization check
        this.authorizationService = Objects.requireNonNull(authService);
    }

    /**
     * Handles a client's attempt to open a link for sending request messages.
     * <p>
     * Links using <em>AT_MOST_ONCE</em> delivery mode are closed with a
     * <em>precondition-failed</em> error. Otherwise the link is opened with
     * <em>AT_LEAST_ONCE</em> QoS, prefetch disabled, and the configured number of
     * receiver link credits is flowed to the client.
     *
     * @param con The connection the link belongs to.
     * @param receiver The link to receive request messages on.
     * @param targetAddress The address the client wants to send requests to.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonReceiver receiver,
            final ResourceIdentifier targetAddress) {

        if (ProtonQoS.AT_MOST_ONCE.equals(receiver.getRemoteQoS())) {
            this.logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...", getName());
            receiver.setCondition(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            receiver.close();
            return;
        }

        this.logger.debug("establishing link for receiving request messages from client [{}]", receiver.getName());
        receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
        receiver.setAutoAccept(true);
        // credit is managed manually: one credit is replenished per processed request
        receiver.setPrefetch(0);
        receiver.handler((delivery, message) -> handleRequestMessage(con, receiver, targetAddress, delivery, message));
        HonoProtonHelper.setCloseHandler(receiver, remoteClose -> onLinkDetach(receiver));
        HonoProtonHelper.setDetachHandler(receiver, remoteDetach -> onLinkDetach(receiver));
        receiver.open();

        final int credits = this.config.getReceiverLinkCredit();
        this.logger.debug("flowing {} credits to client", credits);
        receiver.flow(credits);
    }

    /**
     * Processes a request message received from a client.
     * <p>
     * A message failing {@code passesFormalVerification} is rejected with a
     * <em>bad request</em> error condition. Otherwise the message is accepted and
     * <ol>
     * <li>the sender link for the reply-to address is looked up and the event bus
     * request is created,</li>
     * <li>the client's authority to invoke the operation is checked,</li>
     * <li>the request is sent over the event bus,</li>
     * <li>the (filtered) reply, or an error response derived from the failure,
     * is sent to the client if the reply-to link is open.</li>
     * </ol>
     * In every case one credit is flowed back to the client and the tracing span
     * is finished once processing has completed.
     *
     * @param con The connection via which the request has been received.
     * @param receiver The link via which the request has been received.
     * @param targetAddress The address the request has been sent to.
     * @param delivery The request's delivery.
     * @param requestMessage The request message.
     */
    protected final void handleRequestMessage(
            final ProtonConnection con,
            final ProtonReceiver receiver,
            final ResourceIdentifier targetAddress,
            final ProtonDelivery delivery,
            final org.apache.qpid.proton.message.Message requestMessage) {

        final HonoUser clientPrincipal = Constants.getClientPrincipal(con);
        final String replyTo = requestMessage.getReplyTo();
        final SpanContext spanContext = TracingHelper.extractSpanContext(this.tracer, requestMessage);
        final Span currentSpan = this.tracer.buildSpan("process request message")
                .asChildOf(spanContext)
                .ignoreActiveSpan()
                .withTag(Tags.COMPONENT.getKey(), getName())
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(Tags.HTTP_METHOD.getKey(), requestMessage.getSubject())
                .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), targetAddress.toString())
                .start();

        if (!passesFormalVerification(targetAddress, requestMessage)) {
            MessageHelper.rejected(delivery, new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed request message"));
            flowCreditToRequestor(receiver, replyTo);
            TracingHelper.logError(currentSpan, "malformed request message");
            currentSpan.finish();
            return;
        }

        ProtonHelper.accepted(delivery, true);
        currentSpan.log("request message accepted");

        final Future<ProtonSender> sender = getSenderForConnection(con, replyTo);
        final Future<EventBusMessage> request = createEventBusRequestMessage(requestMessage, targetAddress, clientPrincipal);

        CompositeFuture.all(request, sender)
            .compose(ok -> isAuthorized(clientPrincipal, targetAddress, requestMessage))
            .map(authorized -> {
                this.logger.debug("client [{}] is {}authorized to {}:{}",
                        clientPrincipal.getName(), authorized ? "" : "not ", targetAddress, requestMessage.getSubject());
                if (authorized) {
                    return authorized;
                }
                throw new ClientErrorException(403, "not authorized to invoke operation");
            })
            .compose(authorized -> {
                final Future<Message<Object>> reply = Future.future();
                final DeliveryOptions options = createEventBusMessageDeliveryOptions(currentSpan.context());
                this.vertx.eventBus().send(getEventBusServiceAddress(), request.result().toJson(), options, reply);
                return reply;
            })
            .map(this::extractResponse)
            .compose(eventBusMessage -> filterResponse(clientPrincipal, eventBusMessage))
            .map(filteredResponse -> {
                Tags.HTTP_STATUS.set(currentSpan, filteredResponse.getStatus());
                return getAmqpReply(filteredResponse);
            })
            .otherwise(t -> {
                this.logger.debug("error processing request [resource: {}, op: {}]: {}",
                        targetAddress, requestMessage.getSubject(), t.getMessage());
                currentSpan.log("error processing request");
                TracingHelper.logError(currentSpan, t);

                final int statusCode = toStatusCode(t);
                Tags.HTTP_STATUS.set(currentSpan, Integer.valueOf(statusCode));

                final EventBusMessage eventBusRequest = request.result();
                if (eventBusRequest == null) {
                    // creating the event bus request itself has failed, so there is
                    // nothing to derive an error response from
                    return null;
                }
                return getAmqpReply(eventBusRequest.getResponse(statusCode));
            })
            .map(amqpMessage -> {
                if (amqpMessage == null) {
                    TracingHelper.logError(currentSpan, "cannot send response, no response message could be created");
                    return null;
                }
                // the sender future has no result if no (open) link exists for the reply-to address
                final ProtonSender replySender = sender.result();
                if (replySender == null || !replySender.isOpen()) {
                    TracingHelper.logError(currentSpan, "cannot send response, reply-to link is closed");
                    return null;
                }
                final ProtonDelivery responseDelivery = replySender.send(amqpMessage);
                currentSpan.log("sent response message to client");
                return responseDelivery;
            })
            .setHandler(s -> {
                // always replenish the credit consumed by this request, regardless of outcome
                flowCreditToRequestor(receiver, replyTo);
                currentSpan.finish();
            });
    }

    /**
     * Determines the status code to report to the client for a failure.
     *
     * @param t The cause of the failure.
     * @return 503 for an event bus reply timeout, 500 for any other event bus reply
     *         failure, otherwise the code extracted by
     *         {@code ServiceInvocationException.extractStatusCode}.
     */
    private static int toStatusCode(final Throwable t) {
        if (t instanceof ReplyException) {
            switch (((ReplyException) t).failureType()) {
            case TIMEOUT:
                return 503;
            default:
                return 500;
            }
        }
        return ServiceInvocationException.extractStatusCode(t);
    }

    /**
     * Checks if a client is authorized to invoke the operation given in a message's subject.
     * <p>
     * This default implementation delegates to the configured {@link AuthorizationService}.
     *
     * @param clientPrincipal The client.
     * @param resource The resource the operation belongs to.
     * @param message The message for which the authorization shall be checked.
     * @return The outcome of the check.
     * @throws NullPointerException if message is {@code null}.
     */
    protected Future<Boolean> isAuthorized(final HonoUser clientPrincipal, final ResourceIdentifier resource,
            final org.apache.qpid.proton.message.Message message) {
        Objects.requireNonNull(message);
        return getAuthorizationService().isAuthorized(clientPrincipal, resource, message.getSubject());
    }

    /**
     * Extracts the response from a reply received over the event bus.
     *
     * @param response The reply.
     * @return The response contained in the reply's JSON body.
     * @throws ServerErrorException with status 500 if the body is not a JSON object.
     */
    private EventBusMessage extractResponse(final Message<Object> response) {
        final Object body = response.body();
        if (body instanceof JsonObject) {
            return EventBusMessage.fromJson((JsonObject) body);
        }
        throw new ServerErrorException(500);
    }

    /**
     * Applies filters to a response before it is sent to the client.
     * <p>
     * This default implementation returns the response unchanged.
     *
     * @param clientPrincipal The client that sent the request.
     * @param response The response to filter.
     * @return A succeeded future containing the given response.
     * @throws NullPointerException if response is {@code null}.
     */
    protected Future<EventBusMessage> filterResponse(final HonoUser clientPrincipal, final EventBusMessage response) {
        return Future.succeededFuture(Objects.requireNonNull(response));
    }

    /**
     * Handles a client's attempt to open a link for receiving responses.
     * <p>
     * The link is closed with an error condition if the reply-to address is invalid
     * or if a sender is already registered for it. Otherwise the sender is registered
     * for the address and the link is opened. The registration is removed again when
     * the client closes or detaches the link.
     *
     * @param con The connection the link belongs to.
     * @param sender The link for sending responses to the client.
     * @param replyToAddress The address the client wants to receive responses on.
     */
    @Override
    public final void onLinkAttach(final ProtonConnection con, final ProtonSender sender,
            final ResourceIdentifier replyToAddress) {

        if (!isValidReplyToAddress(replyToAddress)) {
            this.logger.debug("client [{}] provided invalid reply-to address", sender.getName());
            sender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD,
                    String.format("reply-to address must have the following format %s/<tenant>/<reply-address>", getName())));
            sender.close();
            return;
        }

        final String replyTo = replyToAddress.toString();
        if (this.replyToSenderMap.containsKey(replyTo)) {
            this.logger.debug("client [{}] wanted to subscribe to already subscribed reply-to address [{}]", sender.getName(), replyTo);
            sender.setCondition(ProtonHelper.condition(AmqpError.ILLEGAL_STATE,
                    String.format("reply-to address [%s] is already subscribed", replyTo)));
            sender.close();
            return;
        }

        this.logger.debug("establishing response sender link with client [{}]", sender.getName());
        registerSenderForReplyTo(replyTo, sender);
        sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
        HonoProtonHelper.setCloseHandler(sender, remoteClose -> {
            this.logger.debug("client [{}] closed sender link", sender.getName());
            unregisterSenderForReplyTo(replyTo);
            sender.close();
        });
        HonoProtonHelper.setDetachHandler(sender, remoteDetach -> {
            this.logger.debug("client [{}] detached sender link", sender.getName());
            unregisterSenderForReplyTo(replyTo);
            sender.close();
        });
        sender.open();
    }

    /**
     * Removes all response senders that belong to a closed connection.
     *
     * @param connection The connection that has been closed.
     * @throws NullPointerException if connection is {@code null}.
     */
    @Override
    public void onConnectionClosed(final ProtonConnection connection) {
        Objects.requireNonNull(connection);
        deallocateAllSendersForConnection(connection);
    }

    /**
     * Looks up the sender registered for a reply-to address.
     *
     * @param con The connection the sender must belong to.
     * @param replytoAddress The reply-to address.
     * @return A future succeeded with the sender if one is registered for the address,
     *         is open and belongs to the given connection. Otherwise the future is
     *         failed with a {@link ClientErrorException} with status 412.
     */
    private Future<ProtonSender> getSenderForConnection(final ProtonConnection con, final String replytoAddress) {
        final ProtonSender sender = this.replyToSenderMap.get(replytoAddress);
        final boolean usable = sender != null && sender.isOpen() && sender.getSession().getConnection() == con;
        if (usable) {
            return Future.succeededFuture(sender);
        }
        return Future.failedFuture(new ClientErrorException(412, "must open receiver link for reply-to address first"));
    }

    /**
     * Registers a sender for a reply-to address, replacing any existing registration.
     *
     * @param replyTo The reply-to address.
     * @param sender The sender to use for responses to the address.
     */
    private void registerSenderForReplyTo(final String replyTo, final ProtonSender sender) {
        final ProtonSender oldSender = this.replyToSenderMap.put(replyTo, sender);
        if (oldSender == null || oldSender == sender) {
            this.logger.debug("registered sender [{}] for replies to [{}]", sender, replyTo);
        } else {
            this.logger.info("replaced existing sender [{}] for replies to [{}] with sender [{}]", oldSender, replyTo, sender);
        }
    }

    /**
     * Removes the sender registered for a reply-to address.
     *
     * @param replyTo The reply-to address.
     */
    private void unregisterSenderForReplyTo(final String replyTo) {
        final ProtonSender sender = this.replyToSenderMap.remove(replyTo);
        if (sender == null) {
            this.logger.warn("sender was not allocated for replyTo address [{}]", replyTo);
        } else {
            this.logger.debug("deallocated sender [{}] for replies to [{}]", sender, replyTo);
        }
    }

    /**
     * Removes the registrations of all senders whose session belongs to a connection.
     *
     * @param connection The connection.
     */
    private void deallocateAllSendersForConnection(final ProtonConnection connection) {
        this.replyToSenderMap.values().removeIf(sender -> sender.getSession().getConnection() == connection);
    }

    /**
     * Flows a single credit to a client and traces the client's current credit.
     *
     * @param receiver The link to flow the credit on.
     * @param replyTo The client's reply-to address (used for logging only).
     */
    private void flowCreditToRequestor(final ProtonReceiver receiver, final String replyTo) {
        receiver.flow(1);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("replenished client [reply-to: {}, current credit: {}]", replyTo, receiver.getCredit());
        }
    }

    /**
     * Checks if a reply-to address is acceptable.
     * <p>
     * This default implementation requires the address's resource path to consist
     * of at least three segments.
     *
     * @param replyToAddress The address to check.
     * @return {@code true} if the address is not {@code null} and has at least
     *         three path segments.
     */
    protected boolean isValidReplyToAddress(final ResourceIdentifier replyToAddress) {
        return replyToAddress != null && replyToAddress.getResourcePath().length >= 3;
    }
}

