/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageNotProcessedException;
import org.eclipse.hono.client.MessageUndeliverableException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.SendMessageTimeoutException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.AbstractHonoClient;
import org.eclipse.hono.client.amqp.config.AddressHelper;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.ErrorConverter;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.tracing.TracingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A wrapper around a vert.x Proton sender link for sending AMQP 1.0 messages
 * using a {@link HonoConnection}.
 * <p>
 * The factory methods of this class open the underlying link with
 * {@link ProtonQoS#AT_LEAST_ONCE}. Messages can either be sent without waiting
 * for the peer's disposition ({@link #send(Message, Span)}) or with the returned
 * future being completed once the peer has settled the transfer
 * ({@link #sendAndWaitForOutcome(Message, Span)},
 * {@link #sendAndWaitForRawOutcome(Message, Span)}).
 * <p>
 * Every send operation first checks whether the link has credit, tags the given
 * OpenTracing span and injects the span context into the message. The span passed
 * in to a send method is finished by this class.
 */
public class GenericSenderLink extends AbstractHonoClient {

    /** Counter used for creating the IDs of the messages sent via any instance of this class. */
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong();
    private static final Logger log = LoggerFactory.getLogger(GenericSenderLink.class);

    /** The tenant that messages are sent for, may be {@code null}. */
    private final String tenantId;
    /** The link's target address, may be {@code null}; used as fallback if a message has no address set. */
    private final String targetAddress;
    private final SendMessageSampler sampler;
    /** If {@code true}, message sending errors are logged on INFO instead of DEBUG level. */
    private boolean errorInfoLoggingEnabled;

    /**
     * Creates a new sender link wrapper.
     *
     * @param connection The connection to use for interacting with the peer.
     * @param sender The sender link to send messages over.
     * @param tenantId The tenant that messages are sent for, may be {@code null}.
     * @param targetAddress The target address of the link, may be {@code null}.
     * @param sampler The sampler used for reporting the outcome of send operations.
     * @throws NullPointerException if sender or sampler are {@code null}.
     */
    protected GenericSenderLink(
            final HonoConnection connection,
            final ProtonSender sender,
            final String tenantId,
            final String targetAddress,
            final SendMessageSampler sampler) {

        super(connection);
        this.sender = Objects.requireNonNull(sender);
        this.tenantId = tenantId;
        this.targetAddress = targetAddress;
        this.sampler = Objects.requireNonNull(sampler);
        if (sender.isOpen()) {
            // the peer's offered capabilities are only read from a link that is already open
            this.offeredCapabilities = Optional.ofNullable(sender.getRemoteOfferedCapabilities())
                    .map(List::of)
                    .orElse(Collections.emptyList());
        }
    }

    /**
     * Creates a sender link without a target address, using a no-op sampler.
     *
     * @param con The connection to create the link on.
     * @param remoteCloseHook The handler passed to the connection when creating the sender.
     * @return A future indicating the outcome of creating the link.
     * @throws NullPointerException if con is {@code null}.
     */
    public static Future<GenericSenderLink> create(final HonoConnection con, final Handler<String> remoteCloseHook) {
        Objects.requireNonNull(con);
        return con.createSender(null, ProtonQoS.AT_LEAST_ONCE, remoteCloseHook)
                .map(sender -> new GenericSenderLink(con, sender, null, null, SendMessageSampler.noop()));
    }

    /**
     * Creates a sender link for a given address, using a no-op sampler.
     * <p>
     * The address is passed through {@code AddressHelper.rewrite} using the connection's
     * configuration before the link is created.
     *
     * @param con The connection to create the link on.
     * @param address The target address of the link.
     * @param remoteCloseHook The handler passed to the connection when creating the sender.
     * @return A future indicating the outcome of creating the link.
     * @throws NullPointerException if con or address are {@code null}.
     */
    public static Future<GenericSenderLink> create(
            final HonoConnection con,
            final String address,
            final Handler<String> remoteCloseHook) {

        Objects.requireNonNull(con);
        Objects.requireNonNull(address);
        final String targetAddress = AddressHelper.rewrite(address, con.getConfig());
        return con.createSender(targetAddress, ProtonQoS.AT_LEAST_ONCE, remoteCloseHook)
                .map(sender -> new GenericSenderLink(con, sender, null, targetAddress, SendMessageSampler.noop()));
    }

    /**
     * Creates a sender link for an endpoint and tenant.
     * <p>
     * Delegates to
     * {@link #create(HonoConnection, String, String, String, SendMessageSampler, Handler)}
     * with a {@code null} resource ID.
     *
     * @param con The connection to create the link on.
     * @param endpoint The endpoint to send messages to.
     * @param tenantId The tenant to send messages for.
     * @param sampler The sampler used for reporting the outcome of send operations.
     * @param remoteCloseHook The handler passed to the connection when creating the sender.
     * @return A future indicating the outcome of creating the link.
     * @throws NullPointerException if con, endpoint, tenantId or sampler are {@code null}.
     */
    public static Future<GenericSenderLink> create(
            final HonoConnection con,
            final String endpoint,
            final String tenantId,
            final SendMessageSampler sampler,
            final Handler<String> remoteCloseHook) {

        Objects.requireNonNull(con);
        Objects.requireNonNull(endpoint);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(sampler);
        return create(con, endpoint, tenantId, null, sampler, remoteCloseHook);
    }

    /**
     * Creates a sender link for an endpoint, tenant and (optional) resource.
     * <p>
     * The link's target address is determined by {@code AddressHelper.getTargetAddress}
     * from the endpoint, tenant ID, resource ID and the connection's configuration.
     *
     * @param con The connection to create the link on.
     * @param endpoint The endpoint to send messages to.
     * @param tenantId The tenant to send messages for.
     * @param resourceId The resource ID to include in the target address; not checked for {@code null} here.
     * @param sampler The sampler used for reporting the outcome of send operations.
     * @param remoteCloseHook The handler passed to the connection when creating the sender.
     * @return A future indicating the outcome of creating the link.
     * @throws NullPointerException if con, endpoint, tenantId or sampler are {@code null}.
     */
    public static Future<GenericSenderLink> create(
            final HonoConnection con,
            final String endpoint,
            final String tenantId,
            final String resourceId,
            final SendMessageSampler sampler,
            final Handler<String> remoteCloseHook) {

        Objects.requireNonNull(con);
        Objects.requireNonNull(endpoint);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(sampler);
        final String targetAddress = AddressHelper.getTargetAddress(endpoint, tenantId, resourceId, con.getConfig());
        return con.createSender(targetAddress, ProtonQoS.AT_LEAST_ONCE, remoteCloseHook)
                .map(sender -> new GenericSenderLink(con, sender, tenantId, targetAddress, sampler));
    }

    /**
     * Closes the link(s) of this client.
     *
     * @return The future returned by {@code closeLinks()}.
     */
    public final Future<Void> close() {
        log.debug("closing sender ...");
        return this.closeLinks();
    }

    /**
     * Checks if the sender link is open and its connection is established.
     *
     * @return The result of {@code HonoProtonHelper.isLinkOpenAndConnected} for the sender link.
     */
    public final boolean isOpen() {
        return HonoProtonHelper.isLinkOpenAndConnected(this.sender);
    }

    /**
     * Sends a message without waiting for the peer's disposition.
     * <p>
     * The returned future is succeeded with the delivery as soon as the message has been
     * handed over to the sender link. It is failed with a {@link NoConsumerException}
     * if the link has no credit.
     *
     * @param message The message to send.
     * @param currentSpan The span to tag, log to and finish.
     * @return A future indicating the outcome of handing over the message.
     * @throws NullPointerException if any of the parameters are {@code null}.
     */
    public final Future<ProtonDelivery> send(final Message message, final Span currentSpan) {
        return checkForCreditAndSend(message, currentSpan, () -> sendMessage(message, currentSpan));
    }

    /**
     * Sends a message and waits for the peer to settle it.
     * <p>
     * The returned future is succeeded only if the peer accepts the message. Any other
     * outcome is mapped to a failed future.
     *
     * @param message The message to send.
     * @param currentSpan The span to tag, log to and finish.
     * @return A future indicating the outcome of the transfer.
     * @throws NullPointerException if any of the parameters are {@code null}.
     */
    public Future<ProtonDelivery> sendAndWaitForOutcome(final Message message, final Span currentSpan) {
        return checkForCreditAndSend(message, currentSpan,
                () -> sendMessageAndWaitForOutcome(message, currentSpan, true));
    }

    /**
     * Sends a message and waits for the peer to settle it.
     * <p>
     * In contrast to {@link #sendAndWaitForOutcome(Message, Span)}, the returned future is
     * succeeded with the delivery for <em>any</em> outcome of a settled transfer, leaving the
     * interpretation of the remote state to the caller.
     *
     * @param message The message to send.
     * @param currentSpan The span to tag, log to and finish.
     * @return A future indicating the outcome of the transfer.
     * @throws NullPointerException if any of the parameters are {@code null}.
     */
    public Future<ProtonDelivery> sendAndWaitForRawOutcome(final Message message, final Span currentSpan) {
        return checkForCreditAndSend(message, currentSpan,
                () -> sendMessageAndWaitForOutcome(message, currentSpan, false));
    }

    /**
     * Sets whether message sending errors are logged on INFO level instead of DEBUG level.
     *
     * @param errorInfoLoggingEnabled {@code true} to log sending errors on INFO level.
     */
    public void setErrorInfoLoggingEnabled(final boolean errorInfoLoggingEnabled) {
        this.errorInfoLoggingEnabled = errorInfoLoggingEnabled;
    }

    /**
     * Tags the span, injects its context into the message and, on the connection's context,
     * either runs the given send operation or fails with a {@link NoConsumerException}
     * if the sender link's send queue is full.
     *
     * @param message The message to send.
     * @param currentSpan The span to tag; it is finished here if no credit is available.
     * @param sendOperation The operation that actually sends the message.
     * @return A future completed with the outcome of the send operation.
     * @throws NullPointerException if any of the parameters are {@code null}.
     */
    private Future<ProtonDelivery> checkForCreditAndSend(
            final Message message,
            final Span currentSpan,
            final Supplier<Future<ProtonDelivery>> sendOperation) {

        Objects.requireNonNull(message);
        Objects.requireNonNull(currentSpan);
        Objects.requireNonNull(sendOperation);

        Tags.MESSAGE_BUS_DESTINATION.set(currentSpan, getMessageAddress(message));
        TracingHelper.TAG_QOS.set(currentSpan, this.sender.getQoS().toString());
        Tags.SPAN_KIND.set(currentSpan, Tags.SPAN_KIND_PRODUCER);
        TracingHelper.setDeviceTags(currentSpan, this.tenantId, AmqpUtils.getDeviceId(message));
        AmqpUtils.injectSpanContext(
                this.connection.getTracer(),
                currentSpan.context(),
                message,
                this.connection.getConfig().isUseLegacyTraceContextFormat());

        return this.connection.executeOnContext(result -> {
            if (this.sender.sendQueueFull()) {
                final NoConsumerException e = new NoConsumerException("no credit available");
                logMessageSendingError("error sending message [ID: {}, address: {}], no credit available (drain={})",
                        message.getMessageId(), getMessageAddress(message), this.sender.getDrain());
                TracingHelper.TAG_CREDIT.set(currentSpan, 0);
                this.logError(currentSpan, e);
                currentSpan.finish();
                result.fail(e);
                this.sampler.noCredit(this.tenantId);
            } else {
                sendOperation.get().onComplete(result);
            }
        });
    }

    /**
     * Sends a message without letting the caller wait for the peer's disposition.
     * <p>
     * The delivery update (or the send message timeout, if configured with a value greater
     * than zero) is only used for tracing and sampling; the returned future is already
     * succeeded when this method returns.
     *
     * @param message The message to send; its message ID is overwritten.
     * @param currentSpan The span to log to; finished on delivery update or timeout.
     * @return A succeeded future containing the delivery.
     */
    private Future<ProtonDelivery> sendMessage(final Message message, final Span currentSpan) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(currentSpan);

        final String messageId = assignNewMessageId(message);
        logMessageIdAndSenderInfo(currentSpan, messageId);

        final SendMessageSampler.Sample sample = this.sampler.start(this.tenantId);
        final AtomicReference<ProtonDelivery> deliveryRef = new AtomicReference<>();
        final long sendMessageTimeout = this.connection.getConfig().getSendMessageTimeout();
        final AtomicBoolean timeoutReached = new AtomicBoolean(false);

        final Long timerId = sendMessageTimeout > 0L
                ? Long.valueOf(this.connection.getVertx().setTimer(sendMessageTimeout, id -> {
                    if (timeoutReached.compareAndSet(false, true)) {
                        handleSendMessageTimeout(message, sendMessageTimeout, deliveryRef.get(), sample, null,
                                currentSpan);
                    }
                }))
                : null;

        final ProtonDelivery delivery = this.sender.send(message, deliveryUpdated -> {
            if (timerId != null) {
                this.connection.getVertx().cancelTimer(timerId);
            }
            if (timeoutReached.get()) {
                // the timeout handler has already reported the timeout to the sample and
                // finished the span, so neither must be touched again
                log.debug("ignoring received delivery update for message [ID: {}, address: {}]: waiting for the update has already timed out",
                        messageId, getMessageAddress(message));
                return;
            }
            final DeliveryState remoteState = deliveryUpdated.getRemoteState();
            sample.completed(remoteState);
            if (deliveryUpdated.remotelySettled()) {
                logUpdatedDeliveryState(currentSpan, message, deliveryUpdated);
            } else {
                logMessageSendingError("peer did not settle message [ID: {}, address: {}, remote state: {}], failing delivery",
                        messageId, getMessageAddress(message), simpleNameOf(remoteState));
                TracingHelper.logError(currentSpan,
                        new ServerErrorException(500, "peer did not settle message, failing delivery"));
            }
            currentSpan.finish();
        });
        deliveryRef.set(delivery);

        log.trace("sent AT_MOST_ONCE message [ID: {}, address: {}], remaining credit: {}, queued messages: {}",
                messageId, getMessageAddress(message), this.sender.getCredit(), this.sender.getQueued());
        return Future.succeededFuture(delivery);
    }

    /**
     * Sends a message and completes the returned future once the peer has updated the
     * delivery or the send message timeout (if configured with a value greater than zero)
     * has expired.
     *
     * @param message The message to send; its message ID is overwritten.
     * @param currentSpan The span to log to; finished when the returned future completes.
     * @param mapUnacceptedOutcomeToErrorResult {@code true} if a settled delivery with an outcome
     *            other than <em>accepted</em> should fail the returned future; {@code false} if
     *            the future should be succeeded with the delivery for any settled outcome.
     * @return A future indicating the outcome of the transfer.
     */
    private Future<ProtonDelivery> sendMessageAndWaitForOutcome(
            final Message message,
            final Span currentSpan,
            final boolean mapUnacceptedOutcomeToErrorResult) {

        Objects.requireNonNull(message);
        Objects.requireNonNull(currentSpan);

        final AtomicReference<ProtonDelivery> deliveryRef = new AtomicReference<>();
        final Promise<ProtonDelivery> result = Promise.promise();
        final String messageId = assignNewMessageId(message);
        logMessageIdAndSenderInfo(currentSpan, messageId);

        final SendMessageSampler.Sample sample = this.sampler.start(this.tenantId);
        final long sendMessageTimeout = this.connection.getConfig().getSendMessageTimeout();

        final Long timerId = sendMessageTimeout > 0L
                ? Long.valueOf(this.connection.getVertx().setTimer(sendMessageTimeout, id -> {
                    if (!result.future().isComplete()) {
                        // the span is finished by the completion handler registered on the result future
                        handleSendMessageTimeout(message, sendMessageTimeout, deliveryRef.get(), sample, result,
                                null);
                    }
                }))
                : null;

        deliveryRef.set(this.sender.send(message, deliveryUpdated -> {
            if (timerId != null) {
                this.connection.getVertx().cancelTimer(timerId);
            }
            final DeliveryState remoteState = deliveryUpdated.getRemoteState();
            if (result.future().isComplete()) {
                log.debug("ignoring received delivery update for message [ID: {}, address: {}]: waiting for the update has already timed out",
                        messageId, getMessageAddress(message));
            } else if (deliveryUpdated.remotelySettled()) {
                logUpdatedDeliveryState(currentSpan, message, deliveryUpdated);
                sample.completed(remoteState);
                if (remoteState instanceof Accepted || !mapUnacceptedOutcomeToErrorResult) {
                    result.complete(deliveryUpdated);
                } else {
                    result.handle(mapUnacceptedOutcomeToErrorResult(deliveryUpdated));
                }
            } else {
                // null-safe: the timer has been cancelled above, so an exception thrown here
                // would leave the result promise uncompleted forever
                logMessageSendingError("peer did not settle message [ID: {}, address: {}, remote state: {}], failing delivery",
                        messageId, getMessageAddress(message), simpleNameOf(remoteState));
                result.fail(new ServerErrorException(500, "peer did not settle message, failing delivery"));
            }
        }));

        log.trace("sent AT_LEAST_ONCE message [ID: {}, address: {}], remaining credit: {}, queued messages: {}",
                messageId, getMessageAddress(message), this.sender.getCredit(), this.sender.getQueued());

        return result.future()
                .onSuccess(delivery -> Tags.HTTP_STATUS.set(currentSpan, 202))
                .onFailure(t -> {
                    TracingHelper.logError(currentSpan, t);
                    Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(t));
                })
                .onComplete(r -> currentSpan.finish());
    }

    /**
     * Handles the expiration of the timer that waits for a delivery update.
     * <p>
     * Logs the timeout, releases and settles the given delivery (if any), logs the error to
     * and finishes the given span (if any), fails the given promise (if any) and reports the
     * timeout to the given sample (if any).
     * <p>
     * The error is a {@link SendMessageTimeoutException} if the link is still open and connected,
     * otherwise a {@link ServerErrorException} with status 503.
     *
     * @param message The message that has been sent.
     * @param sendMessageTimeout The timeout in milliseconds that has expired.
     * @param delivery The delivery to release or {@code null}.
     * @param sample The sample to report the timeout to or {@code null}.
     * @param resultPromise The promise to fail or {@code null}.
     * @param spanToLogToAndFinish The span to log the error to and to finish or {@code null}.
     * @throws NullPointerException if message is {@code null}.
     */
    protected void handleSendMessageTimeout(
            final Message message,
            final long sendMessageTimeout,
            final ProtonDelivery delivery,
            final SendMessageSampler.Sample sample,
            final Promise<ProtonDelivery> resultPromise,
            final Span spanToLogToAndFinish) {

        Objects.requireNonNull(message);

        final boolean linkUsable = HonoProtonHelper.isLinkOpenAndConnected(this.sender);
        final String linkOrConnectionClosedInfo = linkUsable ? "" : " (link or connection already closed)";
        final String exceptionMsg = "waiting for delivery update timed out after " + sendMessageTimeout + "ms";
        // declared with the common super type of both alternatives
        final ServerErrorException exception = linkUsable
                ? new SendMessageTimeoutException(exceptionMsg)
                : new ServerErrorException(503, exceptionMsg + linkOrConnectionClosedInfo);

        logMessageSendingError("waiting for delivery update timed out for message [ID: {}, address: {}] after {}ms{}",
                message.getMessageId(), getMessageAddress(message), sendMessageTimeout, linkOrConnectionClosedInfo);

        if (delivery != null) {
            ProtonHelper.released(delivery, true);
        }
        if (spanToLogToAndFinish != null) {
            TracingHelper.logError(spanToLogToAndFinish, exception.getMessage());
            Tags.HTTP_STATUS.set(spanToLogToAndFinish, 503);
            spanToLogToAndFinish.finish();
        }
        if (resultPromise != null) {
            resultPromise.fail(exception);
        }
        if (sample != null) {
            sample.timeout();
        }
    }

    /**
     * Maps the remote state of a delivery that has not been accepted to a failed future.
     * <ul>
     * <li><em>rejected</em>: the exception created by {@code ErrorConverter.fromTransferError}
     * or a {@link ClientErrorException} with status 400 if no error condition is given</li>
     * <li><em>released</em>: a {@link MessageNotProcessedException}</li>
     * <li><em>modified</em>: a {@link MessageUndeliverableException} if the
     * <em>undeliverable-here</em> flag is {@code true}, otherwise a
     * {@link MessageNotProcessedException}</li>
     * <li>anything else (including no state at all): a {@link ServerErrorException} with status 500</li>
     * </ul>
     *
     * @param delivery The delivery to inspect.
     * @return A failed future containing the exception.
     * @throws IllegalStateException if the remote state is <em>accepted</em>.
     */
    private Future<ProtonDelivery> mapUnacceptedOutcomeToErrorResult(final ProtonDelivery delivery) {
        final DeliveryState remoteState = delivery.getRemoteState();
        if (remoteState instanceof Accepted) {
            throw new IllegalStateException(
                    "delivery is expected to be rejected, released or modified here, not accepted");
        }
        final ServiceInvocationException error;
        if (remoteState instanceof Rejected) {
            error = Optional.ofNullable(((Rejected) remoteState).getError())
                    .<ServiceInvocationException>map(ErrorConverter::fromTransferError)
                    .orElseGet(() -> new ClientErrorException(400));
        } else if (remoteState instanceof Released) {
            error = new MessageNotProcessedException();
        } else if (remoteState instanceof Modified) {
            // the flag is a boxed Boolean that may be null; a missing value is treated as false
            error = Boolean.TRUE.equals(((Modified) remoteState).getUndeliverableHere())
                    ? new MessageUndeliverableException()
                    : new MessageNotProcessedException();
        } else {
            // never fail the future with a null cause
            error = new ServerErrorException(500, "unexpected delivery state: " + simpleNameOf(remoteState));
        }
        return Future.failedFuture(error);
    }

    /**
     * Logs the message ID and the sender link's current credit to a span.
     *
     * @param currentSpan The span to log to.
     * @param messageId The ID of the message being sent.
     */
    protected final void logMessageIdAndSenderInfo(final Span currentSpan, final String messageId) {
        final HashMap<String, Object> details = new HashMap<>(2);
        details.put(TracingHelper.TAG_MESSAGE_ID.getKey(), messageId);
        details.put(TracingHelper.TAG_CREDIT.getKey(), this.sender.getCredit());
        currentSpan.log(details);
    }

    /**
     * Logs the remote state of a delivery to the log and to a span and sets the span's
     * HTTP status tag accordingly (202 for <em>accepted</em>, 400 for <em>rejected</em>,
     * 503 for <em>released</em>, 404 or 503 for <em>modified</em> depending on its
     * <em>undeliverable-here</em> flag).
     * <p>
     * Any state other than <em>accepted</em> is logged to the span as an error.
     *
     * @param currentSpan The span to log to.
     * @param message The message that has been sent.
     * @param delivery The updated delivery.
     * @throws NullPointerException if currentSpan is {@code null}.
     */
    protected final void logUpdatedDeliveryState(
            final Span currentSpan,
            final Message message,
            final ProtonDelivery delivery) {

        Objects.requireNonNull(currentSpan);
        final String messageId = message.getMessageId() != null ? message.getMessageId().toString() : "";
        final String messageAddress = getMessageAddress(message);
        final DeliveryState remoteState = delivery.getRemoteState();

        if (remoteState instanceof Accepted) {
            log.trace("message [ID: {}, address: {}] accepted by peer", messageId, messageAddress);
            currentSpan.log("message accepted by peer");
            Tags.HTTP_STATUS.set(currentSpan, 202);
            return;
        }

        final HashMap<String, Object> events = new HashMap<>();
        if (remoteState instanceof Rejected) {
            final Rejected rejected = (Rejected) remoteState;
            Tags.HTTP_STATUS.set(currentSpan, 400);
            if (rejected.getError() == null) {
                logMessageSendingError("message [ID: {}, address: {}] rejected by peer", messageId, messageAddress);
                events.put("message", "message rejected by peer");
            } else {
                logMessageSendingError("message [ID: {}, address: {}] rejected by peer: {}, {}",
                        messageId, messageAddress,
                        rejected.getError().getCondition(), rejected.getError().getDescription());
                events.put("message", String.format("message rejected by peer: %s, %s",
                        rejected.getError().getCondition(), rejected.getError().getDescription()));
            }
        } else if (remoteState instanceof Released) {
            logMessageSendingError("message [ID: {}, address: {}] not accepted by peer, remote state: {}",
                    messageId, messageAddress, remoteState.getClass().getSimpleName());
            Tags.HTTP_STATUS.set(currentSpan, 503);
            events.put("message", "message not accepted by peer, remote state: " + remoteState);
        } else if (remoteState instanceof Modified) {
            final Modified modified = (Modified) remoteState;
            logMessageSendingError("message [ID: {}, address: {}] not accepted by peer, remote state: {}",
                    messageId, messageAddress, modified);
            // the flag is a boxed Boolean that may be null; a missing value is treated as false
            Tags.HTTP_STATUS.set(currentSpan, Boolean.TRUE.equals(modified.getUndeliverableHere()) ? 404 : 503);
            events.put("message", "message not accepted by peer, remote state: " + remoteState);
        }
        TracingHelper.logError(currentSpan, events);
    }

    /**
     * Gets the address of a message, falling back to this link's target address
     * if the message has no address set.
     *
     * @param message The message.
     * @return The address, may be {@code null} if neither is set.
     */
    private String getMessageAddress(final Message message) {
        Objects.requireNonNull(message);
        return Optional.ofNullable(message.getAddress()).orElse(this.targetAddress);
    }

    /**
     * Creates a new message ID from this class' simple name and the global message counter
     * and sets it on the given message.
     *
     * @param message The message to set the ID on.
     * @return The newly created message ID.
     */
    private String assignNewMessageId(final Message message) {
        final String messageId = String.format("%s-%d", getClass().getSimpleName(),
                MESSAGE_COUNTER.getAndIncrement());
        message.setMessageId(messageId);
        return messageId;
    }

    /**
     * Gets the simple class name of a delivery state in a null-safe way.
     *
     * @param state The state, may be {@code null}.
     * @return The simple class name or {@code null} if the state is {@code null}.
     */
    private static String simpleNameOf(final DeliveryState state) {
        return state == null ? null : state.getClass().getSimpleName();
    }

    /**
     * Logs a message sending error on INFO level if error info logging is enabled,
     * otherwise on DEBUG level.
     *
     * @param format The log message format.
     * @param arguments The arguments for the format's placeholders.
     */
    private void logMessageSendingError(final String format, final Object... arguments) {
        if (this.errorInfoLoggingEnabled) {
            log.info(format, arguments);
        } else {
            log.debug(format, arguments);
        }
    }
}

