/*
 * Decompiled with CFR 0.152.
 */
package org.spf4j.io.tcp;

import com.google.common.annotations.Beta;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.spf4j.base.Throwables;
import org.spf4j.concurrent.RestartableServiceImpl;
import org.spf4j.ds.UpdateablePriorityQueue;
import org.spf4j.io.tcp.AcceptorSelectorEventHandler;
import org.spf4j.io.tcp.ClientHandler;
import org.spf4j.io.tcp.DeadlineAction;
import org.spf4j.io.tcp.SelectorEventHandler;

/**
 * TCP listener exposed as a {@link RestartableServiceImpl}.
 *
 * <p>The constructor hands the superclass a supplier that builds a new
 * {@link TcpServerGuavaService} on every call, so each service instance gets
 * its own selector and listening socket. (How and when the superclass invokes
 * the supplier is not visible from this file.)
 */
@SuppressFBWarnings(value={"HES_EXECUTOR_NEVER_SHUTDOWN"})
@Beta
public final class TcpServer
extends RestartableServiceImpl {
    /** Port the server listens on; only used here to build the service name. */
    private final int serverPort;

    /**
     * Creates the server wrapper; no socket is opened by this constructor.
     *
     * @param executor executor passed to every {@link TcpServerGuavaService} created by the supplier.
     * @param handlerFactory client handler passed to the acceptor of every created service.
     * @param serverPort local port to bind.
     * @param acceptBacklog backlog value passed to {@link ServerSocketChannel#bind}.
     */
    public TcpServer(final ExecutorService executor, final ClientHandler handlerFactory, final int serverPort, final int acceptBacklog) {
        super(new Supplier<Service>(){

            public Service get() {
                return new TcpServerGuavaService(executor, handlerFactory, serverPort, acceptBacklog);
            }
        });
        this.serverPort = serverPort;
    }

    /**
     * @return {@code "TCP:LISTEN:"} followed by the configured port.
     */
    @Override
    public String getServiceName() {
        return "TCP:LISTEN:" + this.serverPort;
    }

    /**
     * Guava service that owns one {@link Selector} and one listening
     * {@link ServerSocketChannel} and drives the selector loop in {@link #run()}.
     */
    public static final class TcpServerGuavaService
    extends AbstractExecutionThreadService
    implements Closeable {
        /** Capacity of the queue of tasks that must be executed on the selector thread. */
        private static final int SELECTOR_TASK_QUEUE_CAPACITY = 64;
        /** Initial capacity handed to the deadline-action priority queue. */
        private static final int DEADLINE_QUEUE_CAPACITY = 64;
        /** Upper bound, in milliseconds, of one blocking select; bounds how late deadlines/tasks are noticed. */
        private static final long SELECT_TIMEOUT_MILLIS = 100L;

        private final ExecutorService executor;
        private final ClientHandler handlerFactory;
        private final int serverPort;
        private final int acceptBacklog;
        /** Never set to {@code true} in this class; only reported by {@link #toString()}. */
        private volatile boolean terminated;
        /** Opened in {@link #startUp()}, closed when {@link #run()} exits. */
        private volatile Selector selector;
        /** Listening channel; published by {@link #startUp()} once bound and non-blocking. */
        private volatile ServerSocketChannel serverCh;

        /**
         * Stores the configuration; sockets are opened later, in {@link #startUp()}.
         *
         * @param executor executor handed to the acceptor and returned by {@link #executor()}.
         * @param handlerFactory client handler handed to the acceptor.
         * @param serverPort local port to bind.
         * @param acceptBacklog backlog value passed to {@link ServerSocketChannel#bind}.
         */
        public TcpServerGuavaService(ExecutorService executor, ClientHandler handlerFactory, int serverPort, int acceptBacklog) {
            this.executor = executor;
            this.handlerFactory = handlerFactory;
            this.acceptBacklog = acceptBacklog;
            this.serverPort = serverPort;
            this.terminated = false;
            this.selector = null;
        }

        /**
         * Opens the selector, then opens, binds and switches the listening channel
         * to non-blocking mode.
         *
         * <p>If any step fails, whatever was already opened is closed again and the
         * original exception is rethrown; a failure of that close is attached to it
         * as a suppressed exception instead of replacing it.
         *
         * @throws Exception the {@link IOException} or {@link RuntimeException} raised while opening or binding.
         */
        protected void startUp() throws Exception {
            Selector sel = Selector.open();
            this.selector = sel;
            try {
                ServerSocketChannel sc = ServerSocketChannel.open();
                try {
                    sc.bind(new InetSocketAddress(this.serverPort), this.acceptBacklog);
                    sc.configureBlocking(false);
                    this.serverCh = sc;
                }
                catch (IOException | RuntimeException e) {
                    TcpServerGuavaService.closeSuppressed(sc, e);
                    throw e;
                }
            }
            catch (IOException | RuntimeException e) {
                TcpServerGuavaService.closeSuppressed(sel, e);
                throw e;
            }
        }

        /**
         * Selector loop. Registers the acceptor, then, while the service is running:
         * waits up to {@value #SELECT_TIMEOUT_MILLIS} ms for ready keys and dispatches
         * them to their attached {@link SelectorEventHandler}, runs every deadline
         * action whose deadline is before the current time, and finally drains the
         * queue of tasks that must execute on this thread.
         *
         * <p>On exit (normal or exceptional) all channels registered with the selector,
         * the listening channel and the selector itself are closed. If the loop failed,
         * that failure is what propagates and cleanup failures are attached to it as
         * suppressed exceptions; if the loop ended normally, the first cleanup failure
         * is thrown.
         *
         * @throws IOException if selecting, dispatching or (after a normal exit) cleanup fails.
         */
        @SuppressFBWarnings(value={"AFBR_ABNORMAL_FINALLY_BLOCK_RETURN"})
        public void run() throws IOException {
            Selector sel = this.selector;
            Throwable primary = null;
            try {
                ArrayBlockingQueue<Runnable> tasksToRunBySelector = new ArrayBlockingQueue<Runnable>(SELECTOR_TASK_QUEUE_CAPACITY);
                UpdateablePriorityQueue<DeadlineAction> deadlineActions = new UpdateablePriorityQueue<DeadlineAction>(DEADLINE_QUEUE_CAPACITY, DeadlineAction.COMPARATOR);
                new AcceptorSelectorEventHandler(this.serverCh, this.handlerFactory, sel, this.executor, tasksToRunBySelector, deadlineActions).initialInterestRegistration();
                while (this.isRunning()) {
                    if (sel.select(SELECT_TIMEOUT_MILLIS) > 0) {
                        Set<SelectionKey> selectedKeys = sel.selectedKeys();
                        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey skey = keyIterator.next();
                            Object attachment = skey.attachment();
                            if (attachment instanceof SelectorEventHandler) {
                                SelectorEventHandler seh = (SelectorEventHandler)attachment;
                                if (seh.canRunAsync()) {
                                    seh.runAsync(skey);
                                } else {
                                    seh.run(skey);
                                }
                            }
                            // The selector never clears the selected set itself.
                            keyIterator.remove();
                        }
                    }
                    long currentTime = System.currentTimeMillis();
                    DeadlineAction due = deadlineActions.peek();
                    while (due != null && currentTime > due.getDeadline()) {
                        deadlineActions.poll().getAction().run();
                        due = deadlineActions.peek();
                    }
                    Runnable task = tasksToRunBySelector.poll();
                    while (task != null) {
                        task.run();
                        task = tasksToRunBySelector.poll();
                    }
                }
            }
            catch (IOException | RuntimeException | Error e) {
                // Remember what killed the loop so cleanup cannot mask it.
                primary = e;
                throw e;
            }
            finally {
                this.releaseResources(sel, primary);
            }
        }

        /**
         * Closes every channel registered with {@code sel}, the listening channel and
         * {@code sel} itself, always attempting all three steps.
         *
         * <p>The listening channel is closed explicitly because it has no key on the
         * selector when acceptor registration failed; closing an already closed channel
         * has no effect.
         *
         * @param sel selector used by the loop.
         * @param primary failure already propagating from the loop, or {@code null}.
         * @throws IOException the first cleanup failure, only when {@code primary} is {@code null}.
         */
        private void releaseResources(Selector sel, Throwable primary) throws IOException {
            Exception cleanupFailure = null;
            try {
                TcpServerGuavaService.closeSelectorChannels(sel);
            }
            catch (IOException | RuntimeException e) {
                cleanupFailure = e;
            }
            ServerSocketChannel sc = this.serverCh;
            if (sc != null) {
                try {
                    sc.close();
                }
                catch (IOException | RuntimeException e) {
                    cleanupFailure = TcpServerGuavaService.chain(cleanupFailure, e);
                }
            }
            try {
                sel.close();
            }
            catch (IOException | RuntimeException e) {
                cleanupFailure = TcpServerGuavaService.chain(cleanupFailure, e);
            }
            if (cleanupFailure == null) {
                return;
            }
            if (primary != null) {
                primary.addSuppressed(cleanupFailure);
                return;
            }
            if (cleanupFailure instanceof IOException) {
                throw (IOException)cleanupFailure;
            }
            throw (RuntimeException)cleanupFailure;
        }

        /** Returns {@code next} if there is no earlier failure, else {@code first} with {@code next} suppressed. */
        private static Exception chain(Exception first, Exception next) {
            if (first == null) {
                return next;
            }
            first.addSuppressed(next);
            return first;
        }

        /** Closes {@code resource}; a failure to do so is attached to {@code primary} as suppressed. */
        private static void closeSuppressed(Closeable resource, Throwable primary) {
            try {
                resource.close();
            }
            catch (IOException | RuntimeException closeFailure) {
                primary.addSuppressed(closeFailure);
            }
        }

        /**
         * @return the executor supplied at construction time.
         */
        protected Executor executor() {
            return this.executor;
        }

        /**
         * @return {@code "TCP:LISTEN:"} followed by the configured port.
         */
        protected String serviceName() {
            return "TCP:LISTEN:" + this.serverPort;
        }

        /**
         * Wakes the selector so a blocked select in {@link #run()} returns and the
         * loop can re-check {@code isRunning()}. Does nothing if the selector has not
         * been opened yet.
         */
        protected void triggerShutdown() {
            Selector sel = this.selector;
            if (sel != null) {
                sel.wakeup();
            }
        }

        /**
         * Requests the service to stop and blocks until it has terminated.
         *
         * <p>NOTE(review): per the Guava {@code Service} contract {@code awaitTerminated()}
         * throws {@link IllegalStateException} if the service ended in the failed state.
         *
         * @throws IOException declared for {@link Closeable}; not thrown by this body.
         */
        @Override
        public synchronized void close() throws IOException {
            this.stopAsync().awaitTerminated();
        }

        /**
         * Closes the channel of every key currently registered with {@code selector}.
         * All channels are attempted even if some fail to close.
         *
         * @param selector selector whose registered channels are closed; the selector itself stays open.
         * @throws IOException the first close failure, combined with later ones through
         *         {@code Throwables.suppress}.
         */
        public static void closeSelectorChannels(Selector selector) throws IOException {
            IOException failure = null;
            for (SelectionKey key : selector.keys()) {
                SelectableChannel channel = key.channel();
                try {
                    channel.close();
                }
                catch (IOException closeFailure) {
                    if (failure == null) {
                        failure = closeFailure;
                    } else {
                        failure = Throwables.suppress(failure, closeFailure);
                    }
                }
            }
            if (failure != null) {
                throw failure;
            }
        }

        /**
         * @return a diagnostic description listing the configuration, the {@code terminated} flag and the selector.
         */
        @Override
        public String toString() {
            return "TcpServer{executor=" + this.executor + ", handlerFactory=" + this.handlerFactory + ", serverPort=" + this.serverPort + ", acceptBacklog=" + this.acceptBacklog + ", terminated=" + this.terminated + ", selector=" + this.selector + '}';
        }
    }
}

