/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.nio.tcp;

import com.hazelcast.internal.cluster.impl.BindMessage;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.Networking;
import com.hazelcast.internal.util.concurrent.ThreadFactoryImpl;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.nio.ConnectionManager;
import com.hazelcast.nio.IOService;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.tcp.TcpIpAcceptor;
import com.hazelcast.nio.tcp.TcpIpConnection;
import com.hazelcast.nio.tcp.TcpIpConnectionErrorHandler;
import com.hazelcast.nio.tcp.TcpIpConnector;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.ThreadUtil;
import com.hazelcast.util.executor.StripedRunnable;
import com.hazelcast.util.function.Consumer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TcpIpConnectionManager
implements ConnectionManager,
Consumer<Packet> {
    private static final int RETRY_NUMBER = 5;
    private static final long DELAY_FACTOR = 100L;
    private static final int SCHEDULER_POOL_SIZE = 4;
    final LoggingService loggingService;
    @Probe(name="connectionListenerCount")
    final Set<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<ConnectionListener>();
    private final boolean spoofingChecks;
    private final IOService ioService;
    private final ConstructorFunction<Address, TcpIpConnectionErrorHandler> monitorConstructor = new ConstructorFunction<Address, TcpIpConnectionErrorHandler>(){

        @Override
        public TcpIpConnectionErrorHandler createNew(Address endpoint) {
            return new TcpIpConnectionErrorHandler(TcpIpConnectionManager.this, endpoint);
        }
    };
    private final ILogger logger;
    @Probe(name="count", level=ProbeLevel.MANDATORY)
    private final ConcurrentHashMap<Address, Connection> connectionsMap = new ConcurrentHashMap(100);
    @Probe(name="monitorCount")
    private final ConcurrentHashMap<Address, TcpIpConnectionErrorHandler> monitors = new ConcurrentHashMap(100);
    @Probe(name="inProgressCount")
    private final Set<Address> connectionsInProgress = Collections.newSetFromMap(new ConcurrentHashMap());
    @Probe(name="acceptedSocketCount", level=ProbeLevel.MANDATORY)
    private final Set<Channel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap());
    @Probe(name="activeCount", level=ProbeLevel.MANDATORY)
    private final Set<TcpIpConnection> activeConnections = Collections.newSetFromMap(new ConcurrentHashMap());
    @Probe(name="textCount", level=ProbeLevel.MANDATORY)
    private final AtomicInteger allTextConnections = new AtomicInteger();
    private final AtomicInteger connectionIdGen = new AtomicInteger();
    private final Networking networking;
    private final MetricsRegistry metricsRegistry;
    private final ServerSocketChannel serverSocketChannel;
    @Probe
    private final MwCounter openedCount = MwCounter.newMwCounter();
    @Probe
    private final MwCounter closedCount = MwCounter.newMwCounter();
    private final ScheduledExecutorService scheduler;
    private volatile TcpIpAcceptor acceptor;
    private volatile boolean live;
    private TcpIpConnector connector;

    public TcpIpConnectionManager(IOService ioService, ServerSocketChannel serverSocketChannel, LoggingService loggingService, MetricsRegistry metricsRegistry, Networking networking) {
        this(ioService, serverSocketChannel, loggingService, metricsRegistry, networking, null);
    }

    public TcpIpConnectionManager(IOService ioService, ServerSocketChannel serverSocketChannel, LoggingService loggingService, MetricsRegistry metricsRegistry, Networking networking, HazelcastProperties properties) {
        this.ioService = ioService;
        this.networking = networking;
        this.serverSocketChannel = serverSocketChannel;
        this.loggingService = loggingService;
        this.logger = loggingService.getLogger(TcpIpConnectionManager.class);
        this.metricsRegistry = metricsRegistry;
        this.connector = new TcpIpConnector(this);
        this.scheduler = new ScheduledThreadPoolExecutor(4, new ThreadFactoryImpl(ThreadUtil.createThreadPoolName(ioService.getHazelcastName(), "TcpIpConnectionManager")));
        this.spoofingChecks = properties != null && properties.getBoolean(GroupProperty.BIND_SPOOFING_CHECKS);
        metricsRegistry.scanAndRegister(this, "tcp.connection");
    }

    public IOService getIoService() {
        return this.ioService;
    }

    public Networking getNetworking() {
        return this.networking;
    }

    public Set<TcpIpConnection> getActiveConnections() {
        return this.activeConnections;
    }

    @Override
    public int getActiveConnectionCount() {
        return this.activeConnections.size();
    }

    @Override
    public int getAllTextConnections() {
        return this.allTextConnections.get();
    }

    @Override
    public int getConnectionCount() {
        return this.connectionsMap.size();
    }

    public void incrementTextConnections() {
        this.allTextConnections.incrementAndGet();
    }

    @Override
    public void addConnectionListener(ConnectionListener listener) {
        Preconditions.checkNotNull(listener, "listener can't be null");
        this.connectionListeners.add(listener);
    }

    @Override
    public void accept(Packet packet) {
        assert (packet.getPacketType() == Packet.Type.BIND);
        BindMessage bind = (BindMessage)this.ioService.getSerializationService().toObject(packet);
        this.bind((TcpIpConnection)packet.getConn(), bind.getLocalAddress(), bind.getTargetAddress(), bind.shouldReply());
    }

    private synchronized boolean bind(TcpIpConnection connection, Address remoteEndPoint, Address localEndpoint, boolean reply) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Binding " + connection + " to " + remoteEndPoint + ", reply is " + reply);
        }
        Address thisAddress = this.ioService.getThisAddress();
        if (!(!this.spoofingChecks || this.ensureValidBindSource(connection, remoteEndPoint) && this.ensureBindNotFromSelf(connection, remoteEndPoint, thisAddress))) {
            return false;
        }
        if (!this.ensureValidBindTarget(connection, remoteEndPoint, localEndpoint, thisAddress)) {
            return false;
        }
        connection.setEndPoint(remoteEndPoint);
        this.ioService.onSuccessfulConnection(remoteEndPoint);
        if (reply) {
            this.sendBindRequest(connection, remoteEndPoint, false);
        }
        if (this.checkAlreadyConnected(connection, remoteEndPoint)) {
            return false;
        }
        return this.registerConnection(remoteEndPoint, connection);
    }

    private boolean ensureValidBindSource(TcpIpConnection connection, Address remoteEndPoint) {
        try {
            InetAddress originalRemoteAddr = connection.getRemoteSocketAddress().getAddress();
            InetAddress presentedRemoteAddr = remoteEndPoint.getInetAddress();
            if (!originalRemoteAddr.equals(presentedRemoteAddr)) {
                String msg = "Wrong bind request from " + originalRemoteAddr + ", identified as " + presentedRemoteAddr;
                this.logger.warning(msg);
                connection.close(msg, null);
                return false;
            }
        }
        catch (UnknownHostException e) {
            String msg = e.getMessage();
            this.logger.warning(msg);
            connection.close(msg, e);
            return false;
        }
        return true;
    }

    private boolean ensureValidBindTarget(TcpIpConnection connection, Address remoteEndPoint, Address localEndpoint, Address thisAddress) {
        if (this.ioService.isSocketBindAny() && !connection.isClient() && !thisAddress.equals(localEndpoint)) {
            String msg = "Wrong bind request from " + remoteEndPoint + "! This node is not the requested endpoint: " + localEndpoint;
            this.logger.warning(msg);
            connection.close(msg, null);
            return false;
        }
        return true;
    }

    private boolean ensureBindNotFromSelf(TcpIpConnection connection, Address remoteEndPoint, Address thisAddress) {
        if (thisAddress.equals(remoteEndPoint)) {
            String msg = "Wrong bind request. Remote endpoint is same to this endpoint.";
            this.logger.warning(msg);
            connection.close(msg, null);
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized boolean registerConnection(final Address remoteEndPoint, final Connection connection) {
        try {
            if (remoteEndPoint.equals(this.ioService.getThisAddress())) {
                boolean bl = false;
                return bl;
            }
            if (!connection.isAlive()) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest(connection + " to " + remoteEndPoint + " is not registered since connection is not active.");
                }
                boolean bl = false;
                return bl;
            }
            if (connection instanceof TcpIpConnection) {
                TcpIpConnection tcpConnection = (TcpIpConnection)connection;
                Address currentEndPoint = tcpConnection.getEndPoint();
                if (currentEndPoint != null && !currentEndPoint.equals(remoteEndPoint)) {
                    throw new IllegalArgumentException(connection + " has already a different endpoint than: " + remoteEndPoint);
                }
                tcpConnection.setEndPoint(remoteEndPoint);
                if (!connection.isClient()) {
                    TcpIpConnectionErrorHandler connectionMonitor = this.getErrorHandler(remoteEndPoint, true);
                    tcpConnection.setErrorHandler(connectionMonitor);
                }
            }
            this.connectionsMap.put(remoteEndPoint, connection);
            this.ioService.getEventService().executeEventCallback(new StripedRunnable(){

                @Override
                public void run() {
                    for (ConnectionListener listener : TcpIpConnectionManager.this.connectionListeners) {
                        listener.connectionAdded(connection);
                    }
                }

                @Override
                public int getKey() {
                    return remoteEndPoint.hashCode();
                }
            });
            boolean bl = true;
            return bl;
        }
        finally {
            this.connectionsInProgress.remove(remoteEndPoint);
        }
    }

    private boolean checkAlreadyConnected(TcpIpConnection connection, Address remoteEndPoint) {
        Connection existingConnection = this.connectionsMap.get(remoteEndPoint);
        if (existingConnection != null && existingConnection.isAlive()) {
            if (existingConnection != connection) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest(existingConnection + " is already bound to " + remoteEndPoint + ", new one is " + connection);
                }
                this.activeConnections.add(connection);
            }
            return true;
        }
        return false;
    }

    void sendBindRequest(TcpIpConnection connection, Address remoteEndPoint, boolean reply) {
        connection.setEndPoint(remoteEndPoint);
        this.ioService.onSuccessfulConnection(remoteEndPoint);
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Sending bind packet to " + remoteEndPoint);
        }
        BindMessage bind = new BindMessage(this.ioService.getThisAddress(), remoteEndPoint, reply);
        byte[] bytes = this.ioService.getSerializationService().toBytes(bind);
        Packet packet = new Packet(bytes).setPacketType(Packet.Type.BIND);
        connection.write(packet);
    }

    Channel createChannel(SocketChannel socketChannel, boolean clientMode) throws Exception {
        Channel channel = this.networking.register(socketChannel, clientMode);
        this.acceptedChannels.add(channel);
        return channel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    synchronized TcpIpConnection newConnection(Channel channel, Address endpoint) {
        try {
            if (!this.live) {
                throw new IllegalStateException("connection manager is not live!");
            }
            TcpIpConnection connection = new TcpIpConnection(this, this.connectionIdGen.incrementAndGet(), channel);
            connection.setEndPoint(endpoint);
            this.activeConnections.add(connection);
            this.logger.info("Established socket connection between " + channel.localSocketAddress() + " and " + channel.remoteSocketAddress());
            this.openedCount.inc();
            channel.start();
            TcpIpConnection tcpIpConnection = connection;
            return tcpIpConnection;
        }
        finally {
            this.acceptedChannels.remove(channel);
        }
    }

    void failedConnection(Address address, Throwable t, boolean silent) {
        this.connectionsInProgress.remove(address);
        this.ioService.onFailedConnection(address);
        if (!silent) {
            this.getErrorHandler(address, false).onError(t);
        }
    }

    @Override
    public Connection getConnection(Address address) {
        return this.connectionsMap.get(address);
    }

    @Override
    public Connection getOrConnect(Address address) {
        return this.getOrConnect(address, false);
    }

    @Override
    public Connection getOrConnect(Address address, boolean silent) {
        Connection connection = this.connectionsMap.get(address);
        if (connection == null && this.live && this.connectionsInProgress.add(address)) {
            this.connector.asyncConnect(address, silent);
        }
        return connection;
    }

    private TcpIpConnectionErrorHandler getErrorHandler(Address endpoint, boolean reset) {
        TcpIpConnectionErrorHandler monitor = ConcurrencyUtil.getOrPutIfAbsent(this.monitors, endpoint, this.monitorConstructor);
        if (reset) {
            monitor.reset();
        }
        return monitor;
    }

    @Override
    public void onConnectionClose(Connection connection) {
        this.closedCount.inc();
        this.activeConnections.remove(connection);
        Address endPoint = connection.getEndPoint();
        if (endPoint != null) {
            this.connectionsInProgress.remove(endPoint);
            this.connectionsMap.remove(endPoint, connection);
            this.fireConnectionRemovedEvent(connection, endPoint);
        }
    }

    private void fireConnectionRemovedEvent(final Connection connection, final Address endPoint) {
        if (this.live) {
            this.ioService.getEventService().executeEventCallback(new StripedRunnable(){

                @Override
                public void run() {
                    for (ConnectionListener listener : TcpIpConnectionManager.this.connectionListeners) {
                        listener.connectionRemoved(connection);
                    }
                }

                @Override
                public int getKey() {
                    return endPoint.hashCode();
                }
            });
        }
    }

    @Override
    public synchronized void start() {
        if (this.live) {
            return;
        }
        if (!this.serverSocketChannel.isOpen()) {
            throw new IllegalStateException("ConnectionManager is already shutdown. Cannot start!");
        }
        this.live = true;
        this.logger.finest("Starting ConnectionManager and IO selectors.");
        this.networking.start();
        this.startAcceptor();
    }

    private void startAcceptor() {
        if (this.acceptor != null) {
            this.logger.warning("TcpIpAcceptor already is running! Shutting down old acceptor...");
            this.shutdownAcceptor();
        }
        this.acceptor = new TcpIpAcceptor(this.serverSocketChannel, this).start();
        this.metricsRegistry.collectMetrics(this.acceptor);
    }

    @Override
    public synchronized void stop() {
        if (!this.live) {
            return;
        }
        this.live = false;
        this.logger.finest("Stopping ConnectionManager");
        this.shutdownAcceptor();
        for (Channel channel : this.acceptedChannels) {
            IOUtil.closeResource(channel);
        }
        for (Connection connection : this.connectionsMap.values()) {
            this.destroySilently(connection, "TcpIpConnectionManager is stopping");
        }
        for (TcpIpConnection tcpIpConnection : this.activeConnections) {
            this.destroySilently(tcpIpConnection, "TcpIpConnectionManager is stopping");
        }
        this.networking.shutdown();
        this.acceptedChannels.clear();
        this.connectionsInProgress.clear();
        this.connectionsMap.clear();
        this.monitors.clear();
        this.activeConnections.clear();
    }

    private void destroySilently(Connection conn, String reason) {
        if (conn == null) {
            return;
        }
        try {
            conn.close(reason, null);
        }
        catch (Throwable ignore) {
            this.logger.finest(ignore);
        }
    }

    @Override
    public synchronized void shutdown() {
        this.shutdownAcceptor();
        this.closeServerSocket();
        this.stop();
        this.scheduler.shutdownNow();
        this.connectionListeners.clear();
    }

    private void shutdownAcceptor() {
        if (this.acceptor != null) {
            this.acceptor.shutdown();
            this.metricsRegistry.deregister(this.acceptor);
            this.acceptor = null;
        }
    }

    private void closeServerSocket() {
        try {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Closing server socket channel: " + this.serverSocketChannel);
            }
            this.serverSocketChannel.close();
        }
        catch (IOException ignore) {
            this.logger.finest(ignore);
        }
    }

    @Override
    @Probe(name="clientCount", level=ProbeLevel.MANDATORY)
    public int getCurrentClientConnections() {
        int count = 0;
        for (TcpIpConnection conn : this.activeConnections) {
            if (!conn.isAlive() || !conn.isClient()) continue;
            ++count;
        }
        return count;
    }

    public boolean isLive() {
        return this.live;
    }

    @Override
    public boolean transmit(Packet packet, Connection connection) {
        Preconditions.checkNotNull(packet, "Packet can't be null");
        if (connection == null) {
            return false;
        }
        return connection.write(packet);
    }

    @Override
    public boolean transmit(Packet packet, Address target) {
        Preconditions.checkNotNull(packet, "Packet can't be null");
        Preconditions.checkNotNull(target, "target can't be null");
        return this.send(packet, target, null);
    }

    private boolean send(Packet packet, Address target, SendTask sendTask) {
        block6: {
            int retries;
            Connection connection = this.getConnection(target);
            if (connection != null) {
                return connection.write(packet);
            }
            if (sendTask == null) {
                sendTask = new SendTask(packet, target);
            }
            if ((retries = sendTask.retries) < 5 && this.ioService.isActive()) {
                this.getOrConnect(target, true);
                try {
                    this.scheduler.schedule(sendTask, (long)(retries + 1) * 100L, TimeUnit.MILLISECONDS);
                    return true;
                }
                catch (RejectedExecutionException e) {
                    if (this.live) {
                        throw e;
                    }
                    if (!this.logger.isFinestEnabled()) break block6;
                    this.logger.finest("Packet send task is rejected. Packet cannot be sent to " + target);
                }
            }
        }
        return false;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("Connections {");
        for (Connection conn : this.connectionsMap.values()) {
            sb.append("\n");
            sb.append(conn);
        }
        sb.append("\nlive=");
        sb.append(this.live);
        sb.append("\n}");
        return sb.toString();
    }

    private final class SendTask
    implements Runnable {
        private final Packet packet;
        private final Address target;
        private volatile int retries;

        private SendTask(Packet packet, Address target) {
            this.packet = packet;
            this.target = target;
        }

        @Override
        @SuppressFBWarnings(value={"VO_VOLATILE_INCREMENT"}, justification="single-writer, many-reader")
        public void run() {
            ++this.retries;
            if (TcpIpConnectionManager.this.logger.isFinestEnabled()) {
                TcpIpConnectionManager.this.logger.finest("Retrying[" + this.retries + "] packet send operation to: " + this.target);
            }
            TcpIpConnectionManager.this.send(this.packet, this.target, this);
        }
    }
}

