/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.protocol.nio.impl;

import com.solacesystems.jcsmp.JCSMPFatalErrorException;
import com.solacesystems.jcsmp.impl.timers.JCSMPTimeoutHandler;
import com.solacesystems.jcsmp.impl.timers.JCSMPTimer;
import com.solacesystems.jcsmp.impl.timers.JCSMPTimerQueue;
import com.solacesystems.jcsmp.impl.timers.impl.JCSMPTimerQueueImpl;
import com.solacesystems.jcsmp.protocol.nio.IOHandler;
import com.solacesystems.jcsmp.protocol.nio.IOReactor;
import com.solacesystems.jcsmp.protocol.nio.ReadIOHandler;
import com.solacesystems.jcsmp.protocol.nio.WriteIOHandler;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class SyncEventDispatcherReactor
implements IOReactor,
JCSMPTimerQueue {
    private static final Log Trace = LogFactory.getLog(SyncEventDispatcherReactor.class);
    private Selector selector;
    private ConcurrentLinkedQueue<HRegistrationRequest> registrationOps;
    private volatile boolean requestedShutdown = false;
    private JCSMPTimerQueueImpl tQueue;
    private Thread serviceThread;
    private boolean _die_on_unhandled_OOM;

    private SyncEventDispatcherReactor(String ctxId) {
        try {
            try {
                this._die_on_unhandled_OOM = System.getProperty("JCSMP_EXIT_ON_UNHANDLED_OOM") != null;
            }
            catch (SecurityException se) {
                this._die_on_unhandled_OOM = false;
            }
            if (this._die_on_unhandled_OOM) {
                Trace.info((Object)"Reactor configured with JCSMP_EXIT_ON_UNHANDLED_OOM");
            }
            this.selector = SelectorProvider.provider().openSelector();
            this.registrationOps = new ConcurrentLinkedQueue();
            this.tQueue = new JCSMPTimerQueueImpl();
            this.serviceThread = new Thread(new SEDReactorThread());
            this.serviceThread.setName(String.format("Context_%s_ReactorThread", ctxId));
            this.serviceThread.setDaemon(true);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public synchronized void registerHandler(IOHandler handler, int ops) {
        HRegistrationRequest hreg_task = new HRegistrationRequest(this.selector, handler, 0, ops);
        this.registrationOps.add(hreg_task);
        this.selector.wakeup();
    }

    @Override
    public synchronized void deregisterHandler(IOHandler handler, int ops) {
        HRegistrationRequest hreg_task = new HRegistrationRequest(this.selector, handler, 1, ops);
        this.registrationOps.add(hreg_task);
        this.selector.wakeup();
        if (!this.isThreadReactor()) {
            try {
                hreg_task.completion.tryAcquire(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private void handleCallbackError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void requestShutdown() {
        this.requestedShutdown = true;
        this.selector.wakeup();
    }

    public static SyncEventDispatcherReactor create(String name) {
        SyncEventDispatcherReactor reactorInstance = new SyncEventDispatcherReactor(name);
        reactorInstance.serviceThread.start();
        return reactorInstance;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void eventLoop() {
        long lastTimerRun = 0L;
        try {
            while (!this.requestedShutdown) {
                try {
                    while (!this.registrationOps.isEmpty()) {
                        HRegistrationRequest r = (HRegistrationRequest)this.registrationOps.remove();
                        try {
                            r.run();
                        }
                        catch (CancelledKeyException cancelledKeyException) {
                        }
                    }
                    long SELECT_TIMEOUT = 50L;
                    this.selector.select(50L);
                    Set<SelectionKey> keys = this.selector.selectedKeys();
                    this.processReactorChannels(keys);
                    long curTime = System.currentTimeMillis();
                    if (curTime - lastTimerRun < 50L) continue;
                    lastTimerRun = curTime;
                    this.tQueue.runAllTo(curTime);
                }
                catch (IllegalBlockingModeException e) {
                    Trace.info((Object)"Aborting write operation on a blocking socket (Stray write operation) possible SSL negotiation failure.");
                }
                catch (Throwable t) {
                    if (t instanceof OutOfMemoryError && this._die_on_unhandled_OOM) {
                        System.err.println("Reactor got unhandled OutOfMemoryError, exit(1).");
                        System.exit(1);
                    }
                    Trace.error((Object)"unexpected exception in Reactor , continuing ...,  back to the begining of the main loop", t);
                    t.printStackTrace();
                }
            }
            return;
        }
        finally {
            try {
                this.selector.close();
            }
            catch (IOException iOException) {}
        }
    }

    private void processReactorChannels(Set<SelectionKey> keys) {
        if (keys.size() == 0) {
            return;
        }
        Iterator<SelectionKey> iter = keys.iterator();
        while (iter.hasNext()) {
            SelectionKey k = iter.next();
            iter.remove();
            try {
                IOHandler h;
                if (k.isReadable() && k.attachment() instanceof ReadIOHandler) {
                    h = (ReadIOHandler)k.attachment();
                    try {
                        h.read();
                    }
                    catch (Throwable t) {
                        h.notifyFatalError(new JCSMPFatalErrorException("Fatal reactor error", t));
                    }
                }
                if (!k.isWritable() || !(k.attachment() instanceof WriteIOHandler)) continue;
                h = (WriteIOHandler)k.attachment();
                try {
                    h.write(k.channel());
                }
                catch (Throwable t) {
                    h.notifyFatalError(new JCSMPFatalErrorException("Fatal reactor error", t));
                }
            }
            catch (CancelledKeyException cke) {
                Object handler;
                if (Trace.isDebugEnabled()) {
                    Trace.debug((Object)"Reactor caught closed channel (CancelledKeyException). Will attempt to deliver to handler. ", (Throwable)cke);
                }
                if ((handler = k.attachment()) == null) continue;
                if (handler instanceof ReadIOHandler) {
                    ((ReadIOHandler)handler).handleClosedSocketDuringRead(cke, k.channel());
                    continue;
                }
                if (!(handler instanceof WriteIOHandler)) continue;
                ((WriteIOHandler)handler).handleClosedSocketDuringWrite(cke, k.channel());
            }
        }
    }

    @Override
    public void cancelTimer(JCSMPTimer timer) {
        this.tQueue.cancelTimer(timer);
    }

    @Override
    public void cancelAllTimers() {
        this.tQueue.cancelAllTimers();
    }

    @Override
    public JCSMPTimer schedule_absolute(long time, JCSMPTimeoutHandler handler, JCSMPTimer timer) {
        JCSMPTimer tmr = this.tQueue.schedule_absolute(time, handler, timer);
        return tmr;
    }

    @Override
    public JCSMPTimer schedule_relative(long offset, JCSMPTimeoutHandler handler, JCSMPTimer timer) {
        JCSMPTimer tmr = this.tQueue.schedule_relative(offset, handler, timer);
        return tmr;
    }

    @Override
    public JCSMPTimer schedule_absolute(long time, JCSMPTimeoutHandler handler) {
        JCSMPTimer timer = this.tQueue.schedule_absolute(time, handler);
        return timer;
    }

    @Override
    public JCSMPTimer schedule_relative(long offset, JCSMPTimeoutHandler handler) {
        JCSMPTimer timer = this.tQueue.schedule_relative(offset, handler);
        return timer;
    }

    public long getServiceThreadId() {
        return this.serviceThread.getId();
    }

    public boolean isThreadReactor() {
        return Thread.currentThread().getId() == this.getServiceThreadId();
    }

    class SEDReactorThread
    implements Runnable {
        SEDReactorThread() {
        }

        public void run() {
            SyncEventDispatcherReactor.this.eventLoop();
        }
    }

    static final class HRegistrationRequest
    implements Runnable {
        Selector _s;
        IOHandler _h;
        int _mode;
        int _op;
        static final int REGISTER = 0;
        static final int DEREGISTER = 1;
        public final Semaphore completion = new Semaphore(0);

        HRegistrationRequest(Selector selector, IOHandler handler, int mode, int op) {
            this._s = selector;
            this._h = handler;
            this._mode = mode;
            this._op = op;
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public void run() {
            try {
                switch (this._mode) {
                    case 0: {
                        this.register();
                        return;
                    }
                    case 1: {
                        this.deregister();
                        return;
                    }
                }
                return;
            }
            finally {
                this.completion.release();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void register() {
            int opsToReg;
            if (this._h == null) {
                return;
            }
            SelectionKey sk = this._h.getChannel().keyFor(this._s);
            int n = opsToReg = sk == null ? 0 : sk.interestOps();
            if (this._h instanceof ReadIOHandler && (this._op & 1) != 0) {
                opsToReg |= 1;
            }
            if (this._h instanceof WriteIOHandler && (this._op & 4) != 0) {
                opsToReg |= 4;
            }
            try {
                AbstractSelectableChannel ch;
                AbstractSelectableChannel abstractSelectableChannel = ch = this._h.getChannel();
                synchronized (abstractSelectableChannel) {
                    if (ch.isOpen()) {
                        SelectionKey key = ch.register(this._s, opsToReg);
                        key.attach(this._h);
                    }
                }
            }
            catch (ClosedChannelException e) {
                Trace.warn((Object)"Attempted to register interest on a closed channel, cancelling registration request.", (Throwable)e);
            }
        }

        void deregister() {
            if (this._h == null || this._h.getChannel() == null) {
                return;
            }
            SelectionKey sk = this._h.getChannel().keyFor(this._s);
            if (sk != null) {
                int ops = sk.interestOps();
                sk.interestOps(ops &= ~this._op);
            }
        }
    }
}

