/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.remoting.server.impl;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.AcceptorControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactory;
import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryProvider;
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.PemConfigUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemotingServiceImpl
implements RemotingService,
ServerConnectionLifeCycleListener {
    // Logger named after the class returned by the method-handle lookup, i.e. this class.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Same value as the literal 3000 ms wait in stop(); presumably the named form of that timeout.
    private static final int ACCEPTOR_STOP_TIMEOUT = 3000;
    // Same value as the literal 5000 ms wait in updateAcceptors(); presumably the named form of that timeout.
    private static final int UPDATE_ACCEPTORS_STOP_TIMEOUT = 5000;
    // Set to true at the end of start() and back to false at the end of stop().
    private volatile boolean started = false;
    // Acceptor configurations keyed by configuration name; replaced wholesale by updateAcceptors().
    private final Map<String, TransportConfiguration> acceptorsConfig;
    // Interceptors handed to every protocol manager and acceptor created by this service.
    private final List<BaseInterceptor> incomingInterceptors = new CopyOnWriteArrayList<BaseInterceptor>();
    private final List<BaseInterceptor> outgoingInterceptors = new CopyOnWriteArrayList<BaseInterceptor>();
    // Created acceptors keyed by acceptor name. Plain HashMap: it provides no synchronization of its own.
    private final Map<String, Acceptor> acceptors = new HashMap<String, Acceptor>();
    // Live connection entries keyed by connection ID.
    private final ConcurrentMap<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
    // Counted up in connectionCreated(), counted down in removeConnection(), reset to 0 in stop().
    private final ReusableLatch connectionCountLatch = new ReusableLatch(0);
    private final ActiveMQServer server;
    // May be null: createAcceptor() and stop() check for null before using it.
    private final ManagementService managementService;
    // Created in start() and shut down in stop(); passed to every acceptor created by this service.
    private ExecutorService threadPool;
    // Stored by the constructor; no use is visible in this part of the file.
    private final Executor flushExecutor;
    private final ScheduledExecutorService scheduledThreadPool;
    // Created and started in start(); closed in freeze() and stop().
    private FailureCheckAndFlushThread failureCheckAndFlushThread;
    private final ClusterManager clusterManager;
    // Protocol manager factories keyed by protocol name.
    private final Map<String, ProtocolManagerFactory> protocolMap = new ConcurrentHashMap<String, ProtocolManagerFactory>();
    // Set by allowInvmSecurityOverride(); applied to acceptors that report isUnsecurable().
    private ActiveMQPrincipal defaultInvmSecurityPrincipal;
    private ServiceRegistry serviceRegistry;
    // Set by pauseAcceptors(); cleared by start().
    private boolean paused = false;
    // Incremented once for every connectionCreated() call that completes.
    private AtomicLong totalConnectionCount = new AtomicLong(0L);
    // Passed to FailureCheckAndFlushThread in start(); presumably milliseconds — TODO confirm.
    private long connectionTtlCheckInterval;

    /**
     * Creates the remoting service and registers the protocol manager factories it will serve.
     * <p>
     * The core protocol factory is always registered. When {@code config.isResolveProtocols()} is
     * true, further factories are discovered through {@link ServiceLoader} using this class's
     * class loader and, if it is a different one, the thread context class loader. Factories
     * passed in {@code protocolManagerFactories} are registered last.
     *
     * @param clusterManager used to look up the cluster connection of each acceptor
     * @param config supplies acceptor configurations, interceptor class names and the
     *        connection TTL check interval
     * @param server the owning server
     * @param managementService management service acceptors are registered with; may be {@code null}
     * @param scheduledThreadPool scheduled executor handed to created acceptors
     * @param protocolManagerFactories additional factories to register; may be {@code null}
     * @param flushExecutor executor stored on this service
     * @param serviceRegistry registry used to resolve the configured interceptors
     */
    public RemotingServiceImpl(ClusterManager clusterManager, Configuration config, ActiveMQServer server, ManagementService managementService, ScheduledExecutorService scheduledThreadPool, List<ProtocolManagerFactory> protocolManagerFactories, Executor flushExecutor, ServiceRegistry serviceRegistry) {
        // Must be assigned before setInterceptors() runs, which reads it.
        this.serviceRegistry = serviceRegistry;

        if (config.getAcceptorConfigurations() == null || config.getAcceptorConfigurations().isEmpty()) {
            this.acceptorsConfig = new HashMap<>();
        } else {
            this.acceptorsConfig = config.getAcceptorConfigurations().stream()
                .collect(Collectors.toMap(TransportConfiguration::getName, Function.identity()));
        }

        this.server = server;
        this.clusterManager = clusterManager;
        setInterceptors(config);
        this.managementService = managementService;
        this.scheduledThreadPool = scheduledThreadPool;

        // The core protocol is always available, regardless of what else gets resolved.
        CoreProtocolManagerFactory coreFactory = new CoreProtocolManagerFactory();
        MessagePersister.registerProtocol(coreFactory);
        this.flushExecutor = flushExecutor;
        ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreFactory.getModuleName(), coreFactory.getProtocols()[0]);
        protocolMap.put(coreFactory.getProtocols()[0], coreFactory);

        if (config.isResolveProtocols()) {
            resolveProtocols(getClass().getClassLoader());
            if (getClass().getClassLoader() != Thread.currentThread().getContextClassLoader()) {
                resolveProtocols(Thread.currentThread().getContextClassLoader());
            }
        }

        if (protocolManagerFactories != null) {
            loadProtocolManagerFactories(protocolManagerFactories);
        }

        this.connectionTtlCheckInterval = config.getConnectionTtlCheckInterval();
    }

    /**
     * Appends the interceptors resolved by the service registry for the class names listed in
     * {@code configuration} to the incoming and outgoing interceptor lists.
     *
     * @param configuration source of the incoming and outgoing interceptor class names
     */
    private void setInterceptors(Configuration configuration) {
        incomingInterceptors.addAll(
            serviceRegistry.getIncomingInterceptors(configuration.getIncomingInterceptorClassNames()));
        outgoingInterceptors.addAll(
            serviceRegistry.getOutgoingInterceptors(configuration.getOutgoingInterceptorClassNames()));
    }

    /**
     * Returns the live map of registered protocol manager factories keyed by protocol name.
     * The internal map itself is returned, not a copy.
     */
    @Override
    public Map<String, ProtocolManagerFactory> getProtocolFactoryMap() {
        return protocolMap;
    }

    /**
     * Starts the remoting service: creates the remoting thread pool, creates an acceptor for every
     * configured acceptor and starts the failure-check/flush thread. Does nothing when the service
     * is already started.
     * <p>
     * Acceptors are only created here; starting them is done by {@link #startAcceptors()}.
     *
     * @throws Exception declared by the service contract
     */
    @Override
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        logger.trace("Starting remoting service {}", this);
        this.paused = false;
        // The explicit PrivilegedAction cast is required: a bare lambda is ambiguous between the
        // doPrivileged(PrivilegedAction) and doPrivileged(PrivilegedExceptionAction) overloads.
        this.threadPool = Executors.newCachedThreadPool(AccessController.doPrivileged(
            (PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory(this.server.getThreadGroupName("remoting-service"), false, Thread.currentThread().getContextClassLoader())));
        for (TransportConfiguration info : this.acceptorsConfig.values()) {
            this.createAcceptor(info);
        }
        this.failureCheckAndFlushThread = new FailureCheckAndFlushThread(this.connectionTtlCheckInterval);
        this.failureCheckAndFlushThread.start();
        this.started = true;
    }

    /**
     * Creates an acceptor from an acceptor URI. Only the first transport configuration parsed
     * from {@code uri} is used.
     *
     * @param name acceptor name
     * @param uri acceptor URI to parse
     * @return the acceptor returned by {@link #createAcceptor(TransportConfiguration)}
     * @throws Exception if the URI cannot be parsed
     */
    @Override
    public Acceptor createAcceptor(String name, String uri) throws Exception {
        TransportConfiguration firstParsed = ConfigurationUtils.parseAcceptorURI(name, uri).get(0);
        return createAcceptor(firstParsed);
    }

    /**
     * Creates, registers and returns an acceptor for the given transport configuration.
     * <p>
     * Protocols are taken from the deprecated {@code protocol} parameter and from the
     * {@code protocols} parameter; when neither selects a known protocol, every registered
     * protocol is enabled. Any exception raised while creating the acceptor is logged and
     * not rethrown.
     * <p>
     * When configuration file refresh is enabled and the acceptor sets {@code sslAutoReload},
     * reload callbacks are added for its key store and trust store.
     *
     * @param info transport configuration describing the acceptor
     * @return the created acceptor, or {@code null} if creation failed
     */
    @Override
    public Acceptor createAcceptor(TransportConfiguration info) {
        Acceptor created = null;
        try {
            AcceptorFactory acceptorFactory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName());
            Map<String, ProtocolManagerFactory> chosenFactories = new ConcurrentHashMap<>();

            String legacyProtocol = ConfigurationHelper.getStringProperty("protocol", null, info.getParams());
            if (legacyProtocol != null) {
                ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol();
                locateProtocols(legacyProtocol, info, chosenFactories);
            }

            String protocolList = ConfigurationHelper.getStringProperty("protocols", null, info.getParams());
            if (protocolList != null) {
                locateProtocols(protocolList, info, chosenFactories);
            }

            ClusterConnection clusterConnection = lookupClusterConnection(info);

            // Nothing selected explicitly: fall back to every registered protocol.
            if (chosenFactories.isEmpty()) {
                chosenFactories = protocolMap;
            }

            Map<String, ProtocolManager> managers = new ConcurrentHashMap<>();
            for (Map.Entry<String, ProtocolManagerFactory> chosen : chosenFactories.entrySet()) {
                ProtocolManager manager = chosen.getValue().createProtocolManager(server, info.getCombinedParams(), incomingInterceptors, outgoingInterceptors);
                managers.put(chosen.getKey(), manager);
            }

            created = acceptorFactory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, managers, server.getThreadGroupName("remoting-" + info.getName()), server.getMetricsManager());

            if (defaultInvmSecurityPrincipal != null && created.isUnsecurable()) {
                created.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
            }

            acceptors.put(info.getName(), created);

            if (managementService != null) {
                created.setNotificationService(managementService);
                managementService.registerAcceptor(created, info);
            }
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(info.getName(), e);
        }

        if (server.getConfiguration().getConfigurationFileRefreshPeriod() > 0L) {
            Map<String, Object> combined = info.getCombinedParams();
            if (ConfigurationHelper.getBooleanProperty("sslAutoReload", false, combined)) {
                addAcceptorStoreReloadCallback(info.getName(), RemotingServiceImpl.fileUrlFrom(combined.get("keyStorePath")), RemotingServiceImpl.storeTypeFrom(combined.get("keyStoreType")));
                addAcceptorStoreReloadCallback(info.getName(), RemotingServiceImpl.fileUrlFrom(combined.get("trustStorePath")), RemotingServiceImpl.storeTypeFrom(combined.get("trustStoreType")));
            }
        }
        return created;
    }

    /**
     * Returns the acceptors created by this service, keyed by acceptor name.
     * The internal map itself is returned, not a copy.
     */
    @Override
    public Map<String, Acceptor> getAcceptors() {
        return acceptors;
    }

    /**
     * Stops the named acceptor and removes it from this service. Unknown names are ignored.
     * The acceptor is only removed once {@code stop()} has returned normally.
     *
     * @param name name of the acceptor to destroy
     * @throws Exception if stopping the acceptor fails
     */
    @Override
    public void destroyAcceptor(String name) throws Exception {
        Acceptor target = acceptors.get(name);
        if (target == null) {
            return;
        }
        target.stop();
        acceptors.remove(name);
    }

    /**
     * Starts every acceptor that reports {@code isAutoStart()}, provided the service is started.
     * The first failure is logged and rethrown, so later acceptors are not attempted.
     *
     * @throws Exception the failure raised by the acceptor that could not be started
     */
    @Override
    public synchronized void startAcceptors() throws Exception {
        if (!isStarted()) {
            return;
        }
        for (Acceptor candidate : acceptors.values()) {
            try {
                if (candidate.isAutoStart()) {
                    candidate.start();
                }
            } catch (Throwable failure) {
                ActiveMQServerLogger.LOGGER.errorStartingAcceptor(candidate.getName(), candidate.getConfiguration());
                throw failure;
            }
        }
    }

    /**
     * Records {@code principal} as the default principal and applies it to every existing
     * acceptor that reports {@code isUnsecurable()}. Acceptors created later pick it up in
     * {@code createAcceptor}.
     *
     * @param principal the principal to apply
     */
    @Override
    public synchronized void allowInvmSecurityOverride(ActiveMQPrincipal principal) {
        defaultInvmSecurityPrincipal = principal;
        for (Acceptor existing : acceptors.values()) {
            if (existing.isUnsecurable()) {
                existing.setDefaultActiveMQPrincipal(principal);
            }
        }
    }

    /**
     * Marks the service as paused and pauses every acceptor. Does nothing when the service is
     * not started. A failure to pause one acceptor is logged and does not stop the others.
     */
    @Override
    public synchronized void pauseAcceptors() {
        if (!started) {
            return;
        }
        paused = true;
        for (Acceptor toPause : acceptors.values()) {
            try {
                toPause.pause();
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(toPause.getName());
            }
        }
    }

    /**
     * Reports whether {@link #pauseAcceptors()} has paused this service since the last start.
     */
    @Override
    public synchronized boolean isPaused() {
        return paused;
    }

    /**
     * Closes the failure-check thread and disconnects every non-client connection except
     * {@code connectionToKeepOpen}, removing each disconnected connection from this service.
     * Does nothing when the service is not started.
     *
     * @param scaleDownNodeID node ID passed on to each {@code disconnect} call
     * @param connectionToKeepOpen connection that must be left untouched
     */
    @Override
    public synchronized void freeze(String scaleDownNodeID, CoreRemotingConnection connectionToKeepOpen) {
        if (!started) {
            return;
        }
        failureCheckAndFlushThread.close(false);

        // Work on a snapshot because removeConnection() mutates the live map.
        Map<Object, ConnectionEntry> snapshot = new HashMap<>(connections);
        for (Map.Entry<Object, ConnectionEntry> candidate : snapshot.entrySet()) {
            RemotingConnection remoting = candidate.getValue().connection;
            if (!remoting.equals(connectionToKeepOpen)) {
                logger.trace("Sending connection.disconnection packet to {}", remoting);
                if (!remoting.isClient()) {
                    remoting.disconnect(scaleDownNodeID, false);
                    removeConnection(candidate.getKey());
                }
            }
        }
    }

    /**
     * Asks every acceptor to send its stop notification. A failure on one acceptor is logged
     * and does not prevent the remaining acceptors from being notified.
     */
    @Override
    public void notifyStop() {
        for (Acceptor toNotify : acceptors.values()) {
            logger.debug("send stop notifications on acceptor {}", toNotify);
            try {
                toNotify.notifyStop();
            } catch (Throwable failure) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(toNotify.getName());
            }
        }
    }

    /**
     * Prepares the service for shutdown: pauses every acceptor, then disconnects every current
     * connection that is not in {@code ignoreList}.
     * <p>
     * A failure to pause one acceptor is logged and does not stop the others.
     *
     * @param criticalError passed on to each {@code disconnect} call
     * @param ignoreList connections that must not be disconnected; must not be {@code null}
     * @throws Exception declared by the service contract
     */
    @Override
    public void prepareStop(boolean criticalError, Set<RemotingConnection> ignoreList) throws Exception {
        for (Acceptor acceptor : this.acceptors.values()) {
            logger.debug("Pausing acceptor {}", acceptor);
            try {
                acceptor.pause();
            }
            catch (Throwable t) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
            }
        }
        logger.debug("Sending disconnect on client connections");
        // Typed snapshot of the current entries; a raw HashSet cannot be iterated as ConnectionEntry.
        Set<ConnectionEntry> connectionEntries = new HashSet<>(this.connections.values());
        for (ConnectionEntry entry : connectionEntries) {
            RemotingConnection conn = entry.connection;
            if (ignoreList.contains(conn)) {
                logger.debug("ignoring connection {} during the close", conn);
                continue;
            }
            logger.debug("Sending disconnect on connection {} from server {}", conn.getID(), this.server);
            conn.disconnect(criticalError);
        }
    }

    /**
     * Stops the remoting service. Does nothing when the service is not started.
     * <p>
     * In order: clears cached SSL contexts, closes the failure-check thread, runs
     * {@link #prepareStop(boolean, Set)}, asynchronously stops all acceptors and waits up to
     * 3000 ms for them, clears acceptors and connections, unregisters the acceptors from the
     * management service and shuts the thread pool down. Unless {@code criticalError} is set,
     * the thread pool is given up to 10000 ms to terminate and a timeout is logged.
     *
     * @param criticalError whether the stop is caused by a critical error
     * @throws Exception if preparing the stop fails or a wait is interrupted
     */
    @Override
    public void stop(boolean criticalError) throws Exception {
        if (!started) {
            return;
        }

        SSLContextFactoryProvider.getSSLContextFactory().clearSSLContexts();
        OpenSSLContextFactory openSsl = OpenSSLContextFactoryProvider.getOpenSSLContextFactory();
        if (openSsl != null) {
            openSsl.clearSslContexts();
        }

        failureCheckAndFlushThread.close(criticalError);
        prepareStop(criticalError, Collections.emptySet());

        CountDownLatch pendingStops = new CountDownLatch(acceptors.size());
        for (Acceptor toStop : acceptors.values()) {
            try {
                toStop.asyncStop(pendingStops::countDown);
            } catch (Throwable failure) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(toStop.getName());
            }
        }
        // The result is deliberately not inspected: shutdown continues even on timeout.
        pendingStops.await(3000L, TimeUnit.MILLISECONDS);

        acceptors.clear();
        connections.clear();
        connectionCountLatch.setCount(0);

        if (managementService != null) {
            managementService.unregisterAcceptors();
        }

        threadPool.shutdown();
        if (!criticalError) {
            boolean terminated = threadPool.awaitTermination(10000L, TimeUnit.MILLISECONDS);
            if (!terminated) {
                ActiveMQServerLogger.LOGGER.timeoutRemotingThreadPool();
            }
        }

        started = false;
    }

    /**
     * Looks up an acceptor by name.
     *
     * @param name acceptor name
     * @return the acceptor, or {@code null} if none is registered under {@code name}
     */
    @Override
    public Acceptor getAcceptor(String name) {
        return acceptors.get(name);
    }

    /**
     * Reports whether {@link #start()} has completed and {@link #stop(boolean)} has not.
     */
    @Override
    public boolean isStarted() {
        return started;
    }

    /**
     * Looks up the remoting connection registered under the given ID.
     * When no entry exists, {@code errorRemovingConnection} is logged.
     *
     * @param remotingConnectionID ID the connection was registered under
     * @return the connection, or {@code null} if the ID is unknown
     */
    @Override
    public RemotingConnection getConnection(Object remotingConnectionID) {
        ConnectionEntry found = connections.get(remotingConnectionID);
        if (found == null) {
            ActiveMQServerLogger.LOGGER.errorRemovingConnection();
            return null;
        }
        return found.connection;
    }

    /**
     * Looks up the connection entry registered under the given ID.
     *
     * @param remotingConnectionID ID the connection was registered under
     * @return the entry, or {@code null} if the ID is unknown
     */
    public ConnectionEntry getConnectionEntry(Object remotingConnectionID) {
        return connections.get(remotingConnectionID);
    }

    /**
     * Removes the connection registered under the given ID, writes an audit record when
     * resource audit logging is enabled, and counts the connection latch down.
     *
     * @param remotingConnectionID ID the connection was registered under
     * @return the removed connection, or {@code null} if the ID was not registered
     */
    @Override
    public RemotingConnection removeConnection(Object remotingConnectionID) {
        ConnectionEntry removed = connections.remove(remotingConnectionID);
        if (removed == null) {
            logger.debug("The connectionID::{} was already removed by some other module", remotingConnectionID);
            return null;
        }

        RemotingConnection remoting = removed.connection;
        if (AuditLogger.isResourceLoggingEnabled()) {
            AuditLogger.destroyedConnection(remoting.getProtocolName(), remoting.getID(), remoting.getSubject(), remoting.getRemoteAddress());
        }
        if (logger.isDebugEnabled()) {
            logger.debug("RemotingServiceImpl::removing succeeded connection ID {}, we now have {} connections", remotingConnectionID, connections.size());
        }
        connectionCountLatch.countDown();
        return remoting;
    }

    /**
     * Returns a new set holding the remoting connection of every current connection entry.
     * Changes to the returned set do not affect this service.
     */
    @Override
    public synchronized Set<RemotingConnection> getConnections() {
        Set<RemotingConnection> result = new HashSet<>(connections.size());
        connections.values().forEach(entry -> result.add(entry.connection));
        return result;
    }

    /**
     * Returns a new set holding every current connection entry.
     * Changes to the returned set do not affect this service.
     */
    public synchronized Set<ConnectionEntry> getConnectionEntries() {
        Set<ConnectionEntry> result = new HashSet<>(connections.size());
        result.addAll(connections.values());
        return result;
    }

    /**
     * Returns the number of connections currently registered with this service.
     */
    @Override
    public int getConnectionCount() {
        return connections.size();
    }

    /**
     * Returns the running total of connections counted by {@code connectionCreated}.
     * The counter is never decremented in the code visible here.
     */
    @Override
    public long getTotalConnectionCount() {
        return totalConnectionCount.get();
    }

    /**
     * Returns the latch that is counted up for each created connection and down for each
     * removed one.
     */
    @Override
    public synchronized ReusableLatch getConnectionCountLatch() {
        return connectionCountLatch;
    }

    /**
     * Asks every registered protocol manager factory to load its protocol services.
     *
     * @param protocolServices list handed to each factory together with the server
     */
    @Override
    public void loadProtocolServices(List<ActiveMQComponent> protocolServices) {
        for (ProtocolManagerFactory factory : protocolMap.values()) {
            factory.loadProtocolServices(server, protocolServices);
        }
    }

    /**
     * Reconciles the acceptors with the current configuration, then asks every registered
     * protocol manager factory to update its protocol services.
     *
     * @param protocolServices list handed to each factory together with the server
     * @throws Exception if updating the acceptors or a factory's services fails
     */
    @Override
    public void updateProtocolServices(List<ActiveMQComponent> protocolServices) throws Exception {
        updateAcceptors();
        for (ProtocolManagerFactory factory : protocolMap.values()) {
            factory.updateProtocolServices(server, protocolServices);
        }
    }

    /*
     * WARNING - void declaration
     */
    private void updateAcceptors() throws Exception {
        void var7_16;
        Set updatedConfigurationSet = Objects.requireNonNullElse(this.server.getConfiguration().getAcceptorConfigurations(), Collections.emptySet());
        Map updatedConfiguration = updatedConfigurationSet.stream().collect(Collectors.toMap(c -> c.getName(), Function.identity()));
        HashSet<TransportConfiguration> acceptorsToStop = new HashSet<TransportConfiguration>();
        HashSet<TransportConfiguration> acceptorsToCreate = new HashSet<TransportConfiguration>();
        for (TransportConfiguration transportConfiguration : updatedConfiguration.values()) {
            TransportConfiguration transportConfiguration2 = this.acceptorsConfig.get(transportConfiguration.getName());
            if (transportConfiguration2 == null) {
                acceptorsToCreate.add(transportConfiguration);
                continue;
            }
            if (transportConfiguration2.equals((Object)transportConfiguration)) continue;
            acceptorsToCreate.add(transportConfiguration);
            acceptorsToStop.add(transportConfiguration);
        }
        for (TransportConfiguration transportConfiguration : this.acceptorsConfig.values()) {
            if (updatedConfiguration.containsKey(transportConfiguration.getName())) continue;
            acceptorsToStop.add(transportConfiguration);
        }
        this.acceptorsConfig.clear();
        this.acceptorsConfig.putAll(updatedConfiguration);
        CountDownLatch acceptorsStoppedLatch = new CountDownLatch(acceptorsToStop.size());
        for (TransportConfiguration transportConfiguration : acceptorsToStop) {
            Acceptor acceptor = this.acceptors.remove(transportConfiguration.getName());
            if (acceptor == null) continue;
            Map acceptorToStopParams = transportConfiguration.getCombinedParams();
            this.removeAcceptorStoreReloadCallback(transportConfiguration.getName(), RemotingServiceImpl.fileUrlFrom(acceptorToStopParams.get("keyStorePath")), RemotingServiceImpl.storeTypeFrom(acceptorToStopParams.get("keyStoreType")));
            this.removeAcceptorStoreReloadCallback(transportConfiguration.getName(), RemotingServiceImpl.fileUrlFrom(acceptorToStopParams.get("trustStorePath")), RemotingServiceImpl.storeTypeFrom(acceptorToStopParams.get("trustStoreType")));
            try {
                this.managementService.unregisterAcceptor(acceptor.getName());
            }
            catch (Throwable t) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
            }
            try {
                acceptor.notifyStop();
            }
            catch (Throwable t) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
            }
            try {
                acceptor.pause();
            }
            catch (Throwable t) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
            }
            try {
                acceptor.asyncStop(acceptorsStoppedLatch::countDown);
            }
            catch (Throwable t) {
                ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
            }
        }
        if (!acceptorsStoppedLatch.await(5000L, TimeUnit.MILLISECONDS)) {
            logger.warn("Timed out waiting on removed or updated acceptors stopping.");
        }
        ArrayList<Acceptor> arrayList = new ArrayList<Acceptor>();
        for (TransportConfiguration candidateConfiguration : acceptorsToCreate) {
            Acceptor acceptor = this.createAcceptor(candidateConfiguration);
            if (!this.isStarted() || !acceptor.isAutoStart()) continue;
            arrayList.add(acceptor);
        }
        Object var7_15 = null;
        for (Acceptor acceptor : arrayList) {
            try {
                acceptor.start();
            }
            catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStartingAcceptor(acceptor.getName(), acceptor.getConfiguration());
                if (var7_16 != null) continue;
                Exception exception = e;
            }
        }
        if (var7_16 != null) {
            throw var7_16;
        }
    }

    /**
     * Looks up the protocol manager factory registered for a protocol name.
     *
     * @param protocol protocol name
     * @return the factory, or {@code null} if the protocol is not registered
     */
    private ProtocolManagerFactory getProtocolManager(String protocol) {
        return protocolMap.get(protocol);
    }

    /**
     * Handles a newly created transport connection: builds its connection entry through the
     * protocol manager, runs the {@code afterCreateConnection} broker plugins, registers the
     * entry and updates the connection counters.
     *
     * @param component the acceptor that accepted the connection; cast to {@link Acceptor}
     * @param connection the new transport connection
     * @param protocol protocol manager that creates the connection entry
     * @throws IllegalStateException if the server reference is {@code null} or a broker plugin
     *         fails; in the latter case the connection is not registered
     */
    public void connectionCreated(ActiveMQComponent component, Connection connection, ProtocolManager protocol) {
        if (server == null) {
            throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
        }

        final ConnectionEntry created = protocol.createConnectionEntry((Acceptor) component, connection);
        try {
            if (server.hasBrokerConnectionPlugins()) {
                server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(created.connection));
            }
        } catch (ActiveMQException pluginFailure) {
            logger.warn("Error executing afterCreateConnection plugin method: {}", pluginFailure.getMessage(), pluginFailure);
            throw new IllegalStateException(pluginFailure.getMessage(), pluginFailure.getCause());
        }

        logger.trace("Connection created {}", connection);
        addConnectionEntry(connection, created);
        connectionCountLatch.countUp();
        totalConnectionCount.incrementAndGet();
    }

    /**
     * Registers a connection entry under the connection's ID and writes an audit record when
     * resource audit logging is enabled. The protocol name in the audit record is {@code null}
     * when the connection has no protocol connection yet.
     *
     * @param connection transport connection whose ID is used as the key
     * @param entry entry to register
     */
    @Override
    public void addConnectionEntry(Connection connection, ConnectionEntry entry) {
        connections.put(connection.getID(), entry);

        if (AuditLogger.isResourceLoggingEnabled()) {
            String protocolName = connection.getProtocolConnection() == null ? null : connection.getProtocolConnection().getProtocolName();
            AuditLogger.createdConnection(protocolName, connection.getID(), connection.getRemoteAddress());
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Adding connection {}, we now have {} on server {}", connection.getID(), connections.size(), server);
        }
    }

    /**
     * Handles a destroyed transport connection. A failed connection is reported as a remote
     * disconnect failure; otherwise the connection is closed normally.
     *
     * @param connectionID ID of the destroyed connection
     * @param failed whether the connection was destroyed because of a failure
     */
    public void connectionDestroyed(Object connectionID, boolean failed) {
        if (logger.isTraceEnabled()) {
            // The throw-away exception only carries the caller's stack trace into the log.
            logger.trace("Connection removed {} from server {}", connectionID, server, new Exception("trace"));
        }
        if (!failed) {
            issueClose(connectionID);
            return;
        }
        issueFailure(connectionID, new ActiveMQRemoteDisconnectException());
    }

    /**
     * Fails the connection registered under {@code connectionID} unless it is unknown or
     * supports reconnection. The connection is removed first and the
     * {@code afterDestroyConnection} broker plugins run; if a plugin throws, the connection is
     * failed with the plugin's exception instead of {@code e}.
     *
     * @param connectionID ID of the connection to fail
     * @param e failure to report to the connection
     */
    private void issueFailure(Object connectionID, ActiveMQException e) {
        ConnectionEntry entry = connections.get(connectionID);
        if (entry == null || entry.connection.isSupportReconnect()) {
            return;
        }

        RemotingConnection removed = removeConnection(connectionID);
        if (removed != null) {
            try {
                if (server.hasBrokerConnectionPlugins()) {
                    server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removed));
                }
            } catch (ActiveMQException pluginFailure) {
                logger.warn("Error executing afterDestroyConnection plugin method: {}", pluginFailure.getMessage(), pluginFailure);
                entry.connection.fail(pluginFailure);
                return;
            }
        }
        entry.connection.fail(e);
    }

    /**
     * Closes the connection registered under {@code connectionID} unless it is unknown or
     * supports reconnection. The connection is removed first and the
     * {@code afterDestroyConnection} broker plugins run; a plugin failure is logged and the
     * connection is still closed.
     *
     * @param connectionID ID of the connection to close
     */
    private void issueClose(Object connectionID) {
        ConnectionEntry entry = connections.get(connectionID);
        if (entry == null || entry.connection.isSupportReconnect()) {
            return;
        }

        RemotingConnection removed = removeConnection(connectionID);
        if (removed != null) {
            try {
                if (server.hasBrokerConnectionPlugins()) {
                    server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removed));
                }
            } catch (ActiveMQException pluginFailure) {
                logger.warn("Error executing afterDestroyConnection plugin method: {}", pluginFailure.getMessage(), pluginFailure);
            }
        }
        entry.connection.close();
    }

    /**
     * Handles an exception raised on a transport connection by failing the connection with it.
     *
     * @param connectionID ID of the affected connection
     * @param me the exception that occurred
     */
    public void connectionException(Object connectionID, ActiveMQException me) {
        issueFailure(connectionID, me);
    }

    /**
     * Notification that a connection's readiness for writes changed. This implementation
     * takes no action.
     *
     * @param connectionID ID of the affected connection
     * @param ready the reported readiness
     */
    public void connectionReadyForWrites(Object connectionID, boolean ready) {
        // No action is taken.
    }

    /**
     * Adds an incoming interceptor and pushes the updated interceptor lists to every acceptor.
     *
     * @param interceptor interceptor to add
     */
    @Override
    public void addIncomingInterceptor(BaseInterceptor interceptor) {
        incomingInterceptors.add(interceptor);
        updateProtocols();
    }

    /**
     * Returns a read-only view of the incoming interceptors.
     */
    @Override
    public List<BaseInterceptor> getIncomingInterceptors() {
        return Collections.unmodifiableList(incomingInterceptors);
    }

    /**
     * Removes an incoming interceptor. The acceptors are only updated when the interceptor
     * was actually present.
     *
     * @param interceptor interceptor to remove
     * @return {@code true} if the interceptor was registered and has been removed
     */
    @Override
    public boolean removeIncomingInterceptor(BaseInterceptor interceptor) {
        boolean removed = incomingInterceptors.remove(interceptor);
        if (removed) {
            updateProtocols();
        }
        return removed;
    }

    /**
     * Adds an outgoing interceptor and pushes the updated interceptor lists to every acceptor.
     *
     * @param interceptor interceptor to add
     */
    @Override
    public void addOutgoingInterceptor(BaseInterceptor interceptor) {
        outgoingInterceptors.add(interceptor);
        updateProtocols();
    }

    /**
     * Returns a read-only view of the outgoing interceptors.
     */
    @Override
    public List<BaseInterceptor> getOutgoinInterceptors() {
        return Collections.unmodifiableList(outgoingInterceptors);
    }

    /**
     * Removes an outgoing interceptor. The acceptors are only updated when the interceptor
     * was actually present.
     *
     * @param interceptor interceptor to remove
     * @return {@code true} if the interceptor was registered and has been removed
     */
    @Override
    public boolean removeOutgoingInterceptor(BaseInterceptor interceptor) {
        boolean removed = outgoingInterceptors.remove(interceptor);
        if (removed) {
            updateProtocols();
        }
        return removed;
    }

    /**
     * Resolves the cluster connection for an acceptor. The connection named by the
     * {@code clusterConnection} parameter is used when it is set and known to the cluster
     * manager; otherwise the cluster manager's default connection for the acceptor is used.
     *
     * @param acceptorConfig acceptor configuration to resolve the cluster connection for
     * @return the named cluster connection, or whatever the cluster manager returns as default
     */
    private ClusterConnection lookupClusterConnection(TransportConfiguration acceptorConfig) {
        String configuredName = (String) acceptorConfig.getParams().get("clusterConnection");
        ClusterConnection named = configuredName == null ? null : clusterManager.getClusterConnection(configuredName);
        if (named != null) {
            return named;
        }
        return clusterManager.getDefaultConnection(acceptorConfig);
    }

    /**
     * Pushes the current incoming and outgoing interceptor lists to every acceptor.
     */
    protected void updateProtocols() {
        for (Acceptor target : acceptors.values()) {
            target.updateInterceptors(incomingInterceptors, outgoingInterceptors);
        }
    }

    /**
     * Copies the registered factory of each protocol named in a comma-separated list into
     * {@code protocolMap}. Names are looked up exactly as written between the commas; a name
     * without a registered factory is logged and skipped.
     *
     * @param protocolList comma-separated protocol names
     * @param transportConfig configuration the list came from; only used in the log message
     * @param protocolMap destination map, filled with protocol name to factory
     */
    private void locateProtocols(String protocolList, Object transportConfig, Map<String, ProtocolManagerFactory> protocolMap) {
        for (String requested : protocolList.split(",")) {
            // "this." is required: the parameter shadows the field holding the registered factories.
            ProtocolManagerFactory registered = this.protocolMap.get(requested);
            if (registered != null) {
                protocolMap.put(requested, registered);
            } else {
                ActiveMQServerLogger.LOGGER.noProtocolManagerFound(requested, transportConfig.toString());
            }
        }
    }

    /**
     * Discovers {@code ProtocolManagerFactory} providers through {@link ServiceLoader}
     * using the given class loader and registers each of them.
     *
     * @param loader the class loader used for provider discovery
     */
    private void resolveProtocols(ClassLoader loader) {
        loadProtocolManagerFactories(ServiceLoader.load(ProtocolManagerFactory.class, loader));
    }

    /**
     * Registers every supplied factory with {@code MessagePersister} and maps each
     * protocol name it reports to that factory in this service's protocol registry.
     *
     * @param protocolManagerFactoryCollection the factories to register
     */
    private void loadProtocolManagerFactories(Iterable<ProtocolManagerFactory> protocolManagerFactoryCollection) {
        for (ProtocolManagerFactory factory : protocolManagerFactoryCollection) {
            MessagePersister.registerProtocol(factory);
            final String[] supported = factory.getProtocols();
            for (String protocolName : supported) {
                ActiveMQServerLogger.LOGGER.addingProtocolSupport(factory.getModuleName(), protocolName);
                this.protocolMap.put(protocolName, factory);
            }
        }
    }

    /**
     * Removes the reload callbacks registered for a key/trust store URL and, when the
     * store is a PEM config, for each source file listed inside it.
     *
     * <p>If the PEM config cannot be read, the failure is logged and only the
     * callbacks for {@code storeURL} itself are removed.
     *
     * @param acceptorName name of the owning acceptor (not used by this method)
     * @param storeURL     location of the store; nothing happens when {@code null}
     * @param storeType    store type, used to decide whether the store is a PEM config
     */
    private void removeAcceptorStoreReloadCallback(String acceptorName, URL storeURL, String storeType) {
        if (storeURL == null) {
            return;
        }
        final ReloadManager reloadManager = this.server.getReloadManager();
        reloadManager.removeCallbacks(storeURL);
        if (!PemConfigUtil.isPemConfigStoreType(storeType)) {
            return;
        }

        String[] pemSources = null;
        try (InputStream pemConfig = storeURL.openStream()) {
            pemSources = PemConfigUtil.parseSources(pemConfig);
        }
        catch (IOException e) {
            ActiveMQServerLogger.LOGGER.skipSSLAutoReloadForSourcesOfStore(storeURL.getPath(), e.toString());
        }
        if (pemSources == null) {
            return;
        }

        for (String pemSource : pemSources) {
            final URL pemSourceURL = RemotingServiceImpl.fileUrlFrom(pemSource);
            if (pemSourceURL != null) {
                reloadManager.removeCallbacks(pemSourceURL);
            }
        }
    }

    /**
     * Registers a reload callback for a key/trust store URL and, when the store is a
     * PEM config, recursively for each source file listed inside it.
     *
     * <p>The callback looks up the management resource {@code "acceptor." + acceptorName}
     * and calls {@code reload()} on it if it is an {@link AcceptorControl}. If the PEM
     * config cannot be read, the failure is logged and only {@code storeURL} itself is
     * watched.
     *
     * @param acceptorName name of the acceptor to reload when the store changes
     * @param storeURL     location of the store; nothing happens when {@code null}
     * @param storeType    store type, used to decide whether the store is a PEM config
     */
    private void addAcceptorStoreReloadCallback(String acceptorName, URL storeURL, String storeType) {
        if (storeURL == null) {
            return;
        }
        this.server.getReloadManager().addCallback(storeURL, uri -> {
            if (this.managementService == null) {
                return;
            }
            final Object resource = this.managementService.getResource("acceptor." + acceptorName);
            if (resource instanceof AcceptorControl) {
                ((AcceptorControl) resource).reload();
            }
        });
        if (!PemConfigUtil.isPemConfigStoreType(storeType)) {
            return;
        }

        String[] pemSources = null;
        try (InputStream pemConfig = storeURL.openStream()) {
            pemSources = PemConfigUtil.parseSources(pemConfig);
        }
        catch (IOException e) {
            ActiveMQServerLogger.LOGGER.skipSSLAutoReloadForSourcesOfStore(storeURL.getPath(), e.toString());
        }
        if (pemSources == null) {
            return;
        }

        for (String pemSource : pemSources) {
            final URL pemSourceURL = RemotingServiceImpl.fileUrlFrom(pemSource);
            if (pemSourceURL != null) {
                // Sources are registered with a null store type, so they are watched as plain files.
                this.addAcceptorStoreReloadCallback(acceptorName, pemSourceURL, null);
            }
        }
    }

    /**
     * Converts a file path into a {@code file:} URL.
     *
     * @param o the candidate path; only {@link String} values are converted
     * @return the URL for the path, or {@code null} if {@code o} is not a string or
     *         the path cannot be turned into a URL
     */
    private static URL fileUrlFrom(Object o) {
        if (!(o instanceof String)) {
            return null;
        }
        final File file = new File((String) o);
        try {
            return file.toURI().toURL();
        }
        catch (MalformedURLException ignored) {
            // Best effort: callers treat null as "no URL available".
            return null;
        }
    }

    /**
     * Narrows an untyped configuration value to a store type string.
     *
     * @param o the candidate value
     * @return {@code o} itself when it is a {@link String}, otherwise {@code null}
     */
    private static String storeTypeFrom(Object o) {
        return o instanceof String ? (String) o : null;
    }

    /**
     * Background thread that, once per pause interval, walks all known connections,
     * fails those whose TTL has elapsed without data being received, and schedules a
     * flush for the rest.
     *
     * <p>Flushes and connection failures are dispatched to the enclosing service's
     * {@code flushExecutor} rather than being run on this thread.
     */
    private final class FailureCheckAndFlushThread
    extends Thread {
        /** Time to wait between two passes, in milliseconds. */
        private final long pauseInterval;
        /** Set by {@link #close(boolean)}; checked at the top of every pass. */
        private volatile boolean closed;
        /** Counted down on close so a sleeping pass wakes up immediately. */
        private final CountDownLatch latch;

        /**
         * @param pauseInterval time to wait between two passes, in milliseconds
         */
        FailureCheckAndFlushThread(long pauseInterval) {
            super("activemq-failure-check-thread");
            this.latch = new CountDownLatch(1);
            this.pauseInterval = pauseInterval;
        }

        /**
         * Asks the thread to stop and releases it from its inter-pass wait.
         *
         * @param criticalError when {@code false} this call blocks until the thread has
         *                      terminated; when {@code true} it returns without joining
         * @throws ActiveMQInterruptedException if the caller is interrupted while joining
         */
        public void close(boolean criticalError) {
            this.closed = true;
            this.latch.countDown();
            if (!criticalError) {
                try {
                    this.join();
                }
                catch (InterruptedException e) {
                    throw new ActiveMQInterruptedException(e);
                }
            }
        }

        /**
         * Runs check-and-flush passes until closed. Any {@link Throwable} raised during a
         * pass is logged and the loop carries on with the next pass.
         */
        @Override
        public void run() {
            while (!this.closed) {
                try {
                    // Connection ID -> TTL of every connection that expired during this pass.
                    Set<Pair<Object, Long>> toRemove = new HashSet<>();
                    for (ConnectionEntry entry : RemotingServiceImpl.this.connections.values()) {
                        // Must be a fresh final local per iteration: it is captured by the lambda below.
                        final RemotingConnection conn = entry.connection;
                        long lastCheck = entry.lastCheck;
                        long ttl = entry.ttl;
                        long now = System.currentTimeMillis();
                        boolean flush = true;
                        // A TTL of -1 disables the expiry check for this connection.
                        if (ttl != -1L) {
                            if (!conn.checkDataReceived()) {
                                if (now >= lastCheck + ttl) {
                                    toRemove.add(new Pair<>(conn.getID(), ttl));
                                    flush = false;
                                }
                            } else {
                                entry.lastCheck = now;
                            }
                        }
                        if (!flush) {
                            continue;
                        }
                        RemotingServiceImpl.this.flushExecutor.execute(() -> {
                            try {
                                conn.scheduledFlush();
                            }
                            catch (Throwable e) {
                                ActiveMQServerLogger.LOGGER.failedToFlushOutstandingDataFromTheConnection(e);
                            }
                        });
                    }
                    for (final Pair<Object, Long> pair : toRemove) {
                        final RemotingConnection conn = RemotingServiceImpl.this.getConnection(pair.getA());
                        if (conn == null) {
                            continue;
                        }
                        // fail() is handed to the executor; the connection is removed right away on this thread.
                        RemotingServiceImpl.this.flushExecutor.execute(() -> conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress(), pair.getB())));
                        RemotingServiceImpl.this.removeConnection(pair.getA());
                    }
                    // await() returns true only when close() counted the latch down.
                    if (this.latch.await(this.pauseInterval, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                }
                catch (Throwable e) {
                    ActiveMQServerLogger.LOGGER.errorOnFailureCheck(e);
                }
            }
        }
    }

    private final class DelegatingBufferHandler
    implements BufferHandler {
        private DelegatingBufferHandler() {
        }

        public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
            ConnectionEntry conn = (ConnectionEntry)RemotingServiceImpl.this.connections.get(connectionID);
            if (conn != null) {
                try {
                    conn.connection.bufferReceived(connectionID, buffer);
                }
                catch (RuntimeException e) {
                    ActiveMQServerLogger.LOGGER.disconnectCritical("Error decoding buffer", e);
                    conn.connection.fail(new ActiveMQException(e.getMessage()));
                }
            } else {
                logger.trace("ConnectionID = {} was already closed, so ignoring packet", connectionID);
            }
        }
    }
}

