package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.class */
public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
    // Default number of credits in one pull batch. NOTE(review): performCreditTopUp() flows a
    // literal 100, which is presumably this constant inlined by the compiler — confirm.
    public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;
    // Multiplier applied to the previous backlog-check delay when computing the next one.
    // NOTE(review): checkForNoBacklogOnQueue() passes a literal 2, presumably this constant inlined.
    public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2;
    // Upper bound for the backlog-check delay; the delay is scheduled in seconds.
    // NOTE(review): checkForNoBacklogOnQueue() passes a literal 30, presumably this constant inlined.
    public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30;
    // Owning federation; used for naming, error signalling, scheduling and link-closed interceptors.
    private final AMQPFederation federation;
    // Supplies receiver credit values, the large message threshold and the core tunneling flag.
    private final AMQPFederationConsumerConfiguration configuration;
    // Describes the federated queue (FQQN, queue name, routing type, filter, priority).
    private final FederationConsumerInfo consumerInfo;
    // Policy this consumer was created from; also the source of the transformer configuration.
    private final FederationReceiveFromQueuePolicy policy;
    // Connection context taken from the session; link work is queued on it via runLater().
    private final AMQPConnectionContext connection;
    // Session on which the proton receiver link is created.
    private final AMQPSessionContext session;
    // Predicate registered with the federation as a link-closed interceptor for this consumer's FQQN.
    private final Predicate<Link> remoteCloseIntercepter = this::remoteLinkClosedIntercepter;
    // Applied to every inbound message; an identity function when the policy configures none.
    private final Transformer transformer;
    // Receiver context wrapping the proton link; assigned by the link initializer, cleared on close.
    private AMQPFederatedQueueDeliveryReceiver receiver;
    // Raw proton receiver link, tracked so close() can close it explicitly.
    private Receiver protonReceiver;
    // Read and written only inside the synchronized start()/close()/setRemoteClosedHandler() methods.
    private boolean started;
    // Volatile because the task queued by asyncCreateReceiver() reads it without holding the lock.
    private volatile boolean closed;
    // Optional callback invoked at most once when the receiver is closed remotely.
    private Consumer<FederationConsumerInternal> remoteCloseHandler;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Outcomes advertised on the link Source; copied before use so this array is never shared.
    private static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer$AMQPFederatedQueueDeliveryReceiver.class */
    private class AMQPFederatedQueueDeliveryReceiver extends ProtonServerReceiverContext {
        // FQQN of the federated queue, converted once and used as the address for every serverSend().
        private final SimpleString cachedFqqn;
        // Local queue whose pending message count gates credit top-ups and whose executor runs the checks.
        private final Queue localQueue;
        // Lazily created reader for deliveries whose message format is 1183580416.
        private MessageReader coreMessageReader;
        // Lazily created reader for deliveries whose message format is 1183580672.
        private MessageReader coreLargeMessageReader;
        // Set while a backlog check / credit grant cycle is pending so only one runs at a time.
        private final AtomicBoolean creditTopUpInProgress;
        // Cached method reference to checkForNoBacklogOnQueue().
        private final Runnable checkForNoBacklogRunnable;
        // Cached method reference to performCreditTopUp().
        private final Runnable performCreditTopUpRunnable;
        // Delay in seconds used for the most recent backlog re-check; 0 means no back-off is active.
        private int lastBacklogCheckDelay;

        /**
         * Creates the receiver context for the given proton link, bound to the enclosing
         * consumer's session.
         *
         * @param queue    the local queue that federated messages are routed to
         * @param receiver the proton receiver link this context services
         */
        AMQPFederatedQueueDeliveryReceiver(Queue queue, Receiver receiver) {
            super(AMQPFederationQueueConsumer.this.session.getSessionSPI(), AMQPFederationQueueConsumer.this.session.getAMQPConnectionContext(), AMQPFederationQueueConsumer.this.session, receiver);

            // Routing target state.
            this.localQueue = queue;
            this.cachedFqqn = SimpleString.toSimpleString(AMQPFederationQueueConsumer.this.consumerInfo.getFqqn());

            // Credit top-up state; the runnables are cached so they are not re-created on every check.
            this.creditTopUpInProgress = new AtomicBoolean(false);
            this.performCreditTopUpRunnable = this::performCreditTopUp;
            this.checkForNoBacklogRunnable = this::checkForNoBacklogOnQueue;
        }

        /**
         * Closes this receiver context and, when the flag is set and a handler is registered,
         * notifies the remote close handler exactly once.
         *
         * @param z passed through to the superclass close; the handler only fires when it is true
         * @throws ActiveMQAMQPException if the superclass close fails
         */
        @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext, org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver, org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
        public void close(boolean z) throws ActiveMQAMQPException {
            super.close(z);

            if (z && AMQPFederationQueueConsumer.this.remoteCloseHandler != null) {
                try {
                    AMQPFederationQueueConsumer.this.remoteCloseHandler.accept(AMQPFederationQueueConsumer.this);
                } catch (Exception handlerError) {
                    // A faulty user handler must not break the close sequence.
                    AMQPFederationQueueConsumer.logger.debug("User remote closed handler threw error: ", handlerError);
                } finally {
                    // The handler is single shot, drop it regardless of the outcome.
                    AMQPFederationQueueConsumer.this.remoteCloseHandler = null;
                }
            }
        }

        /**
         * Initializes the receiver context once the link has been attached: mirrors the remote
         * sender settle mode, validates the remote Target, resolves the address and routing
         * type, verifies the target queue still exists and then starts credit flow.
         *
         * @throws ActiveMQAMQPInternalErrorException if the remote Target is missing or has no
         *         address, or if the queue query (or starting flow) fails unexpectedly
         * @throws ActiveMQAMQPNotFoundException if the queue named by the Target does not exist
         */
        @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext, org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
        public void initialize() throws Exception {
            this.initialized = true;
            Target remoteTarget = this.receiver.getRemoteTarget();
            this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
            this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
            if (remoteTarget == null || remoteTarget.getAddress() == null || remoteTarget.getAddress().isEmpty()) {
                throw new ActiveMQAMQPInternalErrorException("Remote should have sent an valid Target but we got: " + remoteTarget);
            }
            this.address = SimpleString.toSimpleString(remoteTarget.getAddress());
            this.defRoutingType = getRoutingType(remoteTarget.getCapabilities(), this.address);
            try {
                if (!this.sessionSPI.queueQuery(this.address, this.defRoutingType, false).isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(this.address.toString());
                }
                flow();
            } catch (ActiveMQAMQPNotFoundException e) {
                // Must be caught ahead of the generic handler so the not-found condition is
                // reported as such instead of being wrapped as an internal error.
                throw e;
            } catch (Exception e) {
                AMQPFederationQueueConsumer.logger.debug(e.getMessage(), e);
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Picks the reader for an incoming delivery based on its message format. The two
         * tunneled core formats get dedicated readers that are created on first use and then
         * reused; every other format is delegated to the superclass.
         *
         * @param receiver the link the delivery arrived on
         * @param delivery the incoming delivery
         * @return the reader that should consume the delivery
         */
        @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
        public MessageReader trySelectMessageReader(Receiver receiver, Delivery delivery) {
            final int messageFormat = delivery.getMessageFormat();

            if (messageFormat == 1183580416) {
                if (this.coreMessageReader == null) {
                    this.coreMessageReader = new AMQPTunneledCoreMessageReader(this);
                }
                return this.coreMessageReader;
            }

            if (messageFormat == 1183580672) {
                if (this.coreLargeMessageReader == null) {
                    this.coreLargeMessageReader = new AMQPTunneledCoreLargeMessageReader(this);
                }
                return this.coreLargeMessageReader;
            }

            return super.trySelectMessageReader(receiver, delivery);
        }

        /**
         * Handles one fully read inbound message: runs it through the configured transformer,
         * notifies federation plugins before and after, and sends the result to the cached
         * FQQN. Any failure is logged and reported through {@code deliveryFailed}.
         */
        @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext, org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
        protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction transaction) {
            try {
                if (AMQPFederationQueueConsumer.logger.isTraceEnabled()) {
                    AMQPFederationQueueConsumer.logger.trace("AMQP Federation {} queue consumer {} dispatching incoming message: {}", AMQPFederationQueueConsumer.this.federation.getName(), AMQPFederationQueueConsumer.this.consumerInfo, message);
                }

                final Message transformed = AMQPFederationQueueConsumer.this.transformer.transform(message);
                final boolean replaced = transformed != message;
                if (replaced && AMQPFederationQueueConsumer.logger.isTraceEnabled()) {
                    AMQPFederationQueueConsumer.logger.trace("The transformer {} replaced the original message {} with a new instance {}", AMQPFederationQueueConsumer.this.transformer, message, transformed);
                }

                // Plugins observe the transformed message on both sides of the send.
                AMQPFederationQueueConsumer.this.signalBeforeFederationConsumerMessageHandled(transformed);
                this.sessionSPI.serverSend(this, transaction, receiver, delivery, this.cachedFqqn, this.routingContext, transformed);
                AMQPFederationQueueConsumer.this.signalAfterFederationConsumerMessageHandled(transformed);
            } catch (Exception deliveryError) {
                AMQPFederationQueueConsumer.logger.warn("Inbound delivery for {} encountered an error: {}", AMQPFederationQueueConsumer.this.consumerInfo, deliveryError.getMessage(), deliveryError);
                deliveryFailed(delivery, receiver, deliveryError);
            }
        }

        /**
         * Chooses how this receiver replenishes link credit.
         * <p>
         * When the consumer configuration specifies a positive credit value, a standard credit
         * window runnable is built from the configured credits and low-water mark. Otherwise the
         * receiver tops up credit on demand through {@code checkIfCreditTopUpNeeded}.
         * <p>
         * The decision is made on the same configuration value that is handed to the credit
         * window, so a zero-credit configuration always selects the on-demand path instead of
         * producing a window that would never grant credit.
         *
         * @param aMQPConnectionContext the connection the credit runnable will operate on
         * @return the runnable used to manage credit for this link
         */
        @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
        protected Runnable createCreditRunnable(AMQPConnectionContext aMQPConnectionContext) {
            final int configuredCredits = AMQPFederationQueueConsumer.this.configuration.getReceiverCredits();
            if (configuredCredits > 0) {
                return createCreditRunnable(configuredCredits, AMQPFederationQueueConsumer.this.configuration.getReceiverCreditsLow(), this.receiver, aMQPConnectionContext, this);
            }
            return this::checkIfCreditTopUpNeeded;
        }

        /**
         * Returns the large message threshold from the consumer configuration; the supplied
         * connection context is not consulted.
         */
        @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
        protected int getConfiguredMinLargeMessageSize(AMQPConnectionContext aMQPConnectionContext) {
            final int largeMessageThreshold = AMQPFederationQueueConsumer.this.configuration.getLargeMessageThreshold();
            return largeMessageThreshold;
        }

        /**
         * Starts a credit top-up cycle when the link has no credit and no pending settles and
         * no cycle is already running. When called off the connection handler, it re-queues
         * the credit runnable onto the connection instead.
         */
        private void checkIfCreditTopUpNeeded() {
            if (!this.connection.isHandler()) {
                // Wrong thread: bounce the check onto the connection handler.
                this.connection.runLater(this.creditRunnable);
                return;
            }

            final boolean creditExhausted = this.receiver.getCredit() + this.pendingSettles <= 0;
            if (creditExhausted && !this.creditTopUpInProgress.get()) {
                this.creditTopUpInProgress.set(true);
                // The backlog check runs on the queue's own executor.
                this.localQueue.getExecutor().execute(this.checkForNoBacklogRunnable);
            }
        }

        /**
         * Grants new credit only when the local queue has no pending messages. If a backlog
         * remains, the check is re-scheduled with a growing delay (in seconds).
         */
        private void checkForNoBacklogOnQueue() {
            final boolean backlogCleared = this.localQueue.getPendingMessageCount() == 0;

            if (!backlogCleared) {
                // Back off and try again later on the queue's executor.
                this.lastBacklogCheckDelay = AMQPFederationQueueConsumer.caclulateNextDelay(this.lastBacklogCheckDelay, AMQPFederationQueueConsumer.DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER, AMQPFederationQueueConsumer.DEFAULT_PENDING_MSG_CHECK_MAX_DELAY);
                AMQPFederationQueueConsumer.this.federation.getScheduler().schedule(() -> this.localQueue.getExecutor().execute(this.checkForNoBacklogRunnable), this.lastBacklogCheckDelay, TimeUnit.SECONDS);
                return;
            }

            this.lastBacklogCheckDelay = 0;
            this.connection.runLater(this.performCreditTopUpRunnable);
        }

        /**
         * Grants one batch of credit on the link and ends the current top-up cycle. Must run on
         * the connection handler. Does nothing further when the link is no longer locally active.
         */
        private void performCreditTopUp() {
            this.connection.requireInHandler();

            final boolean linkActive = this.receiver.getLocalState() == EndpointState.ACTIVE;
            if (linkActive) {
                this.receiver.flow(AMQPFederationQueueConsumer.DEFAULT_PULL_CREDIT_BATCH_SIZE);
                this.connection.instantFlush();
                this.lastBacklogCheckDelay = 0;
                this.creditTopUpInProgress.set(false);
            }
        }
    }

    /**
     * Creates a queue consumer for the given federation. The consumer does nothing until
     * {@link #start()} is called.
     * <p>
     * If the policy carries a transformer configuration, the transformer is looked up from the
     * server's service registry; otherwise messages pass through unchanged.
     */
    public AMQPFederationQueueConsumer(AMQPFederation aMQPFederation, AMQPFederationConsumerConfiguration aMQPFederationConsumerConfiguration, AMQPSessionContext aMQPSessionContext, FederationConsumerInfo federationConsumerInfo, FederationReceiveFromQueuePolicy federationReceiveFromQueuePolicy) {
        this.federation = aMQPFederation;
        this.configuration = aMQPFederationConsumerConfiguration;
        this.session = aMQPSessionContext;
        this.connection = aMQPSessionContext.getAMQPConnectionContext();
        this.consumerInfo = federationConsumerInfo;
        this.policy = federationReceiveFromQueuePolicy;

        final TransformerConfiguration transformerConfig = federationReceiveFromQueuePolicy.getTransformerConfiguration();
        if (transformerConfig == null) {
            // Identity transformer: hand back the same instance.
            this.transformer = original -> original;
        } else {
            this.transformer = aMQPFederation.getServer().getServiceRegistry().getFederationTransformer(federationReceiveFromQueuePolicy.getPolicyName(), transformerConfig);
        }
    }

    /**
     * @return the federation instance this consumer was created for
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer
    public Federation getFederation() {
        return federation;
    }

    /**
     * @return the consumer information describing the federated queue
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer
    public FederationConsumerInfo getConsumerInfo() {
        return consumerInfo;
    }

    /**
     * @return the receive-from-queue policy this consumer was created from
     */
    public FederationReceiveFromQueuePolicy getPolicy() {
        return policy;
    }

    /**
     * Starts the consumer by queueing creation of the receiver link. Calling this on a
     * consumer that is already started or already closed has no effect.
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal
    public synchronized void start() {
        if (!started && !closed) {
            started = true;
            asyncCreateReceiver();
        }
    }

    /**
     * Closes this consumer. The first call marks it closed. If it had been started, it also
     * queues a task on the connection that removes the link-closed interceptor, closes the
     * receiver context and the proton link, and flushes the connection. Later calls do nothing.
     * <p>
     * Closing the receiver context is best effort: a failure there is logged at debug level
     * and does not prevent the proton link from being closed.
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.started) {
            this.started = false;
            this.connection.runLater(() -> {
                this.federation.removeLinkClosedInterceptor(this.consumerInfo.getFqqn());
                if (this.receiver != null) {
                    try {
                        this.receiver.close(false);
                    } catch (ActiveMQAMQPException e) {
                        // Deliberately not rethrown so the rest of the shutdown still runs,
                        // but recorded so a failed close is not completely invisible.
                        logger.debug("AMQP Federation {} queue consumer {} ignored error while closing receiver context: ", this.federation.getName(), this.consumerInfo, e);
                    } finally {
                        this.receiver = null;
                    }
                }
                // The proton link is closed explicitly here in addition to the receiver context.
                if (this.protonReceiver != null) {
                    try {
                        this.protonReceiver.close();
                    } finally {
                        this.protonReceiver = null;
                    }
                }
                this.connection.flush();
            });
        }
    }

    /**
     * Registers the callback to invoke when the receiver is closed remotely.
     *
     * @param consumer the handler to register
     * @return this consumer instance
     * @throws IllegalStateException if the consumer has already been started
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal
    public synchronized AMQPFederationQueueConsumer setRemoteClosedHandler(Consumer<FederationConsumerInternal> consumer) {
        if (!started) {
            remoteCloseHandler = consumer;
            return this;
        }
        throw new IllegalStateException("Cannot set a remote close handler after the consumer is started");
    }

    /**
     * Link-closed interceptor for this consumer's receiver link.
     *
     * @param link the link that was closed remotely
     * @return {@code true} only when the link is this consumer's proton receiver and its remote
     *         error condition is resource-deleted, not-found or detach-forced
     */
    protected boolean remoteLinkClosedIntercepter(Link link) {
        if (link != protonReceiver) {
            return false;
        }
        if (link.getRemoteCondition() == null) {
            return false;
        }

        final Symbol remoteError = link.getRemoteCondition().getCondition();
        if (remoteError == null) {
            return false;
        }

        if (AmqpSupport.RESOURCE_DELETED.equals(remoteError)) {
            return true;
        }
        if (AmqpSupport.NOT_FOUND.equals(remoteError)) {
            return true;
        }
        return AmqpSupport.DETACH_FORCED.equals(remoteError);
    }

    /**
     * Notifies every registered {@code ActiveMQServerAMQPFederationPlugin} that a message is
     * about to be handled by this consumer. A plugin failure is logged, not propagated.
     *
     * @param message the message about to be handled
     */
    private void signalBeforeFederationConsumerMessageHandled(Message message) throws ActiveMQException {
        try {
            federation.getServer().callBrokerAMQPFederationPlugins(plugin -> {
                if (!(plugin instanceof ActiveMQServerAMQPFederationPlugin)) {
                    return;
                }
                final ActiveMQServerAMQPFederationPlugin federationPlugin = (ActiveMQServerAMQPFederationPlugin) plugin;
                federationPlugin.beforeFederationConsumerMessageHandled(this, message);
            });
        } catch (ActiveMQException pluginError) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError("beforeFederationConsumerMessageHandled", pluginError);
        }
    }

    /**
     * Notifies every registered {@code ActiveMQServerAMQPFederationPlugin} that a message has
     * been handled by this consumer. A plugin failure is logged, not propagated.
     *
     * @param message the message that was handled
     */
    private void signalAfterFederationConsumerMessageHandled(Message message) throws ActiveMQException {
        try {
            federation.getServer().callBrokerAMQPFederationPlugins(plugin -> {
                if (!(plugin instanceof ActiveMQServerAMQPFederationPlugin)) {
                    return;
                }
                final ActiveMQServerAMQPFederationPlugin federationPlugin = (ActiveMQServerAMQPFederationPlugin) plugin;
                federationPlugin.afterFederationConsumerMessageHandled(this, message);
            });
        } catch (ActiveMQException pluginError) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError("afterFederationConsumerMessageHandled", pluginError);
        }
    }

    /**
     * Builds the receiver link name from the federation name, the consumer's FQQN and the
     * server's node ID.
     *
     * @return the link name in the form
     *         {@code federation-<name>-queue-receiver-<fqqn>-<nodeId>}
     */
    private String generateLinkName() {
        final StringBuilder linkName = new StringBuilder();
        linkName.append("federation-").append(federation.getName());
        linkName.append("-queue-receiver-").append(consumerInfo.getFqqn());
        linkName.append("-").append(federation.getServer().getNodeID());
        return linkName.toString();
    }

    /**
     * Queues a task on the connection that creates, configures and opens the proton receiver
     * link for this consumer.
     * <p>
     * The task builds the link Source and Target from the consumer's FQQN, applies the
     * optional JMS selector filter, sets the receiver priority property, opens the link and,
     * when a link attach timeout is configured, schedules a timeout task. It then stores a
     * link initializer runnable in the receiver's attachments which completes the setup.
     * Any exception thrown while creating the link is reported via
     * {@code federation.signalError}.
     */
    private void asyncCreateReceiver() {
        this.connection.runLater(() -> {
            // The consumer may have been closed between start() and this task running.
            if (this.closed) {
                return;
            }
            try {
                Receiver receiver = this.session.getSession().receiver(generateLinkName());
                Target target = new Target();
                Source source = new Source();
                String fqqn = this.consumerInfo.getFqqn();
                // NOTE(review): the result is not null-checked before being handed to the
                // delivery receiver below — confirm the queue cannot be removed in the interim.
                Queue locateQueue = this.federation.getServer().locateQueue(this.consumerInfo.getQueueName());
                // Advertise the source as queue or topic according to the consumer's routing type.
                if (RoutingType.ANYCAST.equals(this.consumerInfo.getRoutingType())) {
                    source.setCapabilities(new Symbol[]{AmqpSupport.QUEUE_CAPABILITY});
                } else {
                    source.setCapabilities(new Symbol[]{AmqpSupport.TOPIC_CAPABILITY});
                }
                // Hand over a copy so the shared DEFAULT_OUTCOMES array is never exposed.
                source.setOutcomes((Symbol[]) Arrays.copyOf(DEFAULT_OUTCOMES, DEFAULT_OUTCOMES.length));
                source.setDurable(TerminusDurability.NONE);
                source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
                source.setAddress(fqqn);
                // Only attach a selector filter when the consumer actually defines one.
                if (this.consumerInfo.getFilterString() != null && !this.consumerInfo.getFilterString().isEmpty()) {
                    AmqpJmsSelectorFilter amqpJmsSelectorFilter = new AmqpJmsSelectorFilter(this.consumerInfo.getFilterString());
                    HashMap hashMap = new HashMap();
                    hashMap.put(AmqpSupport.JMS_SELECTOR_KEY, amqpJmsSelectorFilter);
                    source.setFilter(hashMap);
                }
                target.setAddress(fqqn);
                // Link properties carry the consumer priority to the remote peer.
                HashMap hashMap2 = new HashMap();
                hashMap2.put(AMQPFederationConstants.FEDERATION_RECEIVER_PRIORITY, Integer.valueOf(this.consumerInfo.getPriority()));
                receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                receiver.setDesiredCapabilities(new Symbol[]{AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER});
                if (this.configuration.isCoreMessageTunnelingEnabled()) {
                    receiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});
                }
                receiver.setProperties(hashMap2);
                receiver.setTarget(target);
                receiver.setSource(source);
                receiver.open();
                // Set by the timeout task so the initializer below can tell the attach timed out.
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                // Only arm a timeout when a positive link attach timeout (in seconds) is configured.
                ScheduledFuture<?> schedule = this.federation.getLinkAttachTimeout() > 0 ? this.federation.getServer().getScheduledPool().schedule(() -> {
                    atomicBoolean.set(true);
                    this.federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
                }, this.federation.getLinkAttachTimeout(), TimeUnit.SECONDS) : null;
                this.protonReceiver = receiver;
                ScheduledFuture<?> scheduledFuture = schedule;
                // Stored under AMQP_LINK_INITIALIZER_KEY; presumably run once the remote attach
                // response arrives — TODO confirm where the attachment is consumed.
                receiver.attachments().set(AmqpSupport.AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
                    if (scheduledFuture != null) {
                        try {
                            scheduledFuture.cancel(false);
                        } catch (Exception e) {
                            this.federation.signalError(e);
                            return;
                        }
                    }
                    // The timeout task already reported the failure, nothing more to do.
                    if (atomicBoolean.get()) {
                        return;
                    }
                    // A remote that accepted the link must also offer the federation queue receiver capability.
                    if (receiver.getRemoteSource() != null && !AmqpSupport.verifyOfferedCapabilities(receiver, AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER)) {
                        this.federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER.toString()));
                        return;
                    }
                    this.federation.addLinkClosedInterceptor(this.consumerInfo.getFqqn(), this.remoteCloseIntercepter);
                    this.receiver = new AMQPFederatedQueueDeliveryReceiver(locateQueue, receiver);
                    // A null remote source is logged as a rejection; the receiver is registered either way.
                    if (receiver.getRemoteSource() != null) {
                        logger.debug("AMQP Federation {} queue consumer {} completed open", this.federation.getName(), this.consumerInfo);
                    } else {
                        logger.debug("AMQP Federation {} queue consumer {} rejected by remote", this.federation.getName(), this.consumerInfo);
                    }
                    this.session.addReceiver(receiver, (aMQPSessionContext, receiver2) -> {
                        return this.receiver;
                    });
                });
            } catch (Exception e) {
                this.federation.signalError(e);
            }
            // Flush runs after both the success and the error path of the try block.
            this.connection.flush();
        });
    }

    /**
     * Computes the next back-off delay.
     *
     * @param i  the previous delay; {@code 0} means no delay has been used yet
     * @param i2 the multiplier applied to the previous delay
     * @param i3 the maximum delay that may be returned once a previous delay exists
     * @return {@code 1} when the previous delay is {@code 0}, otherwise the smaller of
     *         {@code i * i2} and {@code i3}
     */
    private static int caclulateNextDelay(int i, int i2, int i3) {
        if (i == 0) {
            return 1;
        }
        final int scaledDelay = i * i2;
        return Math.min(scaledDelay, i3);
    }

    // Bridge method, marked synthetic by the decompiler: it adapts the interface signature
    // (returning FederationConsumerInternal) to the covariant override declared in this class.
    // NOTE(review): it has the same erasure as the typed overload, so it likely has to be
    // dropped before this decompiled source can be recompiled — confirm.
    @Override // org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal
    public /* bridge */ /* synthetic */ FederationConsumerInternal setRemoteClosedHandler(Consumer consumer) {
        return setRemoteClosedHandler((Consumer<FederationConsumerInternal>) consumer);
    }
}
