/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.openwire;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import javax.jms.InvalidClientIDException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;

public class OpenWireProtocolManager
implements ProtocolManager<Interceptor>,
NotificationListener {
    /** Generates the value for the lazily created broker id (see getBrokerId()). */
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    /** Generates the connection id assigned to the advisory producer in the constructor. */
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    /** Supplies the sequence part of the message ids given to advisory messages. */
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    /** The broker this protocol manager is attached to. */
    private final ActiveMQServer server;
    /** The factory handed in at construction time and returned by getFactory(). */
    private final OpenWireProtocolManagerFactory factory;
    /** Creates one wire format per connection; caching is switched off in the constructor. */
    private OpenWireFormatFactory wireFactory;
    // NOTE(review): never read inside this class.
    private boolean tightEncodingEnabled = true;
    /** When true, isProtocol() skips a 4-byte prefix before inspecting the header. */
    private boolean prefixPacketSize = true;
    /** Lazily initialised by getBrokerId(). */
    private BrokerId brokerId;
    /** Producer id stamped on every advisory message sent by this manager. */
    protected final ProducerId advisoryProducerId = new ProducerId();
    // Looked up by fireSlowConsumer(); nothing in this class adds entries —
    // presumably populated by collaborating classes (TODO confirm).
    protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap());
    /** Connections added by addConnection() and dropped by removeConnection(). */
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList();
    /** Password-blanked copies of the connection infos, keyed by connection id. */
    protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
    // Plain HashMap: addConnection() only touches it while holding its monitor.
    private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
    /** Lazily initialised by getBrokerName(). */
    private String brokerName;
    /** Sessions keyed by their OpenWire session id. */
    private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<SessionId, AMQSession>();
    /** Owning session of each transaction registered through registerTx(). */
    private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
    /** Maps a core session name to the OpenWire session id (filled in addSession()). */
    private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
    /** Scheduled pool obtained from the server and passed to every new AMQSession. */
    private final ScheduledExecutorService scheduledPool;

    /**
     * Creates the protocol manager for the given broker.
     * <p>
     * Builds a wire-format factory with caching disabled, assigns a freshly
     * generated connection id to the advisory producer, takes the server's
     * scheduled pool and, if the server has a management service, registers
     * this instance on it as a notification listener.
     *
     * @param factory the factory later returned by {@link #getFactory()}
     * @param server  the broker this manager works for
     */
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
        this.factory = factory;
        this.server = server;

        OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
        this.wireFactory = formatFactory;
        formatFactory.setCacheEnabled(false);

        String advisoryConnectionId = ID_GENERATOR.generateId();
        this.advisoryProducerId.setConnectionId(advisoryConnectionId);

        ManagementService managementService = server.getManagementService();
        this.scheduledPool = server.getScheduledPool();
        // Registration happens last so that all fields are assigned before
        // the first notification can arrive.
        if (managementService != null) {
            managementService.addNotificationListener((NotificationListener) this);
        }
    }

    /**
     * Returns the factory that was passed to the constructor.
     *
     * @return the factory associated with this protocol manager
     */
    public ProtocolManagerFactory<Interceptor> getFactory() {
        return this.factory;
    }

    /**
     * No-op: the body is empty, so interceptor updates are ignored by this
     * protocol manager.
     *
     * @param incomingInterceptors not used
     * @param outgoingInterceptors not used
     */
    public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
    }

    /**
     * Wraps a freshly accepted transport connection in an
     * {@link OpenWireConnection} with its own wire format, initialises it and
     * returns the entry describing it.
     *
     * @param acceptorUsed the acceptor the connection came in on
     * @param connection   the underlying transport connection
     * @return a connection entry built with a {@code null} second argument, the
     *         current time and {@code 60000L} (presumably executor, last-check
     *         timestamp and TTL in milliseconds — confirm against
     *         {@code ConnectionEntry})
     */
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
        OpenWireFormat wireFormat = (OpenWireFormat) this.wireFactory.createWireFormat();
        OpenWireConnection openWireConnection = new OpenWireConnection(acceptorUsed, connection, this, wireFormat);
        openWireConnection.init();

        long createdAt = System.currentTimeMillis();
        return new ConnectionEntry((RemotingConnection) openWireConnection, null, createdAt, 60000L);
    }

    /**
     * Returns a message converter for this protocol.
     *
     * @return a new {@link OpenWireMessageConverter}; a fresh instance is
     *         created on every call
     */
    public MessageConverter getConverter() {
        return new OpenWireMessageConverter();
    }

    /**
     * No-op: the body is empty, nothing is removed.
     *
     * @param name not used
     */
    public void removeHandler(String name) {
    }

    /**
     * No-op: the body is empty, the buffer is not processed here.
     *
     * @param connection not used
     * @param buffer     not used
     */
    public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) {
    }

    /**
     * Appends the frame decoder for this protocol to the Netty pipeline under
     * the name {@code "packet-decipher"}.
     * <p>
     * The decoder is constructed with {@code (Integer.MAX_VALUE, 0, 4)}, i.e.
     * maximum frame length, length-field offset and length-field size as
     * defined by {@link LengthFieldBasedFrameDecoder}.
     *
     * @param pipeline the pipeline to extend
     */
    public void addChannelHandlers(ChannelPipeline pipeline) {
        ChannelHandler frameDecoder = new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4);
        pipeline.addLast("packet-decipher", frameDecoder);
    }

    /**
     * Decides whether the first bytes received on a connection look like the
     * start of an OpenWire conversation.
     * <p>
     * After an optional 4-byte prefix (skipped when {@code prefixPacketSize}
     * is set) the next byte must be {@code 1}, and the bytes following it must
     * match the magic of a {@link WireFormatInfo}. Only as many magic bytes as
     * are available in {@code array} are compared.
     *
     * @param array the leading bytes of the connection; at least 8 bytes
     * @return {@code true} if the header matches, {@code false} otherwise
     * @throws IllegalArgumentException if fewer than 8 bytes are supplied
     */
    public boolean isProtocol(byte[] array) {
        if (array.length < 8) {
            throw new IllegalArgumentException("Protocol header length changed " + array.length);
        }

        int typeIndex = this.prefixPacketSize ? 4 : 0;
        if (array[typeIndex] != 1) {
            return false;
        }

        byte[] magic = new WireFormatInfo().getMagic();
        int magicStart = typeIndex + 1;
        // Compare only what both arrays can provide.
        int comparable = Math.min(array.length - magicStart, magic.length);
        for (int offset = 0; offset < comparable; offset++) {
            if (array[magicStart + offset] != magic[offset]) {
                return false;
            }
        }
        return true;
    }

    /**
     * No-op: the body is empty, no handshake work is done here.
     *
     * @param connection not used
     * @param buffer     not used
     */
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }

    /**
     * Dispatches a command by its data-structure type.
     * <p>
     * Type {@code 20} is treated as a {@link MessagePull} and forwarded to the
     * connection. Types {@code 3}, {@code 17} and {@code 18} are accepted and
     * dropped without any action. Every other type is rejected.
     *
     * @param openWireConnection the connection the command arrived on
     * @param command            the command; must be a {@link Command}
     * @throws IllegalStateException if the command type is not handled here
     * @throws Exception             if processing the message pull fails
     */
    public void handleCommand(OpenWireConnection openWireConnection, Object command) throws Exception {
        Command amqCmd = (Command) command;
        byte commandType = amqCmd.getDataStructureType();

        if (commandType == 20) {
            MessagePull pullRequest = (MessagePull) amqCmd;
            openWireConnection.processMessagePull(pullRequest);
            return;
        }

        boolean silentlyAccepted = commandType == 3 || commandType == 18 || commandType == 17;
        if (!silentlyAccepted) {
            throw new IllegalStateException("Cannot handle command: " + command);
        }
    }

    /**
     * Sends {@code command} on {@code connection} from a callback registered
     * with the storage manager's {@code afterCompleteOperations}.
     * <p>
     * When the callback reports completion the command is sent through
     * {@link #send(OpenWireConnection, Command)}; when it reports an error the
     * error is logged and nothing is sent.
     *
     * @param connection the connection to reply on
     * @param command    the reply to send
     */
    public void sendReply(final OpenWireConnection connection, final Command command) {
        IOCallback sendWhenComplete = new IOCallback() {

            public void done() {
                OpenWireProtocolManager.this.send(connection, command);
            }

            public void onError(int errorCode, String errorMessage) {
                ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(Integer.valueOf(errorCode), errorMessage);
            }
        };
        this.server.getStorageManager().afterCompleteOperations(sendWhenComplete);
    }

    /**
     * Sends a command on the given connection while holding the connection's
     * monitor.
     * <p>
     * Nothing is sent if the connection reports itself as destroyed. Any
     * failure raised by the physical send is caught, logged at debug level and
     * reported through the return value rather than propagated.
     *
     * @param connection the connection to write to
     * @param command    the command to send
     * @return {@code true} if the command was handed to the connection without
     *         an error, {@code false} if the connection was destroyed or the
     *         send failed
     */
    public boolean send(OpenWireConnection connection, Command command) {
        if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
            ActiveMQServerLogger.LOGGER.trace((Object)("sending " + command));
        }
        synchronized (connection) {
            if (connection.isDestroyed()) {
                return false;
            }
            try {
                connection.physicalSend(command);
                return true;
            }
            catch (Throwable failure) {
                // Best effort by design: callers only see the boolean result,
                // so keep the cause visible in the log instead of dropping it.
                ActiveMQServerLogger.LOGGER.debug((Object)("Failed to send " + command), failure);
                return false;
            }
        }
    }

    /**
     * Registers a new client connection with the broker.
     * <p>
     * The credentials are validated first and a client id is required. If the
     * client id is already in use, the request is rejected unless the new
     * context allows link stealing; in that case the previous connection is
     * disconnected and the new context takes over the client id. Afterwards a
     * connection advisory carrying a password-blanked copy of {@code info} is
     * fired, the copy is remembered and the sessions already present in the
     * connection state are created.
     *
     * @param context the context of the connecting client
     * @param info    the connection request
     * @throws SecurityException        if user name or password is rejected
     * @throws InvalidClientIDException if no client id is supplied, or the id
     *                                  is in use and link stealing is not allowed
     * @throws Exception                if firing the advisory fails
     */
    public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception {
        String username = info.getUserName();
        String password = info.getPassword();
        if (!this.validateUser(username, password)) {
            throw new SecurityException("User name [" + username + "] or password is invalid.");
        }
        String clientId = info.getClientId();
        if (clientId == null) {
            throw new InvalidClientIDException("No clientID specified for connection request");
        }
        synchronized (this.clientIdSet) {
            AMQConnectionContext oldContext = this.clientIdSet.get(clientId);
            if (oldContext != null) {
                OpenWireConnection oldConnection = oldContext.getConnection();
                if (!context.isAllowLinkStealing()) {
                    // The old context may have no connection; do not turn the
                    // rejection into a NullPointerException.
                    String remoteAddress = oldConnection == null ? "unknown address" : String.valueOf(oldConnection.getRemoteAddress());
                    throw new InvalidClientIDException("Broker: " + this.getBrokerName() + " - Client: " + clientId + " already connected from " + remoteAddress);
                }
                this.clientIdSet.remove(clientId);
                if (oldConnection != null) {
                    oldConnection.disconnect(true);
                }
            }
            // Register the new owner in both cases. Previously a link-stealing
            // client was left unregistered, so a later client with the same id
            // was accepted without any conflict check. The put happens after
            // the disconnect so that cleanup of the old connection cannot
            // remove the new registration when it runs synchronously.
            this.clientIdSet.put(clientId, context);
        }
        this.connections.add(context.getConnection());
        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
        ConnectionInfo copy = info.copy();
        // Never publish or retain the password.
        copy.setPassword("");
        this.fireAdvisory(context, topic, (Command)copy);
        this.connectionInfos.put(copy.getConnectionId(), copy);
        this.addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
    }

    /**
     * Fires an advisory without a target consumer; delegates to the four
     * argument overload with a {@code null} consumer id.
     *
     * @param context the connection context the advisory is sent through
     * @param topic   the advisory topic
     * @param copy    the command carried as the advisory's data structure
     * @throws Exception if sending the advisory fails
     */
    private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
        this.fireAdvisory(context, topic, copy, null);
    }

    /**
     * Returns the broker id, generating it on first use.
     *
     * @return the broker id; created from {@code BROKER_ID_GENERATOR} when it
     *         has not been set yet
     */
    public BrokerId getBrokerId() {
        if (this.brokerId != null) {
            return this.brokerId;
        }
        String generatedId = BROKER_ID_GENERATOR.generateId();
        this.brokerId = new BrokerId(generatedId);
        return this.brokerId;
    }

    /**
     * Builds an advisory message around {@code command} and sends it through
     * the connection's advisory session.
     * <p>
     * The message is non-persistent, of type {@code "Advisory"}, carries the
     * origin broker name, id and URL as string properties and is stamped with
     * the advisory producer id. Producer flow control on the context is
     * switched off for the duration of the send and restored afterwards, even
     * if the send fails. If the connection has no advisory session nothing is
     * sent.
     *
     * @param context          the connection context used for sending
     * @param topic            the advisory destination
     * @param command          the payload set as the message's data structure
     * @param targetConsumerId the consumer the advisory is aimed at, or
     *                         {@code null}
     * @throws Exception if setting a property or sending the message fails
     */
    private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisory = new ActiveMQMessage();

        // Origin information.
        advisory.setStringProperty("originBrokerName", this.getBrokerName());
        BrokerId originBrokerId = this.getBrokerId();
        String originId = originBrokerId == null ? "NOT_SET" : originBrokerId.getValue();
        advisory.setStringProperty("originBrokerId", originId);
        String originUrl = context.getConnection().getLocalAddress();
        advisory.setStringProperty("originBrokerURL", originUrl);

        // Payload and routing.
        advisory.setDataStructure((DataStructure) command);
        advisory.setPersistent(false);
        advisory.setType("Advisory");
        long sequence = this.messageIdGenerator.getNextSequenceId();
        advisory.setMessageId(new MessageId(this.advisoryProducerId, sequence));
        advisory.setTargetConsumerId(targetConsumerId);
        advisory.setDestination((org.apache.activemq.command.ActiveMQDestination) topic);
        advisory.setResponseRequired(false);
        advisory.setProducerId(this.advisoryProducerId);

        boolean flowControlBefore = context.isProducerFlowControl();

        AMQProducerBrokerExchange exchange = new AMQProducerBrokerExchange();
        exchange.setConnectionContext(context);
        exchange.setMutable(true);
        exchange.setProducerState(new ProducerState(new ProducerInfo()));

        try {
            context.setProducerFlowControl(false);
            AMQSession advisorySession = context.getConnection().getAdvisorySession();
            if (advisorySession != null) {
                advisorySession.send(exchange, (Message) advisory, false);
            }
        }
        finally {
            context.setProducerFlowControl(flowControlBefore);
        }
    }

    /**
     * Returns the broker name, resolving it on first use.
     *
     * @return the lower-cased local host name, or {@code "localhost"} if the
     *         host name cannot be determined
     */
    public String getBrokerName() {
        if (this.brokerName != null) {
            return this.brokerName;
        }
        try {
            String hostName = InetAddressUtil.getLocalHostName();
            this.brokerName = hostName.toLowerCase(Locale.ENGLISH);
        }
        catch (Exception lookupFailure) {
            // Host name lookup is best effort; fall back to a fixed name.
            this.brokerName = "localhost";
        }
        return this.brokerName;
    }

    /**
     * Reports whether the broker runs in a fault-tolerant configuration.
     *
     * @return always {@code false}
     */
    public boolean isFaultTolerantConfiguration() {
        return false;
    }

    /**
     * No-op: the body is empty, nothing happens after a dispatch.
     *
     * @param md not used
     */
    public void postProcessDispatch(MessageDispatch md) {
    }

    /**
     * Reports whether this manager is stopped.
     *
     * @return always {@code false}
     */
    public boolean isStopped() {
        return false;
    }

    /**
     * No-op: the body is empty, nothing happens before a dispatch.
     *
     * @param messageDispatch not used
     */
    public void preProcessDispatch(MessageDispatch messageDispatch) {
    }

    /**
     * Reports whether this manager is in the process of stopping.
     *
     * @return always {@code false}
     */
    public boolean isStopping() {
        return false;
    }

    /**
     * Registers a producer on the session it belongs to.
     * <p>
     * Does nothing if the session state already knows the producer id. For a
     * producer bound to a non-advisory destination the per-connection producer
     * limit is enforced, queue destinations are validated, and the destination
     * is added through {@link #addDestination}. If the session state refuses
     * the producer with an {@link IllegalStateException}, the producer is
     * removed from the session again and the exception is not propagated.
     *
     * @param theConn the connection the producer is created on
     * @param info    the producer description
     * @throws IllegalStateException if the connection or session is not
     *                               registered, or the producer limit is reached
     * @throws Exception             if adding the destination or creating the
     *                               producer fails
     */
    public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception {
        SessionId sessionId = info.getProducerId().getParentId();
        ConnectionId connectionId = sessionId.getParentId();

        ConnectionState connectionState = theConn.getState();
        if (connectionState == null) {
            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
        }
        SessionState sessionState = connectionState.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
        }
        if (sessionState.getProducerIds().contains(info.getProducerId())) {
            // Already known to the session state.
            return;
        }

        AMQSession amqSession = this.sessions.get(sessionId);
        if (amqSession == null) {
            throw new IllegalStateException("Session not exist! : " + sessionId);
        }

        org.apache.activemq.command.ActiveMQDestination destination = info.getDestination();
        boolean ordinaryDestination = destination != null && !AdvisorySupport.isAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination) destination);
        if (ordinaryDestination) {
            if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
                throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
            }
            if (destination.isQueue()) {
                OpenWireUtil.validateDestination(destination, amqSession);
            }
            DestinationInfo destinationInfo = new DestinationInfo(theConn.getConext().getConnectionId(), 0, destination);
            this.addDestination(theConn, destinationInfo);
        }

        amqSession.createProducer(info);
        try {
            sessionState.addProducer(info);
        }
        catch (IllegalStateException rejected) {
            // The session state rejected the producer: undo the creation.
            amqSession.removeProducer(info);
        }
    }

    /**
     * Registers a consumer on the session it belongs to.
     * <p>
     * Does nothing if the session state already knows the consumer id. For a
     * consumer on a non-advisory destination the per-connection consumer limit
     * is enforced before the consumer is created on the session and recorded
     * in the session state.
     *
     * @param theConn the connection the consumer is created on
     * @param info    the consumer description
     * @throws IllegalStateException if the connection or session is not
     *                               registered, or the consumer limit is reached
     * @throws Exception             if creating the consumer fails
     */
    public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
        SessionId sessionId = info.getConsumerId().getParentId();
        ConnectionId connectionId = sessionId.getParentId();

        ConnectionState connectionState = theConn.getState();
        if (connectionState == null) {
            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
        }
        SessionState sessionState = connectionState.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
        }
        if (sessionState.getConsumerIds().contains(info.getConsumerId())) {
            // Already known to the session state.
            return;
        }

        org.apache.activemq.command.ActiveMQDestination destination = info.getDestination();
        boolean ordinaryDestination = destination != null && !AdvisorySupport.isAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination) destination);
        if (ordinaryDestination && theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) {
            throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection());
        }

        AMQSession amqSession = this.sessions.get(sessionId);
        if (amqSession == null) {
            throw new IllegalStateException("Session not exist! : " + sessionId);
        }
        amqSession.createConsumer(info, amqSession);
        sessionState.addConsumer(info);
    }

    /**
     * Creates an internal session for every session id in {@code sessionSet},
     * using the session info found in the connection's state.
     *
     * @param theConn    the connection owning the sessions
     * @param sessionSet the ids of the sessions to create
     */
    public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) {
        for (SessionId sessionId : sessionSet) {
            SessionInfo sessionInfo = theConn.getState().getSessionState(sessionId).getInfo();
            this.addSession(theConn, sessionInfo, true);
        }
    }

    /**
     * Creates and registers a non-internal session; delegates to the three
     * argument overload with {@code internal} set to {@code false}.
     *
     * @param theConn the connection owning the session
     * @param ss      the session description
     * @return the newly created session
     */
    public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) {
        return this.addSession(theConn, ss, false);
    }

    /**
     * Creates, initialises and registers a session.
     * <p>
     * The session is stored under its OpenWire session id, and the name of its
     * core session is mapped back to that id.
     *
     * @param theConn  the connection owning the session
     * @param ss       the session description
     * @param internal the value passed to the session's {@code setInternal}
     * @return the newly created session
     */
    public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) {
        ConnectionInfo connectionInfo = theConn.getState().getInfo();
        AMQSession created = new AMQSession(connectionInfo, ss, this.server, theConn, this.scheduledPool, this);
        created.initialize();
        created.setInternal(internal);

        SessionId sessionId = ss.getSessionId();
        this.sessions.put(sessionId, created);
        String coreSessionName = created.getCoreSession().getName();
        this.sessionIdMap.put(coreSessionName, sessionId);
        return created;
    }

    /**
     * Forgets a connection: removes it from the connection list, drops its
     * remembered connection info and releases its client id.
     *
     * @param context the context of the connection being removed
     * @param info    the connection info it was registered with
     * @param error   the cause of the removal; not used here
     */
    public void removeConnection(AMQConnectionContext context, ConnectionInfo info, Throwable error) {
        this.connections.remove(context.getConnection());
        this.connectionInfos.remove(info.getConnectionId());
        String clientId = info.getClientId();
        if (clientId != null) {
            // clientIdSet is a plain HashMap that addConnection() guards with
            // its monitor; take the same lock here so concurrent connects and
            // disconnects cannot corrupt the map.
            synchronized (this.clientIdSet) {
                this.clientIdSet.remove(clientId);
            }
        }
    }

    /**
     * Unregisters a session and closes it if it was registered.
     * <p>
     * The reverse mapping from core session name to session id is dropped as
     * well, so that closed sessions do not accumulate in it.
     *
     * @param context the connection context; not used here
     * @param info    the description of the session to remove
     * @throws Exception if closing the session fails
     */
    public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
        SessionId sessionId = info.getSessionId();
        AMQSession session = this.sessions.remove(sessionId);
        // Remove by value: this works even when the session itself is already
        // gone and its core session name can no longer be asked for.
        this.sessionIdMap.values().remove(sessionId);
        if (session != null) {
            session.close();
        }
    }

    /**
     * Removes a producer from the session it belongs to.
     * <p>
     * If the owning session is no longer registered there is nothing to remove
     * and the call returns quietly.
     *
     * @param id the id of the producer to remove
     */
    public void removeProducer(ProducerId id) {
        SessionId sessionId = id.getParentId();
        AMQSession session = this.sessions.get(sessionId);
        if (session == null) {
            // The session was already removed; its producers went with it.
            return;
        }
        session.removeProducer(id);
    }

    /**
     * Looks up a registered session.
     *
     * @param sessionId the OpenWire session id
     * @return the session registered under that id, or {@code null} if there
     *         is none
     */
    public AMQSession getSession(SessionId sessionId) {
        return this.sessions.get(sessionId);
    }

    /**
     * Removes a destination from the broker and announces the removal.
     * <p>
     * A queue is destroyed under the name {@code "jms.queue." + physicalName}.
     * For any other destination the queues bound to
     * {@code "jms.topic." + physicalName} are deleted one by one; the loop
     * aborts with an exception at the first queue that still has consumers or
     * is durable. Unless the destination is itself an advisory topic, a
     * destination advisory with operation type {@code 1} is fired afterwards.
     *
     * @param connection the connection requesting the removal
     * @param dest       the destination to remove
     * @throws Exception if a bound queue is still in use or durable, or if the
     *                   broker fails to delete the destination
     */
    public void removeDestination(OpenWireConnection connection, org.apache.activemq.command.ActiveMQDestination dest) throws Exception {
        String physicalName = dest.getPhysicalName();
        if (dest.isQueue()) {
            this.server.destroyQueue(new SimpleString("jms.queue." + physicalName));
        } else {
            SimpleString topicAddress = SimpleString.toSimpleString("jms.topic." + physicalName);
            Bindings bindings = this.server.getPostOffice().getBindingsForAddress(topicAddress);
            for (Object entry : bindings.getBindings()) {
                Queue boundQueue = (Queue) ((Binding) entry).getBindable();
                if (boundQueue.getConsumerCount() > 0) {
                    throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
                }
                if (boundQueue.isDurable()) {
                    throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
                }
                boundQueue.deleteQueue();
            }
        }

        if (AdvisorySupport.isAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination) dest)) {
            return;
        }
        AMQConnectionContext context = connection.getConext();
        DestinationInfo removalInfo = new DestinationInfo(context.getConnectionId(), 1, dest);
        ActiveMQTopic advisoryTopic = AdvisorySupport.getDestinationAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination) dest);
        this.fireAdvisory(context, advisoryTopic, (Command) removalInfo);
    }

    /**
     * Makes sure a destination exists on the broker and announces it.
     * <p>
     * For a queue without an existing binding the queue is created; when the
     * connection state carries a connection info, the matching create
     * permission and the queue-creation limit are checked first. Temporary
     * queues are additionally registered on the connection. Unless the
     * destination is an advisory topic, a destination advisory with operation
     * type {@code 0} is fired.
     *
     * @param connection the connection requesting the destination
     * @param info       the destination description
     * @throws Exception if a security check, the queue creation or the
     *                   advisory fails
     */
    public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
        org.apache.activemq.command.ActiveMQDestination dest = info.getDestination();

        if (dest.isQueue()) {
            SimpleString queueName = OpenWireUtil.toCoreAddress(dest);
            QueueBinding existing = (QueueBinding) this.server.getPostOffice().getBinding(queueName);
            if (existing == null) {
                if (connection.getState().getInfo() != null) {
                    CheckType requiredPermission;
                    if (dest.isTemporary()) {
                        requiredPermission = CheckType.CREATE_NON_DURABLE_QUEUE;
                    } else {
                        requiredPermission = CheckType.CREATE_DURABLE_QUEUE;
                    }
                    this.server.getSecurityStore().check(queueName, requiredPermission, (SecurityAuth) connection);
                    this.server.checkQueueCreationLimit(connection.getUsername());
                }
                ConnectionInfo connInfo = connection.getState().getInfo();
                SimpleString owner = connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName());
                this.server.createQueue(queueName, queueName, null, owner, false, dest.isTemporary());
            }
            if (dest.isTemporary()) {
                connection.registerTempQueue(dest);
            }
        }

        if (AdvisorySupport.isAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination) dest)) {
            return;
        }
        AMQConnectionContext context = connection.getConext();
        DestinationInfo additionInfo = new DestinationInfo(context.getConnectionId(), 0, dest);
        ActiveMQTopic advisoryTopic = AdvisorySupport.getDestinationAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination) dest);
        this.fireAdvisory(context, advisoryTopic, (Command) additionInfo);
    }

    /**
     * Ends a transaction on the session that registered it. Does nothing if
     * the transaction is unknown.
     *
     * @param info the transaction command
     * @throws Exception if the session fails to end the transaction
     */
    public void endTransaction(TransactionInfo info) throws Exception {
        AMQSession owner = this.transactions.get(info.getTransactionId());
        if (owner == null) {
            return;
        }
        owner.endTransaction(info);
    }

    /**
     * Commits a transaction in one phase on the session that registered it and
     * then forgets the transaction. If the commit throws, the transaction
     * stays registered.
     *
     * @param info the transaction command
     * @throws Exception if the session fails to commit
     */
    public void commitTransactionOnePhase(TransactionInfo info) throws Exception {
        TransactionId txId = info.getTransactionId();
        AMQSession owner = this.transactions.get(txId);
        if (owner != null) {
            owner.commitOnePhase(info);
        }
        this.transactions.remove(txId);
    }

    /**
     * Prepares an XA transaction on the session that registered it. Does
     * nothing if the transaction is unknown.
     *
     * @param info the transaction command; its id must be an
     *             {@link XATransactionId}
     * @throws Exception if the session fails to prepare
     */
    public void prepareTransaction(TransactionInfo info) throws Exception {
        XATransactionId xaId = (XATransactionId) info.getTransactionId();
        AMQSession owner = this.transactions.get(xaId);
        if (owner == null) {
            return;
        }
        owner.prepareTransaction(xaId);
    }

    /**
     * Commits a prepared XA transaction on the session that registered it and
     * then forgets the transaction. If the commit throws, the transaction
     * stays registered.
     *
     * @param info the transaction command; its id must be an
     *             {@link XATransactionId}
     * @throws Exception if the session fails to commit
     */
    public void commitTransactionTwoPhase(TransactionInfo info) throws Exception {
        XATransactionId xaId = (XATransactionId) info.getTransactionId();
        AMQSession owner = this.transactions.get(xaId);
        if (owner != null) {
            owner.commitTwoPhase(xaId);
        }
        this.transactions.remove(xaId);
    }

    /**
     * Rolls back a transaction on the session that registered it and then
     * forgets the transaction. If the rollback throws, the transaction stays
     * registered.
     *
     * @param info the transaction command
     * @throws Exception if the session fails to roll back
     */
    public void rollbackTransaction(TransactionInfo info) throws Exception {
        TransactionId txId = info.getTransactionId();
        AMQSession owner = this.transactions.get(txId);
        if (owner != null) {
            owner.rollback(info);
        }
        this.transactions.remove(txId);
    }

    /**
     * Collects the transactions reported by the {@code recover} call of each
     * registered session named in {@code sIds}.
     *
     * @param sIds the ids of the sessions to ask; may be {@code null}
     * @return the collected transaction ids; empty if {@code sIds} is
     *         {@code null} or none of the sessions is registered
     */
    public TransactionId[] recoverTransactions(Set<SessionId> sIds) {
        ArrayList<TransactionId> collected = new ArrayList<TransactionId>();
        if (sIds == null) {
            return collected.toArray(new TransactionId[0]);
        }
        for (SessionId sessionId : sIds) {
            AMQSession session = this.sessions.get(sessionId);
            if (session != null) {
                session.recover(collected);
            }
        }
        return collected.toArray(new TransactionId[0]);
    }

    /**
     * Validates credentials against the server's security manager.
     *
     * @param login    the user name
     * @param passcode the password
     * @return the security manager's verdict; {@code true} without any check
     *         when the server has no security manager or security is disabled
     *         in its configuration
     */
    public boolean validateUser(String login, String passcode) {
        ActiveMQSecurityManager securityManager = this.server.getSecurityManager();
        if (securityManager == null || !this.server.getConfiguration().isSecurityEnabled()) {
            return true;
        }
        return securityManager.validateUser(login, passcode);
    }

    /**
     * Tells the owning session to forget a transaction and then drops the
     * transaction from the registry. If the session throws, the transaction
     * stays registered.
     *
     * @param xid the id of the transaction to forget
     * @throws Exception if the session fails to forget the transaction
     */
    public void forgetTransaction(TransactionId xid) throws Exception {
        AMQSession owner = this.transactions.get(xid);
        if (owner != null) {
            owner.forget(xid);
        }
        this.transactions.remove(xid);
    }

    /**
     * Records which session owns a transaction so that later transaction
     * commands can be routed to it.
     *
     * @param txId       the transaction id
     * @param amqSession the session that owns the transaction
     */
    public void registerTx(TransactionId txId, AMQSession amqSession) {
        this.transactions.put(txId, amqSession);
    }

    /**
     * Reacts to a management notification.
     * <p>
     * Only {@code CoreNotificationType.CONSUMER_SLOW} is acted upon, by firing
     * a slow-consumer advisory; every other notification is ignored. Failures
     * are logged and never propagated to the caller.
     *
     * @param notif the notification received from the management service
     */
    public void onNotification(Notification notif) {
        try {
            boolean slowConsumer = CoreNotificationType.CONSUMER_SLOW.equals(notif.getType());
            if (slowConsumer) {
                this.fireSlowConsumer(notif);
            }
        }
        catch (Exception e) {
            ActiveMQServerLogger.LOGGER.error((Object)("Failed to send notification " + notif), (Throwable)e);
        }
    }

    /**
     * Fires a slow-consumer advisory for the consumer named in a
     * {@code CONSUMER_SLOW} notification.
     * <p>
     * The notification identifies the consumer by core session name and core
     * consumer id. If that session is not one registered with this manager, or
     * the session, consumer or owning connection can no longer be found, the
     * notification is skipped instead of failing with a
     * {@link NullPointerException}. No advisory is fired for consumers on
     * advisory topics.
     *
     * @param notif the slow-consumer notification
     * @throws Exception if building or sending the advisory fails
     */
    private void fireSlowConsumer(Notification notif) throws Exception {
        SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
        Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
        if (coreSessionId == null || coreConsumerId == null) {
            return;
        }

        // The listener receives notifications for every core session; only
        // those created through addSession() have an entry here. The session
        // map is a ConcurrentHashMap, which rejects null keys, so stop early.
        SessionId sessionId = this.sessionIdMap.get(coreSessionId.toString());
        if (sessionId == null) {
            return;
        }
        AMQSession session = this.sessions.get(sessionId);
        if (session == null) {
            return;
        }
        AMQConsumer consumer = session.getConsumer(coreConsumerId);
        if (consumer == null) {
            return;
        }

        org.apache.activemq.command.ActiveMQDestination destination = consumer.getDestination();
        if (AdvisorySupport.isAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination)destination)) {
            return;
        }

        ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic((org.apache.activemq.command.ActiveMQDestination)destination);
        ConnectionId connId = sessionId.getParentId();
        OpenWireConnection cc = this.brokerConnectionStates.get(connId);
        if (cc == null) {
            if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
                ActiveMQServerLogger.LOGGER.trace((Object)("No connection registered for " + connId + "; slow consumer advisory skipped"));
            }
            return;
        }
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        advisoryMessage.setStringProperty("consumerId", consumer.getId().toString());
        this.fireAdvisory(cc.getConext(), topic, (Command)advisoryMessage, consumer.getId());
    }

    /**
     * Destroys the queue backing a durable subscription.
     * <p>
     * The queue name is derived from the client id and subscription name via
     * {@code ActiveMQDestination.createQueueNameForDurableSubscription}.
     *
     * @param subInfo identifies the subscription by client id and name
     * @throws Exception if the broker fails to destroy the queue
     */
    public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
        String clientId = subInfo.getClientId();
        String subscriptionName = subInfo.getSubscriptionName();
        String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, clientId, subscriptionName);
        this.server.destroyQueue(new SimpleString(queueName));
    }

    /**
     * Builds a {@link BrokerInfo} describing this broker and dispatches it
     * asynchronously on the given connection.
     * <p>
     * The info carries the server identity as broker name, the server node id
     * as broker id, the connection's local address as broker URL, no peer
     * broker infos, and the fault-tolerant flag set to {@code false}.
     *
     * @param connection the connection to send the broker info to
     */
    public void sendBrokerInfo(OpenWireConnection connection) {
        BrokerInfo brokerInfo = new BrokerInfo();
        brokerInfo.setBrokerName(this.server.getIdentity());
        brokerInfo.setBrokerId(new BrokerId(this.server.getNodeID().toString()));
        // Set once; the original repeated this call a second time further down.
        brokerInfo.setPeerBrokerInfos(null);
        brokerInfo.setFaultTolerantConfiguration(false);
        brokerInfo.setBrokerURL(connection.getLocalAddress());
        connection.dispatchAsync((Command)brokerInfo);
    }
}

