/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.remoting;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventHandler;
import com.alipay.remoting.ConnectionEventListener;
import com.alipay.remoting.ConnectionHeartbeatManager;
import com.alipay.remoting.ConnectionManager;
import com.alipay.remoting.ConnectionPool;
import com.alipay.remoting.ConnectionSelectStrategy;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.RandomSelectStrategy;
import com.alipay.remoting.RemotingAddressParser;
import com.alipay.remoting.Scannable;
import com.alipay.remoting.Url;
import com.alipay.remoting.config.ConfigManager;
import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.connection.ConnectionFactory;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.log.BoltLoggerFactory;
import com.alipay.remoting.util.FutureTaskUtil;
import com.alipay.remoting.util.RunStateRecordedFutureTask;
import com.alipay.remoting.util.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

public class DefaultConnectionManager
implements ConnectionManager,
ConnectionHeartbeatManager,
Scannable {
    private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
    private static final int DEFAULT_EXPIRE_TIME = 600000;
    private static final int DEFAULT_RETRY_TIMES = 2;
    private int minPoolSize = ConfigManager.conn_create_tp_min_size();
    private int maxPoolSize = ConfigManager.conn_create_tp_max_size();
    private int queueSize = ConfigManager.conn_create_tp_queue_size();
    private long keepAliveTime = ConfigManager.conn_create_tp_keepalive();
    private volatile boolean executorInitialized;
    private Executor asyncCreateConnectionExecutor;
    private GlobalSwitch globalSwitch;
    protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks = new ConcurrentHashMap();
    protected ConcurrentHashMap<String, FutureTask<Integer>> healTasks = new ConcurrentHashMap();
    protected ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(this.globalSwitch);
    protected RemotingAddressParser addressParser;
    protected ConnectionFactory connectionFactory;
    protected ConnectionEventHandler connectionEventHandler;
    protected ConnectionEventListener connectionEventListener;

    public DefaultConnectionManager() {
    }

    public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy) {
        this();
        this.connectionSelectStrategy = connectionSelectStrategy;
    }

    public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy, ConnectionFactory connectionFactory) {
        this(connectionSelectStrategy);
        this.connectionFactory = connectionFactory;
    }

    public DefaultConnectionManager(ConnectionFactory connectionFactory, RemotingAddressParser addressParser, ConnectionEventHandler connectionEventHandler) {
        this(new RandomSelectStrategy(), connectionFactory);
        this.addressParser = addressParser;
        this.connectionEventHandler = connectionEventHandler;
    }

    public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy, ConnectionFactory connectionFactory, ConnectionEventHandler connectionEventHandler, ConnectionEventListener connectionEventListener) {
        this(connectionSelectStrategy, connectionFactory);
        this.connectionEventHandler = connectionEventHandler;
        this.connectionEventListener = connectionEventListener;
    }

    public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy, ConnectionFactory connctionFactory, ConnectionEventHandler connectionEventHandler, ConnectionEventListener connectionEventListener, GlobalSwitch globalSwitch) {
        this(connectionSelectStrategy, connctionFactory, connectionEventHandler, connectionEventListener);
        this.globalSwitch = globalSwitch;
    }

    @Override
    public void init() {
        this.connectionEventHandler.setConnectionManager(this);
        this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
        this.connectionFactory.init(this.connectionEventHandler);
    }

    @Override
    public void add(Connection connection) {
        Set<String> poolKeys = connection.getPoolKeys();
        for (String poolKey : poolKeys) {
            this.add(connection, poolKey);
        }
    }

    @Override
    public void add(Connection connection, String poolKey) {
        ConnectionPool pool = null;
        try {
            pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
        }
        catch (Exception e) {
            logger.error("[NOTIFYME] Exception occurred when getOrCreateIfAbsent an empty ConnectionPool!", (Throwable)e);
        }
        if (pool != null) {
            pool.add(connection);
        } else {
            logger.error("[NOTIFYME] Connection pool NULL!");
        }
    }

    @Override
    public Connection get(String poolKey) {
        ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
        return null == pool ? null : pool.get();
    }

    @Override
    public List<Connection> getAll(String poolKey) {
        ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
        return null == pool ? new ArrayList() : pool.getAll();
    }

    @Override
    public Map<String, List<Connection>> getAll() {
        HashMap<String, List<Connection>> allConnections = new HashMap<String, List<Connection>>();
        for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : this.getConnPools().entrySet()) {
            ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
            if (null == pool) continue;
            allConnections.put(entry.getKey(), pool.getAll());
        }
        return allConnections;
    }

    @Override
    public void remove(Connection connection) {
        if (null == connection) {
            return;
        }
        Set<String> poolKeys = connection.getPoolKeys();
        if (null == poolKeys || poolKeys.isEmpty()) {
            connection.close();
            logger.warn("Remove and close a standalone connection.");
        } else {
            for (String poolKey : poolKeys) {
                this.remove(connection, poolKey);
            }
        }
    }

    @Override
    public void remove(Connection connection, String poolKey) {
        if (null == connection || StringUtils.isBlank(poolKey)) {
            return;
        }
        ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
        if (null == pool) {
            connection.close();
            logger.warn("Remove and close a standalone connection.");
        } else {
            pool.removeAndTryClose(connection);
            if (pool.isEmpty()) {
                this.removeTask(poolKey);
                logger.warn("Remove and close the last connection in ConnectionPool with poolKey {}", (Object)poolKey);
            } else {
                logger.warn("Remove and close a connection in ConnectionPool with poolKey {}, {} connections left.", (Object)poolKey, (Object)pool.size());
            }
        }
    }

    @Override
    public void remove(String poolKey) {
        ConnectionPool pool;
        if (StringUtils.isBlank(poolKey)) {
            return;
        }
        RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.remove(poolKey);
        if (null != task && null != (pool = this.getConnectionPool(task))) {
            pool.removeAllAndTryClose();
            logger.warn("Remove and close all connections in ConnectionPool of poolKey={}", (Object)poolKey);
        }
    }

    @Override
    public void removeAll() {
        if (null == this.connTasks || this.connTasks.isEmpty()) {
            return;
        }
        if (null != this.connTasks && !this.connTasks.isEmpty()) {
            Iterator iter = ((ConcurrentHashMap.KeySetView)this.connTasks.keySet()).iterator();
            while (iter.hasNext()) {
                String poolKey = (String)iter.next();
                this.removeTask(poolKey);
                iter.remove();
            }
            logger.warn("All connection pool and connections have been removed!");
        }
    }

    @Override
    public void check(Connection connection) throws RemotingException {
        if (connection == null) {
            throw new RemotingException("Connection is null when do check!");
        }
        if (connection.getChannel() == null || !connection.getChannel().isActive()) {
            this.remove(connection);
            throw new RemotingException("Check connection failed for address: " + connection.getUrl());
        }
        if (!connection.getChannel().isWritable()) {
            throw new RemotingException("Check connection failed for address: " + connection.getUrl() + ", maybe write overflow!");
        }
    }

    @Override
    public int count(String poolKey) {
        if (StringUtils.isBlank(poolKey)) {
            return 0;
        }
        ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
        if (null != pool) {
            return pool.size();
        }
        return 0;
    }

    @Override
    public void scan() {
        if (null != this.connTasks && !this.connTasks.isEmpty()) {
            Iterator iter = ((ConcurrentHashMap.KeySetView)this.connTasks.keySet()).iterator();
            while (iter.hasNext()) {
                String poolKey = (String)iter.next();
                RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.get(poolKey);
                if (!task.isDone()) {
                    logger.info("task(poolKey={}) is not done, do not scan the connection pool", (Object)poolKey);
                    continue;
                }
                ConnectionPool pool = this.getConnectionPool(task);
                if (null == pool) continue;
                pool.scan();
                if (!pool.isEmpty() || System.currentTimeMillis() - pool.getLastAccessTimestamp() <= 600000L) continue;
                iter.remove();
                logger.warn("Remove expired pool task of poolKey {} which is empty.", (Object)poolKey);
            }
        }
    }

    @Override
    public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
        ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(), new ConnectionPoolCall(url));
        if (null != pool) {
            return pool.get();
        }
        logger.error("[NOTIFYME] bug detected! pool here must not be null!");
        return null;
    }

    @Override
    public void createConnectionAndHealIfNeed(Url url) throws InterruptedException, RemotingException {
        ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(), new ConnectionPoolCall(url));
        if (null != pool) {
            this.healIfNeed(pool, url);
        } else {
            logger.error("[NOTIFYME] bug detected! pool here must not be null!");
        }
    }

    @Override
    public Connection create(Url url) throws RemotingException {
        Connection conn = null;
        try {
            conn = this.connectionFactory.createConnection(url);
        }
        catch (Exception e) {
            throw new RemotingException("Create connection failed. The address is " + url.getOriginUrl(), e);
        }
        return conn;
    }

    @Override
    public Connection create(String ip, int port, int connectTimeout) throws RemotingException {
        Connection conn = null;
        try {
            conn = this.connectionFactory.createConnection(ip, port, connectTimeout);
        }
        catch (Exception e) {
            throw new RemotingException("Create connection failed. The address is " + ip + ":" + port, e);
        }
        return conn;
    }

    @Override
    public Connection create(String address, int connectTimeout) throws RemotingException {
        Url url = this.addressParser.parse(address);
        url.setConnectTimeout(connectTimeout);
        return this.create(url);
    }

    @Override
    public void disableHeartbeat(Connection connection) {
        if (null != connection) {
            connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set((Object)false);
        }
    }

    @Override
    public void enableHeartbeat(Connection connection) {
        if (null != connection) {
            connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set((Object)true);
        }
    }

    private ConnectionPool getConnectionPool(RunStateRecordedFutureTask<ConnectionPool> task) {
        return FutureTaskUtil.getFutureTaskResult(task, logger);
    }

    private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey, Callable<ConnectionPool> callable) throws RemotingException, InterruptedException {
        RunStateRecordedFutureTask<ConnectionPool> initialTask = null;
        ConnectionPool pool = null;
        int retry = 2;
        int timesOfResultNull = 0;
        int timesOfInterrupt = 0;
        for (int i = 0; i < retry && pool == null; ++i) {
            initialTask = this.connTasks.get(poolKey);
            if (null == initialTask) {
                initialTask = new RunStateRecordedFutureTask<ConnectionPool>(callable);
                if (null == (initialTask = this.connTasks.putIfAbsent(poolKey, initialTask))) {
                    initialTask = this.connTasks.get(poolKey);
                    initialTask.run();
                }
            }
            try {
                pool = (ConnectionPool)initialTask.get();
                if (null != pool) continue;
                if (i + 1 < retry) {
                    ++timesOfResultNull;
                    continue;
                }
                this.connTasks.remove(poolKey);
                String errMsg = "Get future task result null for poolKey [" + poolKey + "] after [" + (timesOfResultNull + 1) + "] times try.";
                throw new RemotingException(errMsg);
            }
            catch (InterruptedException e) {
                if (i + 1 < retry) {
                    ++timesOfInterrupt;
                    continue;
                }
                this.connTasks.remove(poolKey);
                logger.warn("Future task of poolKey {} interrupted {} times. InterruptedException thrown and stop retry.", new Object[]{poolKey, timesOfInterrupt + 1, e});
                throw e;
            }
            catch (ExecutionException e) {
                this.connTasks.remove(poolKey);
                Throwable cause = e.getCause();
                if (cause instanceof RemotingException) {
                    throw (RemotingException)cause;
                }
                FutureTaskUtil.launderThrowable(cause);
            }
        }
        return pool;
    }

    private void removeTask(String poolKey) {
        ConnectionPool pool;
        RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.remove(poolKey);
        if (null != task && null != (pool = FutureTaskUtil.getFutureTaskResult(task, logger))) {
            pool.removeAllAndTryClose();
        }
    }

    private void healIfNeed(ConnectionPool pool, Url url) throws RemotingException, InterruptedException {
        String poolKey = url.getUniqueKey();
        if (pool.isAsyncCreationDone() && pool.size() < url.getConnNum()) {
            FutureTask<Integer> task = this.healTasks.get(poolKey);
            if (null == task) {
                task = new FutureTask<Integer>(new HealConnectionCall(url, pool));
                if (null == (task = this.healTasks.putIfAbsent(poolKey, task))) {
                    task = this.healTasks.get(poolKey);
                    task.run();
                }
            }
            try {
                int numAfterHeal = task.get();
                if (logger.isDebugEnabled()) {
                    logger.debug("[NOTIFYME] - conn num after heal {}, expected {}, warmup {}", new Object[]{numAfterHeal, url.getConnNum(), url.isConnWarmup()});
                }
            }
            catch (InterruptedException e) {
                this.healTasks.remove(poolKey);
                throw e;
            }
            catch (ExecutionException e) {
                this.healTasks.remove(poolKey);
                Throwable cause = e.getCause();
                if (cause instanceof RemotingException) {
                    throw (RemotingException)cause;
                }
                FutureTaskUtil.launderThrowable(cause);
            }
            this.healTasks.remove(poolKey);
        }
    }

    private void doCreate(final Url url, final ConnectionPool pool, final String taskName, int syncCreateNumWhenNotWarmup) throws RemotingException {
        block10: {
            int expectNum;
            int actualNum = pool.size();
            if (actualNum >= (expectNum = url.getConnNum())) break block10;
            if (logger.isDebugEnabled()) {
                logger.debug("actual num {}, expect num {}, task name {}", new Object[]{actualNum, expectNum, taskName});
            }
            if (url.isConnWarmup()) {
                for (int i = actualNum; i < expectNum; ++i) {
                    Connection connection = this.create(url);
                    pool.add(connection);
                }
            } else {
                if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
                    throw new IllegalArgumentException("sync create number when not warmup should be [0," + url.getConnNum() + "]");
                }
                if (syncCreateNumWhenNotWarmup > 0) {
                    for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                        Connection connection = this.create(url);
                        pool.add(connection);
                    }
                    if (syncCreateNumWhenNotWarmup == url.getConnNum()) {
                        return;
                    }
                }
                this.initializeExecutor();
                pool.markAsyncCreationStart();
                try {
                    this.asyncCreateConnectionExecutor.execute(new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            try {
                                for (int i = pool.size(); i < url.getConnNum(); ++i) {
                                    Connection conn = null;
                                    try {
                                        conn = DefaultConnectionManager.this.create(url);
                                    }
                                    catch (RemotingException e) {
                                        logger.error("Exception occurred in async create connection thread for {}, taskName {}", new Object[]{url.getUniqueKey(), taskName, e});
                                    }
                                    pool.add(conn);
                                }
                            }
                            finally {
                                pool.markAsyncCreationDone();
                            }
                        }
                    });
                }
                catch (RejectedExecutionException e) {
                    pool.markAsyncCreationDone();
                    throw e;
                }
            }
        }
    }

    private void initializeExecutor() {
        if (!this.executorInitialized) {
            this.executorInitialized = true;
            this.asyncCreateConnectionExecutor = new ThreadPoolExecutor(this.minPoolSize, this.maxPoolSize, this.keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(this.queueSize), new NamedThreadFactory("Bolt-conn-warmup-executor", true));
        }
    }

    public ConnectionSelectStrategy getConnectionSelectStrategy() {
        return this.connectionSelectStrategy;
    }

    public void setConnectionSelectStrategy(ConnectionSelectStrategy connectionSelectStrategy) {
        this.connectionSelectStrategy = connectionSelectStrategy;
    }

    public RemotingAddressParser getAddressParser() {
        return this.addressParser;
    }

    public void setAddressParser(RemotingAddressParser addressParser) {
        this.addressParser = addressParser;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public ConnectionEventHandler getConnectionEventHandler() {
        return this.connectionEventHandler;
    }

    public void setConnectionEventHandler(ConnectionEventHandler connectionEventHandler) {
        this.connectionEventHandler = connectionEventHandler;
    }

    public ConnectionEventListener getConnectionEventListener() {
        return this.connectionEventListener;
    }

    public void setConnectionEventListener(ConnectionEventListener connectionEventListener) {
        this.connectionEventListener = connectionEventListener;
    }

    public ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> getConnPools() {
        return this.connTasks;
    }

    private class HealConnectionCall
    implements Callable<Integer> {
        private Url url;
        private ConnectionPool pool;

        public HealConnectionCall(Url url, ConnectionPool pool) {
            this.url = url;
            this.pool = pool;
        }

        @Override
        public Integer call() throws Exception {
            DefaultConnectionManager.this.doCreate(this.url, this.pool, this.getClass().getSimpleName(), 0);
            return this.pool.size();
        }
    }

    private class ConnectionPoolCall
    implements Callable<ConnectionPool> {
        private boolean whetherInitConnection;
        private Url url;

        public ConnectionPoolCall() {
            this.whetherInitConnection = false;
        }

        public ConnectionPoolCall(Url url) {
            this.whetherInitConnection = true;
            this.url = url;
        }

        @Override
        public ConnectionPool call() throws Exception {
            ConnectionPool pool = new ConnectionPool(DefaultConnectionManager.this.connectionSelectStrategy);
            if (this.whetherInitConnection) {
                try {
                    DefaultConnectionManager.this.doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
                }
                catch (Exception e) {
                    pool.removeAllAndTryClose();
                    throw e;
                }
            }
            return pool;
        }
    }
}

