/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.io.IOException;
import java.net.MulticastSocket;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.IStage;
import org.apache.cassandra.concurrent.MultiThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.AsyncResult;
import org.apache.cassandra.net.ConnectionStatistics;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.FileStreamTask;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.IMessagingService;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDeliveryTask;
import org.apache.cassandra.net.MessageSerializationTask;
import org.apache.cassandra.net.MessagingConfig;
import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.net.SelectorManager;
import org.apache.cassandra.net.TcpConnection;
import org.apache.cassandra.net.TcpConnectionHandler;
import org.apache.cassandra.net.TcpConnectionManager;
import org.apache.cassandra.net.UdpConnection;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.utils.Cachetable;
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.cassandra.utils.ICachetable;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;

public class MessagingService
implements IMessagingService {
    /** Debug flag exposed through {@link #isDebugOn()} and {@link #debugOn(boolean)}. */
    private static boolean debugOn_ = false;
    /** Protocol version; shifted into the packet header from bit 8 upwards by the header builders. */
    private static int version_ = 1;
    /** Serializer whose ordinal is OR-ed into the low bits of the packet header. */
    private static SerializerType serializerType_ = SerializerType.BINARY;
    /** Protocol signature; starts as 16 zero bytes and is replaced by the constructor with a digest. */
    private static byte[] protocol_ = new byte[16];
    /** Verb under which the {@link ResponseVerbHandler} is registered by the constructor. */
    public static final String responseVerbHandler_ = "RESPONSE";
    /** Name of the stage registered by the constructor for response handling. */
    public static final String responseStage_ = "RESPONSE-STAGE";
    /** Reserved verb names (name mapped to itself); filled by the constructor. */
    private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
    /** Streaming-mode flag, see {@link #isStreaming()} / {@link #setStreamingMode(boolean)}. */
    private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
    /** Callbacks keyed by message (or group) id. */
    private static ICachetable<String, IAsyncCallback> callbackMap_;
    /** Pending async results keyed by message id. */
    private static ICachetable<String, IAsyncResult> taskCompletionMap_;
    /** End points this process listens on. */
    private static Set<EndPoint> endPoints_;
    /** Selection key of the TCP listen socket for each local end point. */
    private static Map<EndPoint, SelectionKey> listenSockets_;
    /** Registered verb handlers keyed by verb. */
    private static Map<String, IVerbHandler> verbHandlers_;
    /** Created in the static initializer; no other use is visible in this class. */
    private static Map<String, MulticastSocket> mCastMembership_;
    /** Executor returned by {@link #getReadExecutor()}. */
    private static ExecutorService messageDeserializationExecutor_;
    /** Executor used for outgoing message serialization and as the fallback delivery executor. */
    private static ExecutorService messageSerializerExecutor_;
    /** Executor returned by {@link #getDeserializationExecutor()}. */
    private static ExecutorService messageDeserializerExecutor_;
    /** Executor that runs file streaming tasks. */
    private static ExecutorService streamExecutor_;
    /** Guards lazy creation of connection pools and re-creation of the service after shutdown. */
    private static final ReentrantLock lock_;
    /** Connection pools keyed by {@code from + ":" + to}. */
    private static Map<String, TcpConnectionManager> poolTable_;
    /**
     * Set by {@code shutdown()} and cleared when the service is re-created.
     * Volatile because {@code getMessagingInstance()} reads it without holding
     * any lock, while {@code shutdown()} writes it under a different monitor.
     */
    private static volatile boolean bShutdown_;
    private static Logger logger_;
    /** The shared service instance handed out by {@link #getMessagingInstance()}. */
    private static IMessagingService messagingService_;

    /**
     * @return the current value of the class-wide debug flag.
     */
    public static boolean isDebugOn() {
        return debugOn_;
    }

    /**
     * Sets the class-wide debug flag.
     *
     * @param on new value of the flag
     */
    public static void debugOn(boolean on) {
        debugOn_ = on;
    }

    /**
     * @return the serializer type currently configured for outgoing packets.
     */
    public static SerializerType getSerializerType() {
        return serializerType_;
    }

    /**
     * Selects the serializer by name (case-insensitive): {@code "binary"},
     * {@code "java"} or {@code "xml"}. Any other name leaves the current
     * setting untouched.
     *
     * @param type serializer name; must not be {@code null}
     */
    public static synchronized void serializerType(String type) {
        SerializerType selected = null;
        if (type.equalsIgnoreCase("binary")) {
            selected = SerializerType.BINARY;
        } else if (type.equalsIgnoreCase("java")) {
            selected = SerializerType.JAVA;
        } else if (type.equalsIgnoreCase("xml")) {
            selected = SerializerType.XML;
        }
        // Unknown names are ignored rather than rejected.
        if (selected != null) {
            serializerType_ = selected;
        }
    }

    /**
     * @return the protocol version that is written into packet headers.
     */
    public static int getVersion() {
        return version_;
    }

    /**
     * Sets the protocol version that is written into packet headers.
     *
     * @param version new version number
     */
    public static void setVersion(int version) {
        version_ = version;
    }

    /**
     * Returns the shared messaging service. If the service has been shut
     * down, a fresh instance is created first; creation happens under
     * {@code lock_} and the shutdown flag is re-checked once the lock is held
     * so that only one caller rebuilds the service.
     *
     * @return the current messaging service instance
     */
    public static IMessagingService getMessagingInstance() {
        // Fast path: nothing to rebuild.
        if (!bShutdown_) {
            return messagingService_;
        }
        lock_.lock();
        try {
            // Another thread may have rebuilt the service while we waited.
            if (bShutdown_) {
                messagingService_ = new MessagingService();
                bShutdown_ = false;
            }
        }
        finally {
            lock_.unlock();
        }
        return messagingService_;
    }

    /**
     * Cloning is not supported.
     *
     * @throws CloneNotSupportedException always
     */
    public Object clone() throws CloneNotSupportedException {
        throw new CloneNotSupportedException();
    }

    /**
     * Creates the service and (re)initialises the class-wide state it relies
     * on: reserved verbs, the handler and end point collections, the callback
     * and async-result caches (expiry argument is twice the RPC timeout), the
     * executors, and the protocol signature (MD5 of {@code "FB-MESSAGING"}).
     * Finally registers the response verb handler and the response stage.
     */
    protected MessagingService() {
        // Each reserved verb name is stored mapped to itself.
        for (ReservedVerbs_ reserved : ReservedVerbs_.values()) {
            String verbName = reserved.toString();
            reservedVerbs_.put(verbName, verbName);
        }

        verbHandlers_ = new HashMap<String, IVerbHandler>();
        endPoints_ = new HashSet<EndPoint>();

        // The same thread count sizes all three pools and the response stage.
        int threadCount = MessagingConfig.getMessagingThreadCount();

        callbackMap_ = new Cachetable<String, IAsyncCallback>(2L * DatabaseDescriptor.getRpcTimeout());
        taskCompletionMap_ = new Cachetable<String, IAsyncResult>(2L * DatabaseDescriptor.getRpcTimeout());

        messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor(
                threadCount, threadCount, Integer.MAX_VALUE, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("MESSAGING-SERVICE-POOL"));
        messageSerializerExecutor_ = new DebuggableThreadPoolExecutor(
                threadCount, threadCount, Integer.MAX_VALUE, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL"));
        messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor(
                threadCount, threadCount, Integer.MAX_VALUE, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL"));
        streamExecutor_ = new DebuggableThreadPoolExecutor("MESSAGE-STREAMING-POOL");

        protocol_ = this.hash("MD5", "FB-MESSAGING".getBytes());

        // Must come after verbHandlers_ and reservedVerbs_ are set up.
        this.registerVerbHandlers(responseVerbHandler_, new ResponseVerbHandler());
        StageManager.registerStage(responseStage_, new MultiThreadedStage(responseStage_, threadCount));
    }

    /**
     * Computes a message digest of {@code data}.
     *
     * @param type digest algorithm name passed to {@link MessageDigest#getInstance(String)}
     * @param data bytes to digest
     * @return the digest, or {@code null} if any exception occurred (the
     *         exception is only logged, and only when debug logging is enabled)
     */
    public byte[] hash(String type, byte[] data) {
        try {
            return MessageDigest.getInstance(type).digest(data);
        }
        catch (Exception e) {
            if (logger_.isDebugEnabled()) {
                logger_.debug((Object)LogUtil.throwableToString(e));
            }
            return null;
        }
    }

    /**
     * Opens a non-blocking TCP server socket bound to {@code localEp},
     * registers it with the selector manager for accept events, and records
     * the end point and its selection key.
     * <p>
     * If binding, configuring or registering fails, the freshly opened channel
     * is closed before the failure propagates, so a failed call does not leak
     * a socket.
     *
     * @param localEp local end point to listen on
     * @throws IOException if the channel cannot be opened, bound or configured
     */
    @Override
    public void listen(EndPoint localEp) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        boolean registered = false;
        try {
            serverChannel.socket().bind(localEp.getInetAddress());
            serverChannel.configureBlocking(false);
            TcpConnectionHandler handler = new TcpConnectionHandler(localEp);
            SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
            endPoints_.add(localEp);
            listenSockets_.put(localEp, key);
            registered = true;
        }
        finally {
            if (!registered) {
                try {
                    serverChannel.close();
                }
                catch (IOException ignored) {
                    // Deliberately ignored: the failure that brought us here is the one to report.
                }
            }
        }
    }

    /**
     * Starts a UDP listener on the port of {@code localEp} and records the end
     * point as local. An {@link IOException} during initialisation is logged
     * at warn level and not rethrown; in that case the connection is closed
     * (as {@code sendUdpOneWay} does) so that a partially initialised
     * connection is not left open.
     *
     * @param localEp local end point whose port should be listened on
     */
    @Override
    public void listenUDP(EndPoint localEp) {
        UdpConnection connection = new UdpConnection();
        if (logger_.isDebugEnabled()) {
            logger_.debug((Object)("Starting to listen on " + localEp));
        }
        try {
            connection.init(localEp.getPort());
            endPoints_.add(localEp);
        }
        catch (IOException e) {
            logger_.warn((Object)LogUtil.throwableToString(e));
            // Release whatever init() managed to acquire before it failed.
            connection.close();
        }
    }

    /**
     * Returns the connection pool for the given pair of end points, creating
     * and caching it on first use. Pools are keyed by {@code from + ":" + to};
     * creation happens under {@code lock_} with a second lookup once the lock
     * is held.
     *
     * @param from local end point
     * @param to   remote end point
     * @return the pool for this pair
     */
    public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to) {
        String poolKey = from + ":" + to;
        TcpConnectionManager pool = poolTable_.get(poolKey);
        if (pool != null) {
            return pool;
        }
        lock_.lock();
        try {
            // Re-check: another thread may have created the pool meanwhile.
            pool = poolTable_.get(poolKey);
            if (pool == null) {
                pool = new TcpConnectionManager(
                        MessagingConfig.getConnectionPoolInitialSize(),
                        MessagingConfig.getConnectionPoolGrowthFactor(),
                        MessagingConfig.getConnectionPoolMaxSize(),
                        from, to);
                poolTable_.put(poolKey, pool);
            }
            return pool;
        }
        finally {
            lock_.unlock();
        }
    }

    /**
     * Builds one {@link ConnectionStatistics} snapshot per connection pool
     * (local end point, remote end point, pool size, connections in use).
     * Snapshots are collected in a set before being copied to the array.
     *
     * @return the collected statistics; empty when there are no pools
     */
    public static ConnectionStatistics[] getPoolStatistics() {
        Set<ConnectionStatistics> collected = new HashSet<ConnectionStatistics>();
        for (TcpConnectionManager pool : poolTable_.values()) {
            collected.add(new ConnectionStatistics(
                    pool.getLocalEndPoint(),
                    pool.getRemoteEndPoint(),
                    pool.getPoolSize(),
                    pool.getConnectionsInUse()));
        }
        return collected.toArray(new ConnectionStatistics[0]);
    }

    /**
     * Obtains a connection from the pool for the given pair of end points
     * (the pool is looked up, and created if needed, via
     * {@code getConnectionPool}).
     *
     * @param from local end point
     * @param to   remote end point
     * @return a connection from the pool
     * @throws IOException propagated from the pool's {@code getConnection()}
     */
    public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException {
        return MessagingService.getConnectionPool(from, to).getConnection();
    }

    /**
     * Rejects an attempt to replace the handler of a reserved verb. A reserved
     * verb may be registered once; only a registration on top of an existing
     * handler is refused.
     *
     * @param type verb being registered
     * @throws IllegalArgumentException if {@code type} is reserved and already has a handler
     */
    private void checkForReservedVerb(String type) {
        if (reservedVerbs_.get(type) == null) {
            return;
        }
        if (verbHandlers_.get(type) == null) {
            return;
        }
        throw new IllegalArgumentException(type + " is a reserved verb handler. Scram!");
    }

    /**
     * Registers {@code verbHandler} for the verb {@code type}, replacing any
     * handler previously registered for a non-reserved verb.
     *
     * @param type        verb name
     * @param verbHandler handler to associate with the verb
     * @throws IllegalArgumentException if {@code type} is a reserved verb that
     *         already has a handler (see {@code checkForReservedVerb})
     */
    @Override
    public void registerVerbHandlers(String type, IVerbHandler verbHandler) {
        this.checkForReservedVerb(type);
        verbHandlers_.put(type, verbHandler);
    }

    /**
     * Removes every verb handler whose verb name contains the string form of
     * {@code localEndPoint}.
     *
     * @param localEndPoint end point whose handlers should be removed
     */
    @Override
    public void deregisterAllVerbHandlers(EndPoint localEndPoint) {
        for (Iterator<String> it = verbHandlers_.keySet().iterator(); it.hasNext(); ) {
            String verb = it.next();
            if (verb.contains(localEndPoint.toString())) {
                // Removal through the iterator keeps the traversal valid.
                it.remove();
            }
        }
    }

    /**
     * Removes the handler registered for the verb {@code type}, if any.
     *
     * @param type verb name
     */
    @Override
    public void deregisterVerbHandlers(String type) {
        verbHandlers_.remove(type);
    }

    /**
     * Looks up the handler registered for a verb.
     *
     * @param type verb name
     * @return the registered handler, or {@code null} if there is none
     */
    @Override
    public IVerbHandler getVerbHandler(String type) {
        return verbHandlers_.get(type);
    }

    /**
     * Sends the same message to several end points and registers {@code cb}
     * under the message's id.
     *
     * @param message message to send
     * @param to      destination end points
     * @param cb      callback to register for the message id
     * @return the message id under which the callback was registered
     */
    @Override
    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb) {
        String id = message.getMessageId();
        callbackMap_.put(id, cb);
        for (EndPoint target : to) {
            this.sendOneWay(message, target);
        }
        return id;
    }

    /**
     * Sends a message to a single end point and registers {@code cb} under
     * the message's id.
     *
     * @param message message to send
     * @param to      destination end point
     * @param cb      callback to register for the message id
     * @return the message id under which the callback was registered
     */
    @Override
    public String sendRR(Message message, EndPoint to, IAsyncCallback cb) {
        final String id = message.getMessageId();
        // Register before sending.
        callbackMap_.put(id, cb);
        this.sendOneWay(message, to);
        return id;
    }

    /**
     * Sends {@code messages[i]} to {@code to[i]} for every index, after
     * stamping all messages with one freshly generated group id under which
     * {@code cb} is registered.
     *
     * @param messages messages to send
     * @param to       destinations, one per message
     * @param cb       callback to register for the group id
     * @return the generated group id
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    @Override
    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb) {
        final int count = messages.length;
        if (count != to.length) {
            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
        }
        String groupId = GuidGenerator.guid();
        callbackMap_.put(groupId, cb);
        int idx = 0;
        for (Message msg : messages) {
            msg.setMessageId(groupId);
            this.sendOneWay(msg, to[idx]);
            ++idx;
        }
        return groupId;
    }

    /**
     * Sends several groups of messages. Group {@code i} consists of
     * {@code messages[i][j]} sent to {@code to[i][j]}; every group gets its own
     * generated id, all ids are attached to {@code cb} as context, and
     * {@code cb} is registered under each id.
     * <p>
     * All arguments are validated before the callback is touched or anything
     * is sent, so malformed input can no longer cause a partial send followed
     * by an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param messages groups of messages
     * @param to       groups of destinations, parallel to {@code messages}
     * @param cb       callback shared by all groups
     * @return the id of the first group
     * @throws IllegalArgumentException if the number of groups differs, if
     *         there are no groups, or if a group has fewer end points than messages
     */
    @Override
    public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb) {
        if (messages.length != to.length) {
            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
        }
        int length = messages.length;
        if (length == 0) {
            // The return value is the first group id, so at least one group is required.
            throw new IllegalArgumentException("At least one group of messages is required.");
        }
        for (int i = 0; i < length; ++i) {
            // Surplus end points were always tolerated; only a shortage is fatal.
            if (to[i].length < messages[i].length) {
                throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
            }
        }
        String[] gids = new String[length];
        for (int i = 0; i < length; ++i) {
            gids[i] = GuidGenerator.guid();
        }
        cb.attachContext(gids);
        for (int i = 0; i < length; ++i) {
            callbackMap_.put(gids[i], cb);
            for (int j = 0; j < messages[i].length; ++j) {
                messages[i][j].setMessageId(gids[i]);
                this.sendOneWay(messages[i][j], to[i][j]);
            }
        }
        return gids[0];
    }

    /**
     * Sends a message without registering any callback. A message whose
     * sender equals the destination is delivered locally through
     * {@code receive}; anything else is handed to the serializer executor as
     * a {@link MessageSerializationTask}.
     *
     * @param message message to send
     * @param to      destination end point
     */
    @Override
    public void sendOneWay(Message message, EndPoint to) {
        boolean loopback = message.getFrom().equals(to);
        if (loopback) {
            MessagingService.receive(message);
        } else {
            messageSerializerExecutor_.execute(new MessageSerializationTask(message, to));
        }
    }

    /**
     * Sends a message and returns a handle for its result. The handle is
     * stored under the message id before the message is sent.
     *
     * @param message message to send
     * @param to      destination end point
     * @return the newly created async result
     */
    @Override
    public IAsyncResult sendRR(Message message, EndPoint to) {
        IAsyncResult pending = new AsyncResult();
        taskCompletionMap_.put(message.getMessageId(), pending);
        this.sendOneWay(message, to);
        return pending;
    }

    /**
     * Sends a message over UDP using a connection created for this call. A
     * message whose sender equals the destination is delivered locally
     * instead. An {@link IOException} is logged at warn level and not
     * rethrown; the connection is closed in every case once it was created.
     *
     * @param message message to send
     * @param to      destination end point
     */
    @Override
    public void sendUdpOneWay(Message message, EndPoint to) {
        if (message.getFrom().equals(to)) {
            MessagingService.receive(message);
        } else {
            UdpConnection udp = null;
            try {
                udp = new UdpConnection();
                udp.init();
                udp.write(message, to);
            }
            catch (IOException ioe) {
                logger_.warn((Object)LogUtil.throwableToString(ioe));
            }
            finally {
                if (udp != null) {
                    udp.close();
                }
            }
        }
    }

    /**
     * Marks the service as streaming and queues a {@link FileStreamTask} on
     * the stream executor.
     *
     * @param file          file to stream
     * @param startPosition value passed to the task as its start position
     * @param total         value passed to the task as its total
     * @param from          local end point
     * @param to            remote end point
     */
    @Override
    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to) {
        isStreaming_.set(true);
        streamExecutor_.execute(new FileStreamTask(file, startPosition, total, from, to));
    }

    /**
     * @return the current value of the streaming-mode flag.
     */
    public static boolean isStreaming() {
        return isStreaming_.get();
    }

    /**
     * Sets the streaming-mode flag.
     *
     * @param bVal new value of the flag
     */
    public static void setStreamingMode(boolean bVal) {
        isStreaming_.set(bVal);
    }

    /**
     * Asks every pooled connection to perform its pending writes and then
     * shuts the service down.
     */
    public static void flushAndshutdown() {
        for (TcpConnectionManager pool : poolTable_.values()) {
            for (TcpConnection pooled : pool.getConnections()) {
                pooled.doPendingWrites();
            }
        }
        MessagingService.shutdown();
    }

    /**
     * Shuts the messaging service down. While holding the class monitor it
     * cancels and closes all listen sockets, stops the executors immediately,
     * shuts down the callback and async-result caches, interrupts the selector
     * manager, clears the pool and handler tables and sets the shutdown flag.
     * <p>
     * Closing a listen channel is best effort: a failure is logged at warn
     * level and the shutdown continues with the remaining steps.
     */
    public static void shutdown() {
        logger_.info((Object)"Shutting down ...");
        synchronized (MessagingService.class) {
            // Stop listening on every socket.
            for (SelectionKey skey : listenSockets_.values()) {
                skey.cancel();
                try {
                    skey.channel().close();
                }
                catch (IOException e) {
                    // Keep going, but leave a trace instead of hiding the failure.
                    logger_.warn((Object)LogUtil.throwableToString(e));
                }
            }
            listenSockets_.clear();

            // Stop all worker pools.
            messageDeserializationExecutor_.shutdownNow();
            messageSerializerExecutor_.shutdownNow();
            messageDeserializerExecutor_.shutdownNow();
            streamExecutor_.shutdownNow();

            // Stop the caches holding callbacks and pending results.
            taskCompletionMap_.shutdown();
            callbackMap_.shutdown();

            SelectorManager.getSelectorManager().interrupt();

            poolTable_.clear();
            verbHandlers_.clear();
            bShutdown_ = true;
        }
        // Logged after the monitor is released.
        logger_.info((Object)"Shutdown invocation complete.");
    }

    /**
     * Hands a message to local delivery: a {@link MessageDeliveryTask} for the
     * message is enqueued under the stage name given by the message's type.
     *
     * @param message message to deliver
     */
    public static void receive(Message message) {
        MessagingService.enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
    }

    /**
     * @param ep end point to test
     * @return {@code true} if {@code ep} is one of the end points recorded by
     *         {@code listen} or {@code listenUDP}
     */
    public static boolean isLocalEndPoint(EndPoint ep) {
        return endPoints_.contains(ep);
    }

    /**
     * Runs {@code runnable} on the stage registered under {@code stageName}.
     * When no such stage exists the task is executed on the message
     * serializer executor instead, and that fallback is logged at info level.
     *
     * @param stageName name of the stage to look up
     * @param runnable  task to execute
     */
    private static void enqueueRunnable(String stageName, Runnable runnable) {
        IStage target = StageManager.getStage(stageName);
        if (target == null) {
            logger_.info((Object)"Running on default stage - beware");
            messageSerializerExecutor_.execute(runnable);
            return;
        }
        target.execute(runnable);
    }

    /**
     * @param key message or group id
     * @return the callback stored under {@code key} in the callback cache
     */
    public static IAsyncCallback getRegisteredCallback(String key) {
        return callbackMap_.get(key);
    }

    /**
     * Removes the callback stored under {@code key} from the callback cache.
     *
     * @param key message or group id
     */
    public static void removeRegisteredCallback(String key) {
        callbackMap_.remove(key);
    }

    /**
     * Fetches the async result stored under {@code key}.
     * Note that despite the name this is a <em>remove</em>: the entry is taken
     * out of the cache, so each key yields its result at most once.
     *
     * @param key message id
     * @return whatever the cache's {@code remove} returns for {@code key}
     */
    public static IAsyncResult getAsyncResult(String key) {
        return taskCompletionMap_.remove(key);
    }

    /**
     * Discards the async result stored under {@code key}.
     *
     * @param key message id
     */
    public static void removeAsyncResult(String key) {
        taskCompletionMap_.remove(key);
    }

    /**
     * @return the protocol signature bytes. This is the internal array itself,
     *         not a copy, so callers must not modify it.
     */
    public static byte[] getProtocol() {
        return protocol_;
    }

    /**
     * @return the executor created with the "MESSAGING-SERVICE-POOL" thread factory.
     */
    public static ExecutorService getReadExecutor() {
        return messageDeserializationExecutor_;
    }

    /**
     * @return the executor created with the "MESSAGE-SERIALIZER-POOL" thread factory.
     */
    public static ExecutorService getWriteExecutor() {
        return messageSerializerExecutor_;
    }

    /**
     * @return the executor created with the "MESSAGE-DESERIALIZER-POOL" thread factory.
     */
    public static ExecutorService getDeserializationExecutor() {
        return messageDeserializerExecutor_;
    }

    /**
     * @param protocol signature bytes to check
     * @return {@code true} if {@code protocol} equals this service's protocol signature
     */
    public static boolean isProtocolValid(byte[] protocol) {
        return MessagingService.isEqual(protocol_, protocol);
    }

    /**
     * Compares two digests with {@link MessageDigest#isEqual(byte[], byte[])}.
     *
     * @param digestA first digest
     * @param digestB second digest
     * @return {@code true} if the digests are equal
     */
    public static boolean isEqual(byte[] digestA, byte[] digestB) {
        return MessageDigest.isEqual(digestA, digestB);
    }

    /**
     * Encodes an {@code int} as four bytes, most significant byte first.
     *
     * @param i value to encode
     * @return a new 4-byte array
     */
    public static byte[] toByteArray(int i) {
        byte[] out = new byte[4];
        // The narrowing cast keeps exactly the low eight bits of each shifted value.
        out[0] = (byte)(i >>> 24);
        out[1] = (byte)(i >>> 16);
        out[2] = (byte)(i >>> 8);
        out[3] = (byte)i;
        return out;
    }

    /**
     * Encodes a {@code short} as two bytes, most significant byte first.
     *
     * @param s value to encode
     * @return a new 2-byte array
     */
    public static byte[] toByteArray(short s) {
        byte[] out = new byte[2];
        // The narrowing cast keeps exactly the low eight bits of each value.
        out[0] = (byte)(s >>> 8);
        out[1] = (byte)s;
        return out;
    }

    /**
     * Decodes a big-endian {@code short} from the first two bytes of {@code bytes}.
     *
     * @param bytes source array, at least two bytes long
     * @return the decoded value
     * @throws IllegalArgumentException if fewer than two bytes are available
     */
    public static short byteArrayToShort(byte[] bytes) {
        return MessagingService.byteArrayToShort(bytes, 0);
    }

    /**
     * Decodes a big-endian {@code short} from two bytes starting at {@code offset}.
     *
     * @param bytes  source array
     * @param offset index of the most significant byte
     * @return the decoded value
     * @throws IllegalArgumentException if fewer than two bytes remain from {@code offset}
     */
    public static short byteArrayToShort(byte[] bytes, int offset) {
        if (bytes.length - offset < 2) {
            throw new IllegalArgumentException("A short must be 2 bytes in size.");
        }
        // Mask each byte to strip the sign extension before combining.
        int high = bytes[offset] & 0xFF;
        int low = bytes[offset + 1] & 0xFF;
        return (short)((high << 8) | low);
    }

    /**
     * Extracts the {@code n}-bit field of {@code x} whose highest bit is at
     * position {@code p} (bit 0 being the least significant), right-aligned.
     *
     * @param x value to read from
     * @param p position of the field's most significant bit
     * @param n width of the field in bits
     * @return the extracted field
     */
    public static int getBits(int x, int p, int n) {
        int shift = p + 1 - n;
        int mask = ~(-1 << n);
        return (x >>> shift) & mask;
    }

    /**
     * Decodes a big-endian {@code int} from the first four bytes of {@code bytes}.
     *
     * @param bytes source array, at least four bytes long
     * @return the decoded value
     * @throws IllegalArgumentException if fewer than four bytes are available
     */
    public static int byteArrayToInt(byte[] bytes) {
        return MessagingService.byteArrayToInt(bytes, 0);
    }

    /**
     * Decodes a big-endian {@code int} from four bytes starting at {@code offset}.
     *
     * @param bytes  source array
     * @param offset index of the most significant byte
     * @return the decoded value
     * @throws IllegalArgumentException if fewer than four bytes remain from {@code offset}
     */
    public static int byteArrayToInt(byte[] bytes, int offset) {
        if (bytes.length - offset < 4) {
            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
        }
        // Mask each byte to strip the sign extension before shifting it into place.
        return ((bytes[offset] & 0xFF) << 24)
                | ((bytes[offset + 1] & 0xFF) << 16)
                | ((bytes[offset + 2] & 0xFF) << 8)
                | (bytes[offset + 3] & 0xFF);
    }

    /**
     * Wraps a payload into a packet laid out as: protocol signature, 4-byte
     * header, 4-byte payload length, payload. The header carries the
     * serializer ordinal in its low bits, flag 4 for {@code compress}, flag 8
     * for {@code stream}, flag 0x10 for {@code listening}, and the protocol
     * version shifted left by eight bits.
     *
     * @param bytes     payload
     * @param compress  sets the compress flag
     * @param stream    sets the stream flag
     * @param listening sets the listening flag
     * @return a buffer positioned at the start of the packet, ready to be read
     */
    public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening) {
        byte[] size = MessagingService.toByteArray(bytes.length);

        int flags = serializerType_.ordinal();
        if (compress) {
            flags |= 4;
        }
        if (stream) {
            flags |= 8;
        }
        if (listening) {
            flags |= 0x10;
        }
        flags |= version_ << 8;
        byte[] header = MessagingService.toByteArray(flags);

        // 16 is the length reserved for the protocol signature.
        int capacity = 16 + header.length + size.length + bytes.length;
        ByteBuffer packet = ByteBuffer.allocate(capacity);
        packet.put(protocol_);
        packet.put(header);
        packet.put(size);
        packet.put(bytes);
        packet.flip();
        return packet;
    }

    /**
     * Builds the header that precedes streamed data: protocol signature
     * followed by the 4-byte header word. The header word carries the
     * serializer ordinal in its low bits, flag 4 for {@code compress}, flag 8
     * for {@code stream}, and the protocol version shifted left by eight bits.
     *
     * @param compress sets the compress flag
     * @param stream   sets the stream flag
     * @return a buffer positioned at the start of the header, ready to be read
     */
    public static ByteBuffer constructStreamHeader(boolean compress, boolean stream) {
        int flags = serializerType_.ordinal();
        if (compress) {
            flags |= 4;
        }
        if (stream) {
            flags |= 8;
        }
        flags |= version_ << 8;
        byte[] header = MessagingService.toByteArray(flags);

        // 16 is the length reserved for the protocol signature.
        ByteBuffer out = ByteBuffer.allocate(16 + header.length);
        out.put(protocol_);
        out.put(header);
        out.flip();
        return out;
    }

    // Class initialisation. The service instance is created last because the
    // constructor uses state set up above it (e.g. the logger in hash()).
    static {
        listenSockets_ = new HashMap<EndPoint, SelectionKey>();
        mCastMembership_ = new HashMap<String, MulticastSocket>();
        lock_ = new ReentrantLock();
        poolTable_ = new Hashtable<String, TcpConnectionManager>();
        bShutdown_ = false;
        logger_ = Logger.getLogger(MessagingService.class);
        // Runs the constructor, which fills in the remaining static fields.
        messagingService_ = new MessagingService();
    }

    /**
     * Verb names that are reserved: the constructor copies every constant's
     * name into {@code reservedVerbs_}, and a reserved verb's handler cannot
     * be replaced once registered.
     */
    private static enum ReservedVerbs_ {
        RESPONSE;

    }
}

