/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConsumerManager;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationGenericConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationLocalPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationMetrics;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalPolicyManager
implements ActiveMQServerConsumerPlugin,
ActiveMQServerBindingPlugin {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected final Predicate<ServerConsumer> federationConsumerMatcher;
    protected final FederationReceiveFromQueuePolicy policy;
    protected final Map<FederationConsumerInfo, AMQPFederationQueueConsumerManager> federationConsumers = new HashMap<FederationConsumerInfo, AMQPFederationQueueConsumerManager>();

    public AMQPFederationQueuePolicyManager(AMQPFederation federation, AMQPFederationMetrics metrics, FederationReceiveFromQueuePolicy queuePolicy) throws ActiveMQException {
        super(federation, metrics, queuePolicy);
        Objects.requireNonNull(queuePolicy, "The Queue match policy cannot be null");
        this.policy = queuePolicy;
        this.federationConsumerMatcher = this.createFederationConsumerMatcher(this.server, queuePolicy);
    }

    @Override
    public FederationReceiveFromQueuePolicy getPolicy() {
        return this.policy;
    }

    @Override
    protected void safeCleanupManagerResources(boolean force) {
        try {
            this.federationConsumers.values().forEach(entry -> {
                if (entry != null) {
                    if (this.isConnected() && !force) {
                        entry.shutdown();
                    } else {
                        entry.shutdownNow();
                    }
                }
            });
        }
        finally {
            this.federationConsumers.clear();
        }
    }

    public synchronized void afterCreateConsumer(ServerConsumer consumer) {
        if (this.isActive()) {
            this.reactIfConsumerMatchesPolicy(consumer);
        }
    }

    public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
        FederationConsumerInfo consumerInfo;
        AMQPFederationQueueConsumerManager entry;
        if (this.isActive() && (entry = this.federationConsumers.get(consumerInfo = this.createConsumerInfo(consumer))) != null) {
            logger.trace("Reducing demand on federated queue {}", (Object)entry.getQueueName());
            entry.removeDemand(AMQPFederationQueuePolicyManager.identifyConsumer(consumer));
        }
    }

    public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException {
        if (binding instanceof QueueBinding) {
            QueueBinding queueBinding = (QueueBinding)binding;
            String queueName = queueBinding.getQueue().getName().toString();
            this.federationConsumers.values().removeIf(entry -> {
                if (entry.getQueueName().equals(queueName)) {
                    logger.trace("Federated queue {} was removed, closing federation consumer", (Object)queueName);
                    entry.shutdownNow();
                    return true;
                }
                return false;
            });
        }
    }

    @Override
    protected void scanAllBindings() {
        this.server.getPostOffice().getAllBindings().filter(b -> b instanceof QueueBinding).map(b -> (QueueBinding)b).forEach(b -> this.checkQueueForMatch(b.getQueue()));
    }

    private void checkQueueForMatch(Queue queue) {
        queue.getConsumers().stream().filter(consumer -> consumer instanceof ServerConsumer).map(c -> (ServerConsumer)c).forEach(this::reactIfConsumerMatchesPolicy);
    }

    private void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
        String queueName = consumer.getQueue().getName().toString();
        if (this.testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), queueName)) {
            AMQPFederationQueueConsumerManager entry;
            if (this.federationConsumerMatcher.test(consumer)) {
                return;
            }
            logger.trace("Federation Policy matched on consumer for binding: {}", (Object)consumer.getBinding());
            FederationConsumerInfo consumerInfo = this.createConsumerInfo(consumer);
            if (this.federationConsumers.containsKey(consumerInfo)) {
                logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", (Object)queueName);
                entry = this.federationConsumers.get(consumerInfo);
            } else {
                entry = new AMQPFederationQueueConsumerManager(this, consumerInfo, consumer.getQueue());
                this.federationConsumers.put(consumerInfo, entry);
            }
            entry.addDemand(AMQPFederationQueuePolicyManager.identifyConsumer(consumer));
        }
    }

    public synchronized void afterRemoteQueueAdded(String addressName, String queueName) throws Exception {
        Queue queue;
        if (this.isActive() && this.testIfQueueMatchesPolicy(queueName) && (queue = this.server.locateQueue(queueName)) != null) {
            this.federationConsumers.forEach((k, v) -> {
                if (k.getQueueName().equals(queueName)) {
                    v.recover();
                }
            });
        }
    }

    private boolean testIfQueueMatchesPolicy(String address, String queueName) {
        return this.policy.test(address, queueName);
    }

    private boolean testIfQueueMatchesPolicy(String queueName) {
        return this.policy.testQueue(queueName);
    }

    private FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) {
        Queue queue = consumer.getQueue();
        String queueName = queue.getName().toString();
        String address = queue.getAddress().toString();
        int priority = this.configuration.isIgnoreSubscriptionPriorities() ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() + this.policy.getPriorityAjustment() : consumer.getPriority() + this.policy.getPriorityAjustment();
        String filterString = AMQPFederationQueuePolicyManager.selectFilter(queue.getFilter(), this.configuration.isIgnoreSubscriptionFilters() ? null : consumer.getFilter());
        return new AMQPFederationGenericConsumerInfo(FederationConsumerInfo.Role.QUEUE_CONSUMER, address, queueName, queue.getRoutingType(), filterString, CompositeAddress.toFullyQualified((String)address, (String)queueName), priority);
    }

    @Override
    protected AMQPFederationConsumer createFederationConsumer(FederationConsumerInfo consumerInfo) {
        Objects.requireNonNull(consumerInfo, "Federation Queue consumer information object was null");
        if (logger.isTraceEnabled()) {
            logger.trace("AMQP Federation {} creating queue consumer: {} for policy: {}", new Object[]{this.federation.getName(), consumerInfo, this.policy.getPolicyName()});
        }
        return new AMQPFederationQueueConsumer(this, this.configuration, this.session, consumerInfo, this.metrics.newConsumerMetrics());
    }

    private Predicate<ServerConsumer> createFederationConsumerMatcher(ActiveMQServer server, FederationReceiveFromQueuePolicy policy) throws ActiveMQException {
        if (policy.isIncludeFederated()) {
            return consumer -> false;
        }
        Filter metaDataMatcher = FilterImpl.createFilter((String)"\"federation-name\" IS NOT NULL");
        return consumer -> {
            ServerSession serverSession = server.getSessionByID(consumer.getSessionID());
            if (serverSession != null && serverSession.getMetaData() != null) {
                return metaDataMatcher.match(serverSession.getMetaData());
            }
            return false;
        };
    }

    private static String identifyConsumer(ServerConsumer consumer) {
        return consumer.getConnectionID().toString() + ":" + consumer.getSessionID() + ":" + consumer.getID();
    }

    private static String selectFilter(Filter queueFilter, Filter consumerFilter) {
        if (consumerFilter != null) {
            return consumerFilter.getFilterString().toString();
        }
        return queueFilter != null ? queueFilter.getFilterString().toString() : null;
    }

    private static class AMQPFederationQueueConsumerManager
    extends AMQPFederationConsumerManager {
        private final AMQPFederationQueuePolicyManager manager;
        private final Queue queue;
        private final FederationConsumerInfo consumerInfo;

        AMQPFederationQueueConsumerManager(AMQPFederationQueuePolicyManager manager, FederationConsumerInfo consumerInfo, Queue queue) {
            super(manager);
            this.manager = manager;
            this.queue = queue;
            this.consumerInfo = consumerInfo;
        }

        public String getQueueName() {
            return this.queue.getName().toString();
        }

        @Override
        protected AMQPFederationConsumer createFederationConsumer() {
            return this.manager.createFederationConsumer(this.consumerInfo);
        }

        @Override
        protected boolean isPluginBlockingFederationConsumerCreate() {
            return this.manager.isPluginBlockingFederationConsumerCreate(this.queue);
        }
    }
}

