/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.connect.bridge;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeFromPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeManagementSupport;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeMetrics;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeReceiver;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeReceiverConfiguration;
import org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeReceiverInfo;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPBridgeFromQueueReceiver
extends AMQPBridgeReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2;
    public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30;

    public AMQPBridgeFromQueueReceiver(AMQPBridgeFromPolicyManager policyManager, AMQPBridgeReceiverConfiguration configuration, AMQPSessionContext session, AMQPBridgeReceiverInfo receiverInfo, AMQPBridgeQueuePolicy policy, AMQPBridgeMetrics.ReceiverMetrics metrics) {
        super(policyManager, configuration, session, receiverInfo, policy, metrics);
    }

    @Override
    public int getReceiverIdleTimeout() {
        return this.configuration.getQueueReceiverIdleTimeout();
    }

    @Override
    protected void doCreateReceiver() {
        try {
            HashMap<Symbol, Integer> receiverProperties;
            Receiver protonReceiver = this.session.getSession().receiver(this.generateLinkName());
            Target target = new Target();
            Source source = new Source();
            String address = this.receiverInfo.getRemoteAddress();
            String filterString = this.receiverInfo.getFilterString();
            Queue localQueue = this.bridgeManager.getServer().locateQueue(this.receiverInfo.getLocalQueue());
            source.setOutcomes(Arrays.copyOf(OUTCOMES, OUTCOMES.length));
            source.setDefaultOutcome((Outcome)DEFAULT_OUTCOME);
            source.setDurable(TerminusDurability.NONE);
            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
            source.setAddress(address);
            source.setCapabilities(this.getRemoteTerminusCapabilities());
            if (filterString != null && !filterString.isBlank()) {
                AmqpJmsSelectorFilter jmsFilter = new AmqpJmsSelectorFilter(filterString);
                HashMap<Symbol, AmqpJmsSelectorFilter> filtersMap = new HashMap<Symbol, AmqpJmsSelectorFilter>();
                filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
                source.setFilter(filtersMap);
            }
            target.setAddress(this.receiverInfo.getLocalFqqn());
            if (this.receiverInfo.getPriority() != null) {
                receiverProperties = new HashMap<Symbol, Integer>();
                receiverProperties.put(AmqpSupport.RECEIVER_PRIORITY, this.receiverInfo.getPriority());
            } else {
                receiverProperties = null;
            }
            protonReceiver.setSenderSettleMode(this.configuration.isUsingPresettledSenders() ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED);
            protonReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
            if (this.configuration.isCoreMessageTunnelingEnabled()) {
                protonReceiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});
            }
            protonReceiver.setProperties(receiverProperties);
            protonReceiver.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
            protonReceiver.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
            protonReceiver.open();
            AtomicBoolean openTimedOut = new AtomicBoolean(false);
            ScheduledFuture<?> openTimeoutTask = this.configuration.getLinkAttachTimeout() > 0 ? this.bridgeManager.getServer().getScheduledPool().schedule(() -> {
                openTimedOut.set(true);
                this.bridgeManager.signalResourceCreateError((Exception)((Object)ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout()));
            }, (long)this.configuration.getLinkAttachTimeout(), TimeUnit.SECONDS) : null;
            this.protonReceiver = protonReceiver;
            protonReceiver.attachments().set(AmqpSupport.AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
                try {
                    if (openTimeoutTask != null) {
                        openTimeoutTask.cancel(false);
                    }
                    if (openTimedOut.get()) {
                        return;
                    }
                    boolean linkOpened = protonReceiver.getRemoteSource() != null;
                    this.bridgeManager.addLinkClosedInterceptor(this.receiverInfo.getId(), this::remoteLinkClosedInterceptor);
                    this.receiver = new AMQPBridgeQueueDeliveryReceiver(localQueue, this.receiverInfo, protonReceiver);
                    if (linkOpened) {
                        logger.debug("AMQP Bridge {} queue receiver {} completed open", (Object)this.bridgeManager.getName(), (Object)this.receiverInfo);
                    } else {
                        logger.debug("AMQP Bridge {} queue receiver {} rejected by remote", (Object)this.bridgeManager.getName(), (Object)this.receiverInfo);
                    }
                    this.session.addReceiver(protonReceiver, (session, protonRcvr) -> this.receiver);
                    if (linkOpened && this.remoteOpenHandler != null) {
                        this.remoteOpenHandler.accept(this);
                    }
                }
                catch (Exception e) {
                    this.bridgeManager.signalError(e);
                }
            });
        }
        catch (Exception e) {
            this.bridgeManager.signalError(e);
        }
        this.connection.flush();
    }

    private String generateLinkName() {
        return "amqp-bridge-" + this.bridgeManager.getName() + "-policy-" + this.policy.getPolicyName() + "-queue-receiver-" + this.receiverInfo.getRemoteAddress() + "-" + String.valueOf(this.bridgeManager.getServer().getNodeID()) + ":" + LINK_SEQUENCE_ID.getAndIncrement();
    }

    private static int caclulateNextDelay(int lastDelay, int backoffMultiplier, int maxDelay) {
        int nextDelay = lastDelay == 0 ? 1 : Math.min(lastDelay * backoffMultiplier, maxDelay);
        return nextDelay;
    }

    private class AMQPBridgeQueueDeliveryReceiver
    extends ProtonServerReceiverContext {
        private final SimpleString cachedFqqn;
        private final Queue localQueue;
        private boolean closed;
        private final AtomicBoolean creditTopUpInProgress;
        private final Runnable checkForNoBacklogRunnable;
        private final Runnable performCreditTopUpRunnable;
        private int lastBacklogCheckDelay;

        AMQPBridgeQueueDeliveryReceiver(Queue localQueue, AMQPBridgeReceiverInfo receiverInfo, Receiver receiver) {
            super(AMQPBridgeFromQueueReceiver.this.session.getSessionSPI(), AMQPBridgeFromQueueReceiver.this.session.getAMQPConnectionContext(), AMQPBridgeFromQueueReceiver.this.session, receiver);
            this.creditTopUpInProgress = new AtomicBoolean();
            this.checkForNoBacklogRunnable = this::checkForNoBacklogOnQueue;
            this.performCreditTopUpRunnable = this::performCreditTopUp;
            this.cachedFqqn = SimpleString.of((String)receiverInfo.getLocalFqqn());
            this.localQueue = localQueue;
        }

        @Override
        protected boolean isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) {
            return AMQPBridgeFromQueueReceiver.this.configuration.isUseModifiedForTransientDeliveryErrors();
        }

        @Override
        protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext connection) {
            return AMQPBridgeFromQueueReceiver.this.configuration.isDrainOnTransientDeliveryErrors();
        }

        @Override
        protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) {
            return AMQPBridgeFromQueueReceiver.this.configuration.getLinkQuiesceTimeout();
        }

        @Override
        public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
            super.close(remoteLinkClose);
            if (!this.closed) {
                this.closed = true;
                try {
                    AMQPBridgeManagementSupport.unregisterBridgeReceiver(AMQPBridgeFromQueueReceiver.this);
                }
                catch (Exception e) {
                    logger.debug("Error caught when trying to remove bridge queue receiver from management", (Throwable)e);
                }
                if (remoteLinkClose && AMQPBridgeFromQueueReceiver.this.remoteCloseHandler != null) {
                    try {
                        AMQPBridgeFromQueueReceiver.this.remoteCloseHandler.accept(AMQPBridgeFromQueueReceiver.this);
                    }
                    catch (Exception e) {
                        logger.debug("User remote closed handler threw error: ", (Throwable)e);
                    }
                    finally {
                        AMQPBridgeFromQueueReceiver.this.remoteCloseHandler = null;
                    }
                }
            }
        }

        @Override
        public void initialize() throws Exception {
            this.initialized = true;
            Target target = (Target)this.receiver.getRemoteTarget();
            this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
            this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
            if (target == null || target.getAddress() == null || target.getAddress().isEmpty()) {
                throw new ActiveMQAMQPInternalErrorException("Remote should have sent a valid Target but we got: " + String.valueOf(target));
            }
            if (!target.getAddress().equals(AMQPBridgeFromQueueReceiver.this.receiverInfo.getLocalFqqn())) {
                throw new ActiveMQAMQPInternalErrorException("Remote should have sent a matching Target FQQN but we got: " + target.getAddress());
            }
            this.address = SimpleString.of((String)AMQPBridgeFromQueueReceiver.this.receiverInfo.getLocalQueue());
            this.defRoutingType = AMQPBridgeFromQueueReceiver.this.receiverInfo.getRoutingType();
            try {
                QueueQueryResult result = this.sessionSPI.queueQuery(this.address, this.defRoutingType, false);
                if (!result.isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(this.address.toString());
                }
            }
            catch (ActiveMQAMQPNotFoundException e) {
                throw e;
            }
            catch (Exception e) {
                logger.debug(e.getMessage(), (Throwable)e);
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }
            if (AMQPBridgeFromQueueReceiver.this.configuration.isCoreMessageTunnelingEnabled() && AmqpSupport.verifyDesiredCapability((Link)this.receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) {
                this.enableCoreTunneling();
            }
            try {
                AMQPBridgeManagementSupport.registerBridgeReceiver(AMQPBridgeFromQueueReceiver.this);
            }
            catch (Exception e) {
                logger.debug("Error caught when trying to add bridge queue receiver to management", (Throwable)e);
            }
            this.topUpCreditIfNeeded();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
            try {
                Message theMessage;
                if (logger.isTraceEnabled()) {
                    logger.trace("AMQP Bridge {} queue receiver {} dispatching incoming message: {}", new Object[]{AMQPBridgeFromQueueReceiver.this.bridgeManager.getName(), AMQPBridgeFromQueueReceiver.this.receiverInfo, message});
                }
                if ((theMessage = AMQPBridgeFromQueueReceiver.this.transformer.transform(message)) != message && logger.isTraceEnabled()) {
                    logger.trace("The transformer {} replaced the original message {} with a new instance {}", new Object[]{AMQPBridgeFromQueueReceiver.this.transformer, message, theMessage});
                }
                this.sessionSPI.serverSend(this, tx, receiver, delivery, this.cachedFqqn, this.routingContext, theMessage);
            }
            catch (Exception e) {
                logger.warn("Inbound delivery for {} encountered an error: {}", new Object[]{AMQPBridgeFromQueueReceiver.this.receiverInfo, e.getMessage(), e});
                this.deliveryFailed(delivery, receiver, e);
            }
            finally {
                AMQPBridgeFromQueueReceiver.this.recordMessageReceived(message);
            }
        }

        @Override
        protected Runnable createCreditRunnable(AMQPConnectionContext connection) {
            if (AMQPBridgeFromQueueReceiver.this.configuration.getReceiverCredits() > 0) {
                return AMQPBridgeQueueDeliveryReceiver.createCreditRunnable(AMQPBridgeFromQueueReceiver.this.configuration.getReceiverCredits(), AMQPBridgeFromQueueReceiver.this.configuration.getReceiverCreditsLow(), this.receiver, connection, this);
            }
            return this::checkIfCreditTopUpNeeded;
        }

        @Override
        protected int getConfiguredMinLargeMessageSize(AMQPConnectionContext connection) {
            return AMQPBridgeFromQueueReceiver.this.configuration.getLargeMessageThreshold();
        }

        private void checkIfCreditTopUpNeeded() {
            if (!this.connection.isHandler()) {
                this.connection.runLater(this.creditRunnable);
                return;
            }
            if (this.receiver.getCredit() + this.pendingSettles <= 0 && !this.creditTopUpInProgress.get()) {
                this.creditTopUpInProgress.set(true);
                this.localQueue.getExecutor().execute(this.checkForNoBacklogRunnable);
            }
        }

        private void checkForNoBacklogOnQueue() {
            if (this.localQueue.getPendingMessageCount() == 0L) {
                this.lastBacklogCheckDelay = 0;
                this.connection.runLater(this.performCreditTopUpRunnable);
            } else {
                this.lastBacklogCheckDelay = AMQPBridgeFromQueueReceiver.caclulateNextDelay(this.lastBacklogCheckDelay, 2, 30);
                AMQPBridgeFromQueueReceiver.this.bridgeManager.getScheduler().schedule(() -> this.localQueue.getExecutor().execute(this.checkForNoBacklogRunnable), (long)this.lastBacklogCheckDelay, TimeUnit.SECONDS);
            }
        }

        private void performCreditTopUp() {
            this.connection.requireInHandler();
            try {
                if (this.receiver.getLocalState() != EndpointState.ACTIVE) {
                    return;
                }
                this.receiver.flow(AMQPBridgeFromQueueReceiver.this.configuration.getPullReceiverBatchSize());
                this.connection.instantFlush();
                this.lastBacklogCheckDelay = 0;
            }
            finally {
                this.creditTopUpInProgress.set(false);
            }
        }
    }
}

