/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.thrift;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.thrift.TCustomNonblockingServerSocket;
import org.apache.cassandra.thrift.TServerFactory;
import org.apache.cassandra.thrift.ThriftSessionManager;
import org.apache.thrift.TProcessor;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomTHsHaServer
extends TNonblockingServer {
    /** Logger named after this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger((String)CustomTHsHaServer.class.getName());
    /** Selector threads created by the constructor; started in {@code startThreads()} and joined in {@code joinSelector()}. */
    private final Set<SelectorThread> ioThreads = new HashSet<SelectorThread>();
    /**
     * Loop flag polled by every selector thread. Declared volatile because it is written by
     * {@code startThreads()} / {@code stop()} and read from the selector threads.
     * It is {@code true} until {@code startThreads()} clears it.
     */
    private volatile boolean stopped = true;
    /** Executor that runs {@code Invocation} tasks; it is shut down at the end of {@code serve()}. */
    private final ExecutorService invoker;

    public CustomTHsHaServer(TNonblockingServer.Args args, ExecutorService invoker, int threadCount) {
        super((TNonblockingServer.AbstractNonblockingServerArgs)args);
        this.invoker = invoker;
        for (int i = 0; i < threadCount; ++i) {
            this.ioThreads.add(new SelectorThread("Selector-Thread-" + i));
        }
    }

    /**
     * Runs the server on the calling thread: calls {@code startListening()}, starts the
     * selector threads, marks the server as serving and then blocks until every selector
     * thread has terminated. Afterwards the invoker is shut down, the serving flag is
     * cleared and {@code stopListening()} is called.
     *
     * <p>Returns immediately when either {@code startListening()} or
     * {@code startThreads()} reports failure.
     */
    public void serve() {
        if (!this.startListening()) {
            return;
        }
        if (!this.startThreads()) {
            // startListening() already succeeded above; undo it so a failed start
            // does not leave the server transport listening.
            this.stopListening();
            return;
        }
        this.setServing(true);
        // Blocks until the selector threads exit (they leave their loop once stop() is called).
        this.joinSelector();
        this.invoker.shutdown();
        this.setServing(false);
        this.stopListening();
    }

    /**
     * Clears the {@code stopped} flag and starts every selector thread.
     *
     * @return always {@code true}
     */
    protected boolean startThreads() {
        this.stopped = false;
        Iterator<SelectorThread> pending = this.ioThreads.iterator();
        while (pending.hasNext()) {
            pending.next().start();
        }
        return true;
    }

    /**
     * Waits for all selector threads to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged,
     * waiting is abandoned and the thread's interrupt status is restored so that callers
     * can still observe it.
     */
    protected void joinSelector() {
        try {
            for (SelectorThread thread : this.ioThreads) {
                thread.join();
            }
        }
        catch (InterruptedException e) {
            LOGGER.error("Interrupted while joining threads!", (Throwable)e);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the server: calls {@code stopListening()}, raises the {@code stopped} flag,
     * wakes up every selector so its thread can notice the flag, and then waits for
     * the selector threads to finish.
     */
    public void stop() {
        this.stopListening();
        this.stopped = true;
        Iterator<SelectorThread> threads = this.ioThreads.iterator();
        while (threads.hasNext()) {
            SelectorThread selectorThread = threads.next();
            // A thread blocked in select() would otherwise never re-check the flag.
            selectorThread.wakeupSelector();
        }
        this.joinSelector();
    }

    /**
     * Hands a fully read frame to the invoker for processing.
     *
     * @param frameBuffer the frame buffer holding the request
     * @param thread      the selector thread that owns the frame buffer's key
     * @return {@code true} if the executor accepted the task, {@code false} if it
     *         rejected it (the rejection is logged at warn level)
     */
    protected boolean requestInvoke(TNonblockingServer.FrameBuffer frameBuffer, SelectorThread thread) {
        boolean accepted;
        try {
            this.invoker.execute(new Invocation(frameBuffer, thread));
            accepted = true;
        }
        catch (RejectedExecutionException rejected) {
            LOGGER.warn("ExecutorService rejected execution!", (Throwable)rejected);
            accepted = false;
        }
        return accepted;
    }

    /**
     * Deliberately does nothing with {@code fb}.
     *
     * <p>In this class interest changes are queued through
     * {@code SelectorThread.requestSelectInterestChange(FrameBuffer)} (called from
     * {@code Invocation.run()}) and applied by that selector thread in
     * {@code processInterestChanges()}.
     *
     * <p>NOTE(review): presumably this overrides the hook of the same name in
     * {@code TNonblockingServer} so that the base class does not change interests on
     * its own - confirm against the Thrift version in use.
     *
     * @param fb the frame buffer requesting an interest change; ignored
     */
    protected void requestSelectInterestChange(TNonblockingServer.FrameBuffer fb) {
        // Intentionally empty.
    }

    /**
     * {@link TServerFactory} that builds a {@link CustomTHsHaServer} on top of a
     * {@link TCustomNonblockingServerSocket}, using an RPC thread pool sized from
     * {@link DatabaseDescriptor} and one selector thread per available processor.
     */
    public static class Factory
    implements TServerFactory {
        /**
         * Builds the server for the given factory arguments.
         *
         * @param args address, socket options, transport/protocol factories and processor
         * @return a new, not yet serving {@link CustomTHsHaServer}
         * @throws RuntimeException if the encryption option checked below is not
         *         {@code none}, or if the server socket cannot be created
         */
        @Override
        public TServer buildTServer(TServerFactory.Args args) {
            // Refuse to build the server when encryption is configured.
            boolean encryptionOff = DatabaseDescriptor.getClientEncryptionOptions().internode_encryption.equals((Object)EncryptionOptions.InternodeEncryption.none);
            if (!encryptionOff) {
                throw new RuntimeException("Client SSL is not supported for non-blocking sockets (hsha). Please remove client ssl from the configuration.");
            }

            InetSocketAddress addr = args.addr;
            TCustomNonblockingServerSocket serverTransport;
            try {
                serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
            }
            catch (TTransportException e) {
                String message = String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort());
                throw new RuntimeException(message, e);
            }

            // Pool that executes the actual RPC invocations; a SynchronousQueue means
            // tasks are handed directly to a thread instead of being queued.
            int minThreads = (int)DatabaseDescriptor.getRpcMinThreads();
            int maxThreads = (int)DatabaseDescriptor.getRpcMaxThreads();
            JMXEnabledThreadPoolExecutor executorService = new JMXEnabledThreadPoolExecutor(minThreads, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");

            // Each builder call is applied to the result of the previous one, exactly
            // like the equivalent fluent chain.
            TNonblockingServer.Args serverArgs = new TNonblockingServer.Args((TNonblockingServerTransport)serverTransport);
            serverArgs = (TNonblockingServer.Args)serverArgs.inputTransportFactory(args.inTransportFactory);
            serverArgs = (TNonblockingServer.Args)serverArgs.outputTransportFactory(args.outTransportFactory);
            serverArgs = (TNonblockingServer.Args)serverArgs.inputProtocolFactory(args.tProtocolFactory);
            serverArgs = (TNonblockingServer.Args)serverArgs.outputProtocolFactory(args.tProtocolFactory);
            serverArgs = (TNonblockingServer.Args)serverArgs.processor((TProcessor)args.processor);

            int selectorThreads = Runtime.getRuntime().availableProcessors();
            return new CustomTHsHaServer(serverArgs, executorService, selectorThreads);
        }
    }

    protected class SelectorThread
    extends Thread {
        private final Selector selector;
        private final TNonblockingServerTransport serverTransport;
        private final Set<TNonblockingServer.FrameBuffer> selectInterestChanges;

        public SelectorThread(String name) {
            super(name);
            this.selectInterestChanges = new HashSet<TNonblockingServer.FrameBuffer>();
            try {
                this.selector = SelectorProvider.provider().openSelector();
                this.serverTransport = (TNonblockingServerTransport)CustomTHsHaServer.this.serverTransport_;
                this.serverTransport.registerSelector(this.selector);
            }
            catch (IOException ex) {
                throw new RuntimeException("Couldnt open the NIO selector", ex);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (!CustomTHsHaServer.this.stopped) {
                    this.select();
                }
            }
            catch (Throwable t) {
                LOGGER.error("Uncaught Exception: ", t);
            }
            finally {
                try {
                    this.selector.close();
                }
                catch (IOException e) {}
            }
        }

        private void select() throws InterruptedException, IOException {
            this.selector.select();
            Iterator<SelectionKey> keyIterator = this.selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                try {
                    if (!key.isValid()) {
                        this.cleanupSelectionkey(key);
                        continue;
                    }
                    if (key.isAcceptable()) {
                        this.handleAccept();
                    }
                    if (key.isReadable()) {
                        this.handleRead(key);
                        continue;
                    }
                    if (key.isWritable()) {
                        this.handleWrite(key);
                        continue;
                    }
                    LOGGER.debug("Unexpected state " + key.interestOps());
                }
                catch (Exception io) {
                    this.cleanupSelectionkey(key);
                }
            }
            this.processInterestChanges();
        }

        private void handleAccept() {
            block4: {
                SelectionKey clientKey = null;
                TNonblockingTransport client = null;
                try {
                    client = (TNonblockingTransport)this.serverTransport.accept();
                    clientKey = client.registerSelector(this.selector, 1);
                    TNonblockingServer.FrameBuffer frameBuffer = new TNonblockingServer.FrameBuffer((TNonblockingServer)CustomTHsHaServer.this, client, clientKey);
                    clientKey.attach(frameBuffer);
                }
                catch (TTransportException ex) {
                    return;
                }
                catch (IOException tte) {
                    LOGGER.warn("Exception trying to accept!", (Throwable)tte);
                    tte.printStackTrace();
                    if (clientKey != null) {
                        this.cleanupSelectionkey(clientKey);
                    }
                    if (client == null) break block4;
                    client.close();
                }
            }
        }

        private void handleRead(SelectionKey key) {
            TNonblockingServer.FrameBuffer buffer = (TNonblockingServer.FrameBuffer)key.attachment();
            if (!buffer.read()) {
                this.cleanupSelectionkey(key);
                return;
            }
            if (buffer.isFrameFullyRead() && !CustomTHsHaServer.this.requestInvoke(buffer, this)) {
                this.cleanupSelectionkey(key);
            }
        }

        private void handleWrite(SelectionKey key) {
            TNonblockingServer.FrameBuffer buffer = (TNonblockingServer.FrameBuffer)key.attachment();
            if (!buffer.write()) {
                this.cleanupSelectionkey(key);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void requestSelectInterestChange(TNonblockingServer.FrameBuffer frameBuffer) {
            Set<TNonblockingServer.FrameBuffer> set = this.selectInterestChanges;
            synchronized (set) {
                this.selectInterestChanges.add(frameBuffer);
            }
            this.selector.wakeup();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processInterestChanges() {
            Set<TNonblockingServer.FrameBuffer> set = this.selectInterestChanges;
            synchronized (set) {
                for (TNonblockingServer.FrameBuffer fb : this.selectInterestChanges) {
                    fb.changeSelectInterests();
                }
                this.selectInterestChanges.clear();
            }
        }

        private void cleanupSelectionkey(SelectionKey key) {
            TNonblockingServer.FrameBuffer buffer = (TNonblockingServer.FrameBuffer)key.attachment();
            if (buffer != null) {
                buffer.close();
            }
            key.cancel();
        }

        public void wakeupSelector() {
            this.selector.wakeup();
        }
    }

    /**
     * Task submitted to the invoker for one fully read frame. It records the client's
     * remote address with {@link ThriftSessionManager}, invokes the frame buffer and
     * then asks the owning selector thread to update the buffer's select interests.
     */
    protected class Invocation
    implements Runnable {
        /** Frame buffer holding the request to invoke. */
        private final TNonblockingServer.FrameBuffer buffer;
        /** Selector thread that owns the frame buffer's selection key. */
        private final SelectorThread owner;

        /**
         * @param frameBuffer the frame buffer to invoke
         * @param thread      the selector thread to notify once the invocation is done
         */
        public Invocation(TNonblockingServer.FrameBuffer frameBuffer, SelectorThread thread) {
            this.buffer = frameBuffer;
            this.owner = thread;
        }

        @Override
        public void run() {
            TNonblockingSocket clientSocket = (TNonblockingSocket)buffer.trans_;
            // Record the remote address with the session manager before invoking.
            ThriftSessionManager.instance.setCurrentSocket(clientSocket.getSocketChannel().socket().getRemoteSocketAddress());
            buffer.invoke();
            // The interest change itself is applied later by the owning selector thread.
            owner.requestSelectInterestChange(buffer);
        }
    }
}

