/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import com.microsoft.azure.servicebus.primitives.AsyncUtil;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.ClientEntity;
import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.Pair;
import com.microsoft.azure.servicebus.primitives.PayloadSizeExceededException;
import com.microsoft.azure.servicebus.primitives.RequestResponseUtils;
import com.microsoft.azure.servicebus.primitives.RequestResponseWorkItem;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.Timer;
import com.microsoft.azure.servicebus.primitives.TimerType;
import com.microsoft.azure.servicebus.primitives.Util;
import java.io.IOException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RequestResponseLink
extends ClientEntity {
    // Logger used throughout this class and by its inner link helpers.
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseLink.class);
    // Monitor guarding isRecreateLinksInProgress (see ensureUniqueLinkRecreation).
    private final Object recreateLinksLock = new Object();
    // Factory that supplies the connection, reactor scheduling, retry policy and operation timeout.
    private final MessagingFactory underlyingFactory;
    // Address used as the sender target and the receiver source of the internal links.
    private final String linkPath;
    // Audience passed when sending the security token; when null no token is sent.
    private final String sasTokenAudienceURI;
    // Completed by createAsync once both internal links are open, or failed on error/timeout.
    private final CompletableFuture<RequestResponseLink> createFuture;
    // Outstanding requests keyed by request id ("request:<n>").
    private final ConcurrentHashMap<String, RequestResponseWorkItem> pendingRequests;
    // Source of the numeric suffix of request ids.
    private final AtomicInteger requestCounter;
    // Random UUID used as reply-to on requests, as sender source address and receiver target address.
    private final String replyTo;
    // Handle of the token renew timer; assigned after a token is sent, cancelled by cancelSASTokenRenewTimer.
    private ScheduledFuture<?> sasTokenRenewTimerFuture;
    // Current receive-side helper; replaced when the internal links are recreated.
    private InternalReceiver amqpReceiver;
    // Current send-side helper; replaced when the internal links are recreated.
    private InternalSender amqpSender;
    // True while a link recreation started by ensureUniqueLinkRecreation is running.
    private boolean isRecreateLinksInProgress = false;
    // Optional entity type; when non-null it is added to the link properties.
    private MessagingEntityType entityType;
    // Set once onInnerLinksClosed has handled a close; reset by createInternalLinks.
    private boolean isInnerLinksCloseHandled;
    // Incremented on every link recreation; lets stale close notifications be ignored.
    private int internalLinkGeneration = 1;

    /**
     * Creates a request-response link and starts opening it asynchronously.
     *
     * <p>The method (1) schedules a one-shot timeout using the factory's operation timeout,
     * (2) sends the security token and, on success, schedules creation of the internal
     * links on the reactor thread, and (3) completes the returned future once both the
     * internal sender and the internal receiver report that they are open.
     *
     * @param messagingFactory factory providing the connection, scheduling and timeout
     * @param linkName client id of the new link; also used as prefix of the inner helper ids
     * @param linkPath address the internal links attach to
     * @param sasTokenAudienceURI token audience; when null no token is sent
     * @param entityType optional entity type that is added to the link properties
     * @return a future that yields the opened link, or fails with the open/token/timeout error
     */
    public static CompletableFuture<RequestResponseLink> createAsync(MessagingFactory messagingFactory, String linkName, String linkPath, String sasTokenAudienceURI, MessagingEntityType entityType) {
        final RequestResponseLink requestReponseLink = new RequestResponseLink(messagingFactory, linkName, linkPath, sasTokenAudienceURI, entityType);
        // Open timeout: if the link is still not created when the timer fires, tear everything down.
        Timer.schedule(new Runnable(){

            @Override
            public void run() {
                if (!requestReponseLink.createFuture.isDone()) {
                    requestReponseLink.amqpSender.closeInternals(false);
                    requestReponseLink.amqpReceiver.closeInternals(false);
                    requestReponseLink.cancelSASTokenRenewTimer();
                    TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "Open operation on RequestResponseLink(%s) on Entity(%s) timed out at %s.", requestReponseLink.getClientId(), requestReponseLink.linkPath, ZonedDateTime.now().toString()));
                    TRACE_LOGGER.error("RequestResponseLink open timed out.", (Throwable)operationTimedout);
                    AsyncUtil.completeFutureExceptionally(requestReponseLink.createFuture, operationTimedout);
                }
            }
        }, messagingFactory.getOperationTimeout(), TimerType.OneTimeRun);
        // Send the token first; the links are only created after the token was sent successfully.
        requestReponseLink.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> {
            if (sasTokenEx != null) {
                Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
                TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", (Object)requestReponseLink.linkPath, (Object)cause);
                requestReponseLink.createFuture.completeExceptionally(cause);
            } else {
                try {
                    // Link creation is dispatched to the reactor thread.
                    messagingFactory.scheduleOnReactorThread(new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            requestReponseLink.createInternalLinks();
                        }
                    });
                }
                catch (IOException ioException) {
                    requestReponseLink.cancelSASTokenRenewTimer();
                    requestReponseLink.createFuture.completeExceptionally(new ServiceBusException(false, "Failed to create internal links, see cause for more details.", ioException));
                }
            }
            return null;
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        // The link counts as created only when both inner links have finished opening.
        CompletableFuture.allOf(requestReponseLink.amqpSender.openFuture, requestReponseLink.amqpReceiver.openFuture).handleAsync((v, ex) -> {
            if (ex == null) {
                TRACE_LOGGER.info("Opened requestresponselink to {}", (Object)requestReponseLink.linkPath);
                requestReponseLink.createFuture.complete(requestReponseLink);
            } else {
                requestReponseLink.cancelSASTokenRenewTimer();
                requestReponseLink.createFuture.completeExceptionally((Throwable)ex);
            }
            return null;
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        return requestReponseLink.createFuture;
    }

    /**
     * Builds the address of the management node that belongs to an entity.
     *
     * @param entityPath path of the entity
     * @return {@code entityPath} followed by {@code /$management}
     */
    public static String getManagementNodeLinkPath(String entityPath) {
        final String managementNodeName = "$management";
        return String.format("%s/%s", entityPath, managementNodeName);
    }

    /**
     * Returns the address of the CBS node.
     *
     * @return the constant {@code $cbs}
     */
    public static String getCBSNodeLinkPath() {
        final String cbsNodePath = "$cbs";
        return cbsNodePath;
    }

    /**
     * Initializes the link state and creates the inner sender/receiver helpers.
     * No network activity happens here; the links themselves are created later
     * by {@code createInternalLinks()}.
     *
     * @param messagingFactory factory providing the connection, scheduling and timeout
     * @param linkName client id of this link; also the prefix of the inner helper ids
     * @param linkPath address the internal links attach to
     * @param sasTokenAudienceURI token audience; when null no token is sent
     * @param entityType optional entity type that is added to the link properties
     */
    private RequestResponseLink(MessagingFactory messagingFactory, String linkName, String linkPath, String sasTokenAudienceURI, MessagingEntityType entityType) {
        super(linkName);
        this.underlyingFactory = messagingFactory;
        this.linkPath = linkPath;
        this.sasTokenAudienceURI = sasTokenAudienceURI;
        this.entityType = entityType;
        this.isInnerLinksCloseHandled = false;
        // Parameterized instances instead of raw types, so no unchecked conversions are needed.
        this.pendingRequests = new ConcurrentHashMap<>();
        this.createFuture = new CompletableFuture<>();
        this.requestCounter = new AtomicInteger();
        this.replyTo = UUID.randomUUID().toString();
        // No previous sender exists yet, so the sender starts with empty pending queues.
        this.amqpSender = new InternalSender(linkName + ":internalSender", this, null);
        this.amqpReceiver = new InternalReceiver(linkName + ":interalReceiver", this);
    }

    /**
     * Returns the address this link was created for.
     *
     * @return the link path passed at construction
     */
    public String getLinkPath() {
        return linkPath;
    }

    /**
     * Sends the security token for this link's audience and remembers the renew timer handle.
     * Returns an already completed future when the link is closing/closed or has no audience.
     *
     * @param retryOnFailure forwarded to the factory's token send call
     * @return a future that completes once the token was sent and the timer handle stored
     */
    private CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure) {
        if (this.getIsClosingOrClosed()) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.sasTokenAudienceURI == null) {
            return CompletableFuture.completedFuture(null);
        }
        // The renewal callback re-enters this method with retryOnFailure = true.
        CompletableFuture<ScheduledFuture<?>> renewTimerFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
        return renewTimerFuture.thenAccept(renewTimer -> {
            this.sasTokenRenewTimerFuture = renewTimer;
        });
    }

    /**
     * Reacts to the inner links having been closed with an error.
     * Only the first notification of the current link generation is handled. Pending
     * requests are either kept for a scheduled link recreation (transient error with a
     * retry interval) or failed with the given exception.
     *
     * @param linkGeneration generation of the inner link that reports the close
     * @param exception error that closed the links
     */
    private void onInnerLinksClosed(int linkGeneration, Exception exception) {
        // Ignore notifications from older generations and repeated notifications.
        if (this.internalLinkGeneration != linkGeneration || this.isInnerLinksCloseHandled) {
            return;
        }
        this.isInnerLinksCloseHandled = true;
        this.cancelSASTokenRenewTimer();
        if (this.pendingRequests.isEmpty()) {
            return;
        }

        boolean isTransientFailure = exception instanceof ServiceBusException && ((ServiceBusException)exception).getIsTransient();
        Duration retryDelay = isTransientFailure
                ? this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), exception, this.underlyingFactory.getOperationTimeout())
                : null;
        if (retryDelay == null) {
            // Not retriable (or retries exhausted): fail everything that is still waiting.
            this.completeAllPendingRequestsWithException(exception);
            return;
        }
        Timer.schedule(() -> this.ensureUniqueLinkRecreation(), retryDelay, TimerType.OneTimeRun);
    }

    /**
     * Cancels the token renew timer if one was stored and it has not finished yet.
     */
    private void cancelSASTokenRenewTimer() {
        ScheduledFuture<?> renewTimer = this.sasTokenRenewTimerFuture;
        if (renewTimer == null || renewTimer.isDone()) {
            return;
        }
        TRACE_LOGGER.debug("Cancelling SAS Token renew timer");
        renewTimer.cancel(true);
    }

    /**
     * Creates and opens the internal send and receive links, each on its own session of the
     * factory's connection, and wires them to the current inner sender/receiver helpers.
     * Callers in this class dispatch this method through
     * {@code scheduleOnReactorThread}.
     */
    private void createInternalLinks() {
        // A fresh pair of links is about to exist, so a later close must be handled again.
        this.isInnerLinksCloseHandled = false;
        // Properties attached to both links: adjusted operation timeout and, if known, the entity type.
        HashMap<Symbol, Object> commonLinkProperties = new HashMap<Symbol, Object>();
        commonLinkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf((long)Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()));
        if (this.entityType != null) {
            commonLinkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
        }
        Connection connection = this.underlyingFactory.getConnection();
        // --- send link: target = link path, source = reply-to address ---
        Session session = connection.session();
        session.setOutgoingWindow(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler((Extendable)session, (Handler)new SessionHandler(this.linkPath));
        // Link name: fixed prefix + short random string (+ remote container id when available).
        String sendLinkNamePrefix = "RequestResponseLink-Sender".concat("_").concat(StringUtil.getShortRandomString());
        String sendLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? sendLinkNamePrefix.concat("_").concat(connection.getRemoteContainer()) : sendLinkNamePrefix;
        Sender sender = session.sender(sendLinkName);
        Target sednerTarget = new Target();
        sednerTarget.setAddress(this.linkPath);
        sender.setTarget((org.apache.qpid.proton.amqp.transport.Target)sednerTarget);
        Source senderSource = new Source();
        senderSource.setAddress(this.replyTo);
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)senderSource);
        sender.setSenderSettleMode(SenderSettleMode.SETTLED);
        sender.setProperties(commonLinkProperties);
        SendLinkHandler sendLinkHandler = new SendLinkHandler(this.amqpSender);
        BaseHandler.setHandler((Extendable)sender, (Handler)sendLinkHandler);
        // --- receive link on a second session: source = link path, target = reply-to address ---
        session = connection.session();
        session.setOutgoingWindow(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler((Extendable)session, (Handler)new SessionHandler(this.linkPath));
        String receiveLinkNamePrefix = "RequestResponseLink-Receiver".concat("_").concat(StringUtil.getShortRandomString());
        String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? receiveLinkNamePrefix.concat("_").concat(connection.getRemoteContainer()) : receiveLinkNamePrefix;
        Receiver receiver = session.receiver(receiveLinkName);
        Source receiverSource = new Source();
        receiverSource.setAddress(this.linkPath);
        receiver.setSource((org.apache.qpid.proton.amqp.transport.Source)receiverSource);
        Target receiverTarget = new Target();
        receiverTarget.setAddress(this.replyTo);
        receiver.setTarget((org.apache.qpid.proton.amqp.transport.Target)receiverTarget);
        receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
        receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        receiver.setProperties(commonLinkProperties);
        ReceiveLinkHandler receiveLinkHandler = new ReceiveLinkHandler(this.amqpReceiver);
        BaseHandler.setHandler((Extendable)receiver, (Handler)receiveLinkHandler);
        // Both helpers get both links, so each side can reach its counterpart.
        this.amqpSender.setLinks(sender, receiver);
        this.amqpReceiver.setLinks(sender, receiver);
        TRACE_LOGGER.debug("RequestReponseLink - opening send link to {}", (Object)this.linkPath);
        sender.open();
        this.underlyingFactory.registerForConnectionError((Link)sender);
        TRACE_LOGGER.debug("RequestReponseLink - opening receive link to {}", (Object)this.linkPath);
        receiver.open();
        this.underlyingFactory.registerForConnectionError((Link)receiver);
    }

    /**
     * Starts a recreation of the internal links unless one is already in progress.
     * The in-progress flag is set and cleared under {@code recreateLinksLock}; it is
     * cleared when the recreation future completes, whether it succeeded or failed.
     */
    private void ensureUniqueLinkRecreation() {
        // Synchronize on the field directly: a lambda body must not redeclare a local
        // variable of the enclosing method, so no lock alias local is used here.
        synchronized (this.recreateLinksLock) {
            if (this.isRecreateLinksInProgress) {
                return;
            }
            this.isRecreateLinksInProgress = true;
            this.recreateInternalLinks().handleAsync((v, recreationEx) -> {
                if (recreationEx != null) {
                    TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", (Object)this.linkPath, (Object)ExceptionUtil.extractAsyncCompletionCause(recreationEx));
                }
                synchronized (this.recreateLinksLock) {
                    this.isRecreateLinksInProgress = false;
                }
                return null;
            }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        }
    }

    /**
     * Closes the current internal links, replaces the inner sender/receiver helpers with new
     * ones for the next link generation and opens new links.
     *
     * @return a future that completes when both new links are open, or fails when sending the
     *         token fails, scheduling on the reactor fails, a link fails to open, or the
     *         factory's operation timeout elapses first
     */
    private CompletableFuture<Void> recreateInternalLinks() {
        TRACE_LOGGER.info("RequestResponseLink - recreating internal send and receive links to {}", (Object)this.linkPath);
        this.amqpSender.closeInternals(false);
        this.amqpReceiver.closeInternals(false);
        this.cancelSASTokenRenewTimer();
        // New generation: close notifications carrying the old generation number are ignored.
        ++this.internalLinkGeneration;
        // The new sender takes over the pending send queues of the old one.
        this.amqpSender = new InternalSender(this.getClientId() + ":internalSender", this, this.amqpSender);
        this.amqpReceiver = new InternalReceiver(this.getClientId() + ":interalReceiver", this);
        final CompletableFuture<Void> recreateInternalLinksFuture = new CompletableFuture<Void>();
        // Send the token first; the links are only created after the token was sent successfully.
        this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> {
            if (sasTokenEx != null) {
                Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
                TRACE_LOGGER.error("Sending SAS Token failed. RequestResponseLink path:{}", (Object)this.linkPath, (Object)cause);
                recreateInternalLinksFuture.completeExceptionally(cause);
            } else {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            RequestResponseLink.this.createInternalLinks();
                        }
                    });
                }
                catch (IOException ioException) {
                    this.cancelSASTokenRenewTimer();
                    recreateInternalLinksFuture.completeExceptionally(new ServiceBusException(false, "Failed to create internal links, see cause for more details.", ioException));
                }
            }
            return null;
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        // Recreation is finished when both new inner links have completed opening.
        CompletableFuture.allOf(this.amqpSender.openFuture, this.amqpReceiver.openFuture).handleAsync((v, ex) -> {
            if (ex == null) {
                TRACE_LOGGER.info("Recreated internal links to {}", (Object)this.linkPath);
                this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
                recreateInternalLinksFuture.complete(null);
            } else {
                this.cancelSASTokenRenewTimer();
                recreateInternalLinksFuture.completeExceptionally((Throwable)ex);
            }
            return null;
        }, (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
        // Timeout guard: fail the recreation if it is still not done after the operation timeout.
        Timer.schedule(new Runnable(){

            @Override
            public void run() {
                if (!recreateInternalLinksFuture.isDone()) {
                    TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "Recreating internal links of requestresponselink to %s timed out.", RequestResponseLink.this.linkPath));
                    TRACE_LOGGER.warn("Recreating internal links of requestresponselink timed out.", (Throwable)operationTimedout);
                    RequestResponseLink.this.cancelSASTokenRenewTimer();
                    AsyncUtil.completeFutureExceptionally(recreateInternalLinksFuture, operationTimedout);
                }
            }
        }, this.underlyingFactory.getOperationTimeout(), TimerType.OneTimeRun);
        return recreateInternalLinksFuture;
    }

    /**
     * Fails every pending request with the given exception, cancels each request's
     * timeout task and empties the pending-request map.
     *
     * @param exception error reported to all waiting callers
     */
    private void completeAllPendingRequestsWithException(Exception exception) {
        TRACE_LOGGER.warn("Completing all pending requests with exception in request response link to {}", (Object)this.linkPath);
        this.pendingRequests.values().forEach(pendingItem -> {
            AsyncUtil.completeFutureExceptionally(pendingItem.getWork(), exception);
            pendingItem.cancelTimeoutTask(true);
        });
        this.pendingRequests.clear();
    }

    /**
     * Sends a request over the internal send link and returns a future for its response.
     * The message gets a generated message id ({@code request:<n>}) and this link's
     * reply-to address. If the internal links are not both active after the request was
     * enqueued, a link recreation is triggered (at most one runs at a time).
     *
     * @param requestMessage message to send; its message id and reply-to are overwritten
     * @param timeout time after which the request is failed with a timeout
     * @return a future completed with the response message or failed with the request error
     */
    public CompletableFuture<Message> requestAysnc(Message requestMessage, Duration timeout) {
        this.throwIfClosed(null);
        CompletableFuture<Message> responseFuture = new CompletableFuture<Message>();
        RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, responseFuture, timeout);
        String requestId = "request:" + this.requestCounter.incrementAndGet();
        requestMessage.setMessageId((Object)requestId);
        requestMessage.setReplyTo(this.replyTo);
        // Register before sending so a response can always be matched to its work item.
        this.pendingRequests.put(requestId, workItem);
        workItem.setTimeoutTask((ScheduledFuture)this.scheduleRequestTimeout(requestId, timeout));
        TRACE_LOGGER.debug("Sending request with id:{}", (Object)requestId);
        this.amqpSender.sendRequest(requestId, false);

        // The proton links are null from the moment recreateInternalLinks() installs new
        // helpers until createInternalLinks() has run. Treat a missing link as "not active"
        // instead of dereferencing it; the request stays queued either way.
        Link currentSendLink = this.amqpSender.sendLink;
        Link currentReceiveLink = this.amqpReceiver.receiveLink;
        boolean bothLinksActive = currentSendLink != null
                && currentReceiveLink != null
                && currentSendLink.getLocalState() == EndpointState.ACTIVE
                && currentSendLink.getRemoteState() == EndpointState.ACTIVE
                && currentReceiveLink.getLocalState() == EndpointState.ACTIVE
                && currentReceiveLink.getRemoteState() == EndpointState.ACTIVE;
        if (!bothLinksActive) {
            this.ensureUniqueLinkRecreation();
        }
        return responseFuture;
    }

    /**
     * Schedules the one-shot timeout task of a request. When it fires, the request is failed
     * (with its last known exception if it has one, otherwise with a timeout) and its id is
     * removed from the sender's pending queue.
     *
     * @param requestId id of the request to time out
     * @param timeout delay after which the task runs
     * @return handle of the scheduled task
     */
    private ScheduledFuture<?> scheduleRequestTimeout(final String requestId, Duration timeout) {
        return Timer.schedule(new Runnable(){

            @Override
            public void run() {
                TRACE_LOGGER.warn("Request with id:{} timed out", (Object)requestId);
                RequestResponseWorkItem completedWorkItem = RequestResponseLink.this.exceptionallyCompleteRequest(requestId, new TimeoutException("Request timed out."), true);
                InternalSender currentSender = RequestResponseLink.this.amqpSender;
                if (completedWorkItem == null) {
                    // The request was completed by someone else before this task ran, so it
                    // is unknown which queue (if any) still holds the id: clear it from both.
                    currentSender.removeEnqueuedRequest(requestId, true);
                    currentSender.removeEnqueuedRequest(requestId, false);
                    return;
                }
                // A work item with a last known exception was queued for retry.
                boolean isRetriedWorkItem = completedWorkItem.getLastKnownException() != null;
                currentSender.removeEnqueuedRequest(requestId, isRetriedWorkItem);
            }
        }, timeout, TimerType.OneTimeRun);
    }

    /**
     * Removes a pending request and fails its future.
     *
     * @param requestId id of the request to fail
     * @param exception error to report
     * @param useLastKnownException when true and the work item has a last known exception,
     *        that exception is reported instead of {@code exception}
     * @return the removed work item, or {@code null} if no request with that id was pending
     */
    private RequestResponseWorkItem exceptionallyCompleteRequest(String requestId, Exception exception, boolean useLastKnownException) {
        RequestResponseWorkItem workItem = this.pendingRequests.remove(requestId);
        if (workItem == null) {
            return null;
        }
        Exception exceptionToReport = exception;
        if (useLastKnownException && workItem.getLastKnownException() != null) {
            exceptionToReport = workItem.getLastKnownException();
        }
        // Fail the future exactly once, through the same AsyncUtil helper that
        // completeAllPendingRequestsWithException uses. The former additional direct
        // completeExceptionally() call completed the future first and thereby turned
        // this call into a no-op.
        // NOTE(review): AsyncUtil presumably completes off the calling thread - confirm.
        AsyncUtil.completeFutureExceptionally(workItem.getWork(), exceptionToReport);
        workItem.cancelTimeoutTask(true);
        return workItem;
    }

    /**
     * Handles the response of a pending request.
     * A status code of 503 is treated as server-busy: the request is re-sent after the retry
     * interval given by the retry policy, or failed when no interval is returned. Any other
     * status completes the request with the response message.
     *
     * @param requestId id of the request the response belongs to
     * @param responseMessage response received on the receive link
     * @return the matching work item, or {@code null} when no such request is pending
     */
    private RequestResponseWorkItem completeRequestWithResponse(final String requestId, Message responseMessage) {
        RequestResponseWorkItem pendingItem = this.pendingRequests.get(requestId);
        if (pendingItem == null) {
            TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", (Object)requestId);
            return null;
        }

        int responseStatus = RequestResponseUtils.getResponseStatusCode(responseMessage);
        TRACE_LOGGER.debug("Response for request with id:{} has status code:{}", (Object)requestId, (Object)responseStatus);

        if (responseStatus != 503) {
            // Regular response: hand it to the caller and drop the request.
            TRACE_LOGGER.debug("Completing request with id:{}", (Object)requestId);
            this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
            this.pendingRequests.remove(requestId);
            pendingItem.getWork().complete(responseMessage);
            pendingItem.cancelTimeoutTask(true);
            return pendingItem;
        }

        TRACE_LOGGER.warn("Request with id:{} received ServerBusy response from '{}'", (Object)requestId, (Object)this.linkPath);
        Exception serverBusyException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage);
        this.underlyingFactory.getRetryPolicy().incrementRetryCount(this.getClientId());
        Duration retryDelay = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), serverBusyException, pendingItem.getTimeoutTracker().remaining());
        if (retryDelay == null) {
            TRACE_LOGGER.error("Request with id:{} cannot be retried. So completing with excetion.", (Object)requestId, (Object)serverBusyException);
            this.exceptionallyCompleteRequest(requestId, serverBusyException, false);
            return pendingItem;
        }

        TRACE_LOGGER.info("Request with id:{} will be retried after {}.", (Object)requestId, (Object)retryDelay);
        // Remembered so a later timeout can report the server-busy error instead.
        pendingItem.setLastKnownException(serverBusyException);
        try {
            this.underlyingFactory.scheduleOnReactorThread((int)retryDelay.toMillis(), new DispatchHandler(){

                @Override
                public void onEvent() {
                    RequestResponseLink.this.amqpSender.sendRequest(requestId, true);
                }
            });
        }
        catch (IOException e) {
            // The retry could not be scheduled; report the server-busy error to the caller.
            this.exceptionallyCompleteRequest(requestId, serverBusyException, false);
        }
        return pendingItem;
    }

    /**
     * Closes the link: cancels the token renew timer, closes the internal sender and,
     * once that has completed, closes the internal receiver.
     *
     * @return a future that completes when the receiver close has completed
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        TRACE_LOGGER.info("Closing requestresponselink to {} by closing both internal sender and receiver links.", (Object)this.linkPath);
        this.cancelSASTokenRenewTimer();
        CompletableFuture<Void> senderClosed = this.amqpSender.closeAsync();
        return senderClosed.thenComposeAsync(ignored -> this.amqpReceiver.closeAsync(), (Executor)MessagingFactory.INTERNAL_THREAD_POOL);
    }

    /**
     * Schedules a one-shot task that fails {@code closeFuture} with a timeout if it is
     * still not done when the task runs.
     *
     * @param closeFuture future tracking the close of a link
     * @param timeout delay after which the close counts as timed out
     * @param linkName name of the link, used in the timeout message
     */
    private static void scheduleLinkCloseTimeout(final CompletableFuture<Void> closeFuture, Duration timeout, final String linkName) {
        Runnable failCloseIfStillPending = () -> {
            if (closeFuture.isDone()) {
                return;
            }
            TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Link(%s) timed out at %s", "Close", linkName, ZonedDateTime.now()));
            TRACE_LOGGER.warn("Closing link timed out", (Throwable)operationTimedout);
            AsyncUtil.completeFutureExceptionally(closeFuture, operationTimedout);
        };
        Timer.schedule(failCloseIfStillPending, timeout, TimerType.OneTimeRun);
    }

    private class InternalSender
    extends ClientEntity
    implements IAmqpSender {
        // Proton send link; null until setLinks is called.
        private Sender sendLink;
        // Receive link paired with the send link; closed when this sender hits an error.
        private Receiver matchingReceiveLink;
        // Owning request-response link.
        private RequestResponseLink parent;
        // Completed by onOpenComplete (successfully or exceptionally).
        private CompletableFuture<Void> openFuture;
        // Completed when the send link is closed, fails to open, or closing fails/times out.
        private CompletableFuture<Void> closeFuture;
        // Credit counter; increased in onFlow and reset to zero in setLinks.
        private AtomicInteger availableCredit;
        // Ids of requests queued for their first send.
        private LinkedList<String> pendingFreshSends;
        // Ids of requests queued for a retry send.
        private LinkedList<String> pendingRetrySends;
        // Monitor used around the pending queues and isSendLoopRunning.
        private Object pendingSendsSyncLock;
        // True while runSendLoop is executing.
        private boolean isSendLoopRunning;
        // Read from the link via Util.getMaxMessageSizeFromLink when the link has opened.
        private int maxMessageSize;
        // Parent's link generation at the time this sender was constructed.
        private int linkGeneration;

        /**
         * Creates the send-side helper of a request-response link.
         *
         * @param clientId client id of this helper
         * @param parent owning link; its current link generation is captured
         * @param senderToBeCopied previous sender whose pending queues are taken over, or
         *        {@code null} to start with empty queues
         */
        protected InternalSender(String clientId, RequestResponseLink parent, InternalSender senderToBeCopied) {
            super(clientId);
            this.parent = parent;
            this.linkGeneration = parent.internalLinkGeneration;
            this.availableCredit = new AtomicInteger(0);
            this.isSendLoopRunning = false;
            this.openFuture = new CompletableFuture<>();
            this.closeFuture = new CompletableFuture<>();
            if (senderToBeCopied == null) {
                this.pendingSendsSyncLock = new Object();
                this.pendingFreshSends = new LinkedList<>();
                this.pendingRetrySends = new LinkedList<>();
            } else {
                // The queues are shared with the previous sender, so the monitor guarding
                // them must be shared as well. With a fresh lock the old and the new sender
                // would mutate the same (non-thread-safe) lists under different monitors.
                this.pendingSendsSyncLock = senderToBeCopied.pendingSendsSyncLock;
                this.pendingFreshSends = senderToBeCopied.pendingFreshSends;
                this.pendingRetrySends = senderToBeCopied.pendingRetrySends;
            }
        }

        /**
         * Starts closing the send link and waits for the close to complete.
         *
         * @return the future tracking the close of the send link
         */
        @Override
        protected CompletableFuture<Void> onClose() {
            final boolean waitForCloseCompletion = true;
            this.closeInternals(waitForCloseCompletion);
            return this.closeFuture;
        }

        /**
         * Closes the send link on the reactor thread, unless this sender is already closed.
         *
         * @param waitForCloseCompletion when true, {@code closeFuture} is left pending and a
         *        close timeout is scheduled; when false, {@code closeFuture} is completed
         *        right after the local close
         */
        void closeInternals(final boolean waitForCloseCompletion) {
            if (this.getIsClosed()) {
                return;
            }
            if (this.sendLink == null || this.sendLink.getLocalState() == EndpointState.CLOSED) {
                // Nothing to close locally.
                AsyncUtil.completeFuture(this.closeFuture, null);
                return;
            }
            try {
                this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                    @Override
                    public void onEvent() {
                        // Re-check on the reactor thread: the link may have been closed
                        // between scheduling and execution of this handler.
                        if (InternalSender.this.sendLink == null || InternalSender.this.sendLink.getLocalState() == EndpointState.CLOSED) {
                            // Same outcome as the "nothing to close" case above; without it
                            // closeFuture would stay pending with no timeout scheduled.
                            AsyncUtil.completeFuture(InternalSender.this.closeFuture, null);
                            return;
                        }
                        TRACE_LOGGER.debug("Closing internal send link of requestresponselink to {}", (Object)RequestResponseLink.this.linkPath);
                        InternalSender.this.sendLink.close();
                        InternalSender.this.parent.underlyingFactory.deregisterForConnectionError((Link)InternalSender.this.sendLink);
                        if (waitForCloseCompletion) {
                            RequestResponseLink.scheduleLinkCloseTimeout(InternalSender.this.closeFuture, InternalSender.this.parent.underlyingFactory.getOperationTimeout(), InternalSender.this.sendLink.getName());
                        } else {
                            AsyncUtil.completeFuture(InternalSender.this.closeFuture, null);
                        }
                    }
                });
            }
            catch (IOException e) {
                AsyncUtil.completeFutureExceptionally(this.closeFuture, e);
            }
        }

        /**
         * Called when opening the send link has finished.
         * On success the open future is completed and the send loop is started; on failure
         * this sender is marked closed and the open future is failed.
         *
         * @param completionException error of the open attempt, or {@code null} on success
         */
        @Override
        public void onOpenComplete(Exception completionException) {
            if (completionException != null) {
                TRACE_LOGGER.error("Opening internal send link '{}' of requestresponselink to {} failed.", new Object[]{this.sendLink.getName(), this.parent.linkPath, completionException});
                this.setClosed();
                // A link that never opened has nothing left to close.
                AsyncUtil.completeFuture(this.closeFuture, null);
                AsyncUtil.completeFutureExceptionally(this.openFuture, completionException);
                return;
            }
            TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", (Object)this.parent.linkPath);
            this.maxMessageSize = Util.getMaxMessageSizeFromLink((Link)this.sendLink);
            AsyncUtil.completeFuture(this.openFuture, null);
            this.runSendLoop();
        }

        /**
         * Handles an error reported for the send link.
         * A still pending open is failed first. If this sender is closing or closed, a
         * pending close is failed; otherwise the matching receive link is closed and the
         * parent is told that the inner links are gone.
         *
         * @param exception the reported error
         */
        @Override
        public void onError(Exception exception) {
            if (!this.openFuture.isDone()) {
                this.onOpenComplete(exception);
            }

            if (!this.getIsClosingOrClosed()) {
                TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' encountered error.", new Object[]{this.sendLink.getName(), this.parent.linkPath, exception});
                this.parent.underlyingFactory.deregisterForConnectionError((Link)this.sendLink);
                this.matchingReceiveLink.close();
                this.parent.underlyingFactory.deregisterForConnectionError((Link)this.matchingReceiveLink);
                this.parent.onInnerLinksClosed(this.linkGeneration, exception);
                return;
            }

            if (this.closeFuture.isDone()) {
                return;
            }
            TRACE_LOGGER.error("Closing internal send link '{}' of requestresponselink to {} failed.", new Object[]{this.sendLink.getName(), this.parent.linkPath, exception});
            AsyncUtil.completeFutureExceptionally(this.closeFuture, exception);
        }

        /**
         * Called when the send link has been closed.
         * Without an error condition the close future is completed (if still pending);
         * with one, the condition is converted to an exception and passed to
         * {@link #onError(Exception)}.
         *
         * @param condition error condition of the close; may be {@code null} or carry no condition
         */
        @Override
        public void onClose(ErrorCondition condition) {
            boolean closedWithoutError = condition == null || condition.getCondition() == null;
            if (!closedWithoutError) {
                this.onError(ExceptionUtil.toException(condition));
                return;
            }
            // The pending check is done once; it was previously duplicated in the condition.
            if (!this.closeFuture.isDone()) {
                TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", (Object)this.parent.linkPath);
                AsyncUtil.completeFuture(this.closeFuture, null);
            }
        }

        /**
         * Queues a request id for sending and, unless the send loop is already running,
         * schedules the send loop on the reactor thread.
         *
         * @param requestId id of the pending request to send
         * @param isRetry true to put the id on the retry queue, false for the fresh queue
         */
        public void sendRequest(String requestId, boolean isRetry) {
            synchronized (this.pendingSendsSyncLock) {
                LinkedList<String> targetQueue = isRetry ? this.pendingRetrySends : this.pendingFreshSends;
                targetQueue.add(requestId);
                // A running loop will pick the new entry up on its own.
                if (this.isSendLoopRunning) {
                    return;
                }
            }

            DispatchHandler startSendLoop = new DispatchHandler(){

                @Override
                public void onEvent() {
                    InternalSender.this.runSendLoop();
                }
            };
            try {
                this.parent.underlyingFactory.scheduleOnReactorThread(startSendLoop);
            }
            catch (IOException e) {
                this.parent.exceptionallyCompleteRequest(requestId, e, true);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Removes a previously enqueued request id from one of the pending queues.
         *
         * @param requestId id of the request to remove
         * @param isRetry   {@code true} to remove from the retry queue, {@code false} to
         *                  remove from the fresh-sends queue
         */
        public void removeEnqueuedRequest(String requestId, boolean isRetry) {
            synchronized (this.pendingSendsSyncLock) {
                if (!isRetry) {
                    this.pendingFreshSends.remove(requestId);
                    return;
                }
                this.pendingRetrySends.remove(requestId);
            }
        }

        /**
         * Records newly issued link credit and then attempts to run the send loop.
         *
         * @param creditIssued amount of credit to add to the available credit counter
         */
        @Override
        public void onFlow(int creditIssued) {
            TRACE_LOGGER.debug(
                    "RequestResonseLink {} internal sender received credit :{}",
                    (Object)this.parent.linkPath,
                    Integer.valueOf(creditIssued));
            this.availableCredit.addAndGet(creditIssued);
            TRACE_LOGGER.debug(
                    "RequestResonseLink {} internal sender available credit :{}",
                    (Object)this.parent.linkPath,
                    Integer.valueOf(this.availableCredit.get()));
            // runSendLoop() returns immediately if a loop is already marked as running.
            this.runSendLoop();
        }

        /**
         * Callback for a completed send; intentionally does nothing.
         *
         * <p>runSendLoop() settles each delivery right after sending it, which is presumably
         * why no further handling is needed here.
         *
         * @param delivery the delivery reported by the callback (unused)
         */
        @Override
        public void onSendComplete(Delivery delivery) {
        }

        /**
         * Attaches this sender to a send link and its paired receive link, and resets the
         * available credit counter to zero.
         *
         * @param sendLink    the link this sender writes requests to
         * @param receiveLink the receive link paired with {@code sendLink}
         */
        public void setLinks(Sender sendLink, Receiver receiveLink) {
            // Assignment order is kept as-is; other threads may observe these fields.
            this.sendLink = sendLink;
            this.matchingReceiveLink = receiveLink;
            // A fresh counter starts at zero.
            this.availableCredit = new AtomicInteger();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        /**
         * Sends enqueued requests over the send link while the link is active on both ends
         * and credit is available.
         *
         * <p>Only one loop runs at a time: the {@code isSendLoopRunning} flag is claimed under
         * {@code pendingSendsSyncLock} on entry and always released in the {@code finally}
         * block. Retry requests are polled before fresh ones.
         *
         * <p>A request whose payload exceeds the maximum message size is completed
         * exceptionally and skipped. The message is encoded <em>before</em> a delivery is
         * created, so an encoding failure neither leaves an unsent delivery on the link nor
         * falls through to the send with a {@code null} payload.
         */
        private void runSendLoop() {
            synchronized (this.pendingSendsSyncLock) {
                if (this.isSendLoopRunning) {
                    return;
                }
                this.isSendLoopRunning = true;
            }
            TRACE_LOGGER.debug("Starting requestResponseLink {} internal sender send loop", (Object)this.parent.linkPath);
            try {
                while (this.sendLink != null
                        && this.sendLink.getLocalState() == EndpointState.ACTIVE
                        && this.sendLink.getRemoteState() == EndpointState.ACTIVE
                        && this.availableCredit.get() > 0) {
                    String requestIdToBeSent;
                    synchronized (this.pendingSendsSyncLock) {
                        // Retries take priority over fresh sends.
                        requestIdToBeSent = this.pendingRetrySends.poll();
                        if (requestIdToBeSent == null) {
                            requestIdToBeSent = this.pendingFreshSends.poll();
                        }
                        if (requestIdToBeSent == null) {
                            // Clear the flag under the lock, so a concurrent sendRequest() that
                            // enqueues right after this point sees it cleared and schedules a new loop.
                            this.isSendLoopRunning = false;
                            TRACE_LOGGER.debug("RequestResponseLink {} internal sender send loop ending as there are no more requests enqueued.", (Object)this.parent.linkPath);
                            return;
                        }
                    }

                    RequestResponseWorkItem requestToBeSent = (RequestResponseWorkItem)this.parent.pendingRequests.get(requestIdToBeSent);
                    if (requestToBeSent == null) {
                        TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", (Object)requestIdToBeSent);
                        continue;
                    }

                    // Encode first: if the payload is too large, no delivery has been created yet.
                    Pair<byte[], Integer> encodedPair;
                    try {
                        encodedPair = Util.encodeMessageToOptimalSizeArray(requestToBeSent.getRequest(), this.maxMessageSize);
                    }
                    catch (PayloadSizeExceededException exception) {
                        this.parent.exceptionallyCompleteRequest(requestIdToBeSent, new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024), (Throwable)exception), false);
                        // The request has already been failed; move on to the next one.
                        continue;
                    }

                    Delivery delivery = this.sendLink.delivery(UUID.randomUUID().toString().getBytes());
                    delivery.setMessageFormat(0);
                    try {
                        int sentMsgSize = this.sendLink.send(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem().intValue());
                        assert (sentMsgSize == encodedPair.getSecondItem()) : "Contract of the ProtonJ library for Sender.Send API changed";
                        delivery.settle();
                        this.availableCredit.decrementAndGet();
                        TRACE_LOGGER.debug("RequestResonseLink {} internal sender sent a request. available credit :{}", (Object)this.parent.linkPath, (Object)this.availableCredit.get());
                    }
                    catch (Exception e) {
                        TRACE_LOGGER.error("RequestResonseLink {} failed to send request with request id:{}.", new Object[]{this.parent.linkPath, requestIdToBeSent, e});
                        this.parent.exceptionallyCompleteRequest(requestIdToBeSent, e, false);
                    }
                }
            }
            finally {
                synchronized (this.pendingSendsSyncLock) {
                    this.isSendLoopRunning = false;
                }
                TRACE_LOGGER.debug("RequestResponseLink {} internal sender send loop stopped.", (Object)this.parent.linkPath);
            }
        }
    }

    /**
     * Receiving half of the request-response link pair.
     *
     * <p>Tracks open/close completion of the internal receive link through two futures,
     * reports link errors to the parent {@link RequestResponseLink}, and hands every received
     * response to the parent, matched by the response's correlation id.
     */
    private class InternalReceiver
    extends ClientEntity
    implements IAmqpReceiver {
        private RequestResponseLink parent;
        private Receiver receiveLink;
        private Sender matchingSendLink;
        private CompletableFuture<Void> openFuture;
        private CompletableFuture<Void> closeFuture;
        private int linkGeneration;

        /**
         * Creates a receiver bound to the given parent link and captures the parent's
         * current internal link generation.
         *
         * @param clientId client id passed to {@link ClientEntity}
         * @param parent   the owning request-response link
         */
        protected InternalReceiver(String clientId, RequestResponseLink parent) {
            super(clientId);
            this.parent = parent;
            this.linkGeneration = parent.internalLinkGeneration;
            this.openFuture = new CompletableFuture<>();
            this.closeFuture = new CompletableFuture<>();
        }

        /**
         * Starts closing the receive link and waits for close completion.
         *
         * @return the future tracking the close of this receiver
         */
        @Override
        protected CompletableFuture<Void> onClose() {
            this.closeInternals(true);
            return this.closeFuture;
        }

        /**
         * Closes the underlying receive link on the reactor thread, unless this entity is
         * already closed.
         *
         * <p>If the link is absent or already locally closed, the close future is completed
         * right away. This also holds when that state is only discovered once the scheduled
         * handler runs; otherwise the close future would never complete.
         *
         * @param waitForCloseCompletion {@code true} to leave the close future pending (with
         *                               a timeout scheduled) until the close is confirmed;
         *                               {@code false} to complete it immediately after
         *                               closing the link locally
         */
        void closeInternals(final boolean waitForCloseCompletion) {
            if (this.getIsClosed()) {
                return;
            }
            if (this.receiveLink == null || this.receiveLink.getLocalState() == EndpointState.CLOSED) {
                AsyncUtil.completeFuture(this.closeFuture, null);
                return;
            }
            try {
                this.parent.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                    @Override
                    public void onEvent() {
                        if (InternalReceiver.this.receiveLink == null || InternalReceiver.this.receiveLink.getLocalState() == EndpointState.CLOSED) {
                            // The link was closed between scheduling and execution. Mirror the
                            // synchronous path so callers waiting on closeFuture are released.
                            if (!InternalReceiver.this.closeFuture.isDone()) {
                                AsyncUtil.completeFuture(InternalReceiver.this.closeFuture, null);
                            }
                            return;
                        }
                        TRACE_LOGGER.debug("Closing internal receive link of requestresponselink to {}", (Object)RequestResponseLink.this.linkPath);
                        InternalReceiver.this.receiveLink.close();
                        InternalReceiver.this.parent.underlyingFactory.deregisterForConnectionError((Link)InternalReceiver.this.receiveLink);
                        if (waitForCloseCompletion) {
                            RequestResponseLink.scheduleLinkCloseTimeout(InternalReceiver.this.closeFuture, InternalReceiver.this.parent.underlyingFactory.getOperationTimeout(), InternalReceiver.this.receiveLink.getName());
                        } else {
                            AsyncUtil.completeFuture(InternalReceiver.this.closeFuture, null);
                        }
                    }
                });
            }
            catch (IOException e) {
                AsyncUtil.completeFutureExceptionally(this.closeFuture, e);
            }
        }

        /**
         * Completes the open future according to the outcome of opening the receive link.
         *
         * <p>On success, credit of {@code Integer.MAX_VALUE} is issued on the receive link.
         * On failure, this entity is marked closed, the close future is completed normally
         * and the open future is completed exceptionally.
         *
         * @param completionException {@code null} on success, otherwise the open failure
         */
        @Override
        public void onOpenComplete(Exception completionException) {
            if (completionException != null) {
                TRACE_LOGGER.error("Opening internal receive link '{}' of requestresponselink to {} failed.", new Object[]{this.receiveLink.getName(), this.parent.linkPath, completionException});
                this.setClosed();
                AsyncUtil.completeFuture(this.closeFuture, null);
                AsyncUtil.completeFutureExceptionally(this.openFuture, completionException);
                return;
            }
            TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", (Object)this.parent.linkPath);
            AsyncUtil.completeFuture(this.openFuture, null);
            this.receiveLink.flow(Integer.MAX_VALUE);
        }

        /**
         * Handles an error reported on the receive link.
         *
         * <p>If the open is still pending, the error is first reported as an open failure.
         * If this entity is closing or closed, a pending close future is failed with the
         * error. Otherwise both inner links are deregistered from connection-error
         * notifications, the matching send link is closed, and the parent is told that the
         * inner links of this generation have closed.
         *
         * @param exception the error reported for the link
         */
        @Override
        public void onError(Exception exception) {
            if (!this.openFuture.isDone()) {
                this.onOpenComplete(exception);
            }
            if (!this.getIsClosingOrClosed()) {
                TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' encountered error.", new Object[]{this.receiveLink.getName(), this.parent.linkPath, exception});
                this.parent.underlyingFactory.deregisterForConnectionError((Link)this.receiveLink);
                this.matchingSendLink.close();
                this.parent.underlyingFactory.deregisterForConnectionError((Link)this.matchingSendLink);
                this.parent.onInnerLinksClosed(this.linkGeneration, exception);
                return;
            }
            if (!this.closeFuture.isDone()) {
                TRACE_LOGGER.error("Closing internal receive link '{}' of requestresponselink to {} failed.", new Object[]{this.receiveLink.getName(), this.parent.linkPath, exception});
                AsyncUtil.completeFutureExceptionally(this.closeFuture, exception);
            }
        }

        /**
         * Handles the close notification for the receive link.
         *
         * <p>A close without an error condition completes the pending close future, but only
         * while this entity is closing or closed. A close with an error condition is
         * converted to an exception and routed through {@link #onError(Exception)}.
         *
         * @param condition the error condition reported with the close; may be {@code null}
         *                  or have a {@code null} condition symbol when there is no error
         */
        @Override
        public void onClose(ErrorCondition condition) {
            if (condition != null && condition.getCondition() != null) {
                this.onError(ExceptionUtil.toException(condition));
                return;
            }
            if (this.getIsClosingOrClosed() && !this.closeFuture.isDone()) {
                TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", (Object)this.parent.linkPath);
                AsyncUtil.completeFuture(this.closeFuture, null);
            }
        }

        /**
         * Reads a response message from the delivery, settles the delivery, and dispatches
         * the response to the parent on the internal thread pool.
         *
         * <p>If the message is read successfully, the delivery is settled as accepted. If
         * reading or accepting throws, the delivery is settled as released and nothing is
         * dispatched.
         *
         * @param delivery the delivery carrying the response
         */
        @Override
        public void onReceiveComplete(Delivery delivery) {
            final Message responseMessage;
            try {
                responseMessage = Util.readMessageFromDelivery(this.receiveLink, delivery);
                delivery.disposition((DeliveryState)Accepted.getInstance());
                delivery.settle();
            }
            catch (Exception e) {
                TRACE_LOGGER.warn("Reading message from delivery failed with unexpected exception.", (Throwable)e);
                delivery.disposition((DeliveryState)Released.getInstance());
                delivery.settle();
                return;
            }
            // Complete the request off the calling thread.
            MessagingFactory.INTERNAL_THREAD_POOL.submit(() -> {
                String requestMessageId = (String)responseMessage.getCorrelationId();
                if (requestMessageId == null) {
                    TRACE_LOGGER.warn("RequestRespnseLink received a message with null correlationId");
                    return;
                }
                TRACE_LOGGER.debug("RequestRespnseLink received response for request with id :{}", (Object)requestMessageId);
                this.parent.completeRequestWithResponse(requestMessageId, responseMessage);
            });
        }

        /**
         * Attaches this receiver to a receive link and its paired send link.
         *
         * @param sendLink    the send link paired with {@code receiveLink}
         * @param receiveLink the link this receiver reads responses from
         */
        public void setLinks(Sender sendLink, Receiver receiveLink) {
            this.receiveLink = receiveLink;
            this.matchingSendLink = sendLink;
        }
    }
}

