/*
 * Decompiled with CFR 0.152.
 */
package org.kaazing.gateway.service.proxy;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionInitializer;
import org.jboss.netty.channel.socket.Worker;
import org.kaazing.gateway.service.ServiceContext;
import org.kaazing.gateway.service.proxy.AbstractProxyHandler;
import org.kaazing.gateway.service.proxy.ConnectionPool;
import org.kaazing.gateway.transport.BridgeAcceptor;
import org.kaazing.gateway.transport.BridgeServiceFactory;
import org.kaazing.gateway.transport.Transport;
import org.kaazing.gateway.transport.TransportFactory;
import org.kaazing.gateway.transport.nio.internal.NioSocketAcceptor;
import org.kaazing.gateway.util.scheduler.SchedulerProvider;
import org.kaazing.mina.netty.util.threadlocal.VicariousThreadLocal;
import org.slf4j.Logger;

public final class ServiceConnectManager {
    private final ServiceContext serviceCtx;
    private final AbstractProxyHandler connectHandler;
    private final String connectURI;
    private final AtomicBoolean serviceConnected = new AtomicBoolean(true);
    private final SchedulerProvider schedulerProvider;
    private ServiceHeartbeat heartbeat;
    private final Logger logger;
    private final int interval;
    private final NioSocketAcceptor tcpAcceptor;
    private HeartbeatFilter heartbeatFilter;
    private int preparedConnectionCount;
    private AtomicLong lastSuccessfulConnectTime = new AtomicLong(0L);
    private AtomicLong lastFailedConnectTime = new AtomicLong(0L);
    private AtomicBoolean heartbeatPingResult = new AtomicBoolean(false);
    private long heartbeatPingTimestamp = 0L;
    private AtomicInteger heartbeatPingCount = new AtomicInteger(0);
    private AtomicInteger heartbeatPingSuccesses = new AtomicInteger(0);
    private AtomicInteger heartbeatPingFailures = new AtomicInteger(0);
    private final ThreadLocal<ConnectionPool> connectionPool = new VicariousThreadLocal();

    public ServiceConnectManager(ServiceContext service, AbstractProxyHandler connectHandler, BridgeServiceFactory bridgeServiceFactory, String connectURI, int interval, int preparedConnectionCount) {
        this.serviceCtx = service;
        this.connectHandler = connectHandler;
        this.connectURI = connectURI;
        this.schedulerProvider = service.getSchedulerProvider();
        this.logger = service.getLogger();
        this.interval = interval;
        TransportFactory transportFactory = bridgeServiceFactory.getTransportFactory();
        Transport tcp = transportFactory.getTransport("tcp");
        assert (tcp != null);
        BridgeAcceptor bridgeAcceptor = tcp.getAcceptor();
        assert (bridgeAcceptor instanceof NioSocketAcceptor);
        this.tcpAcceptor = (NioSocketAcceptor)bridgeAcceptor;
        int workerCount = service.getProcessorCount();
        assert (workerCount > 0);
        this.preparedConnectionCount = preparedConnectionCount;
        if (preparedConnectionCount > 0 && preparedConnectionCount < workerCount) {
            this.preparedConnectionCount = workerCount;
            if (this.logger.isWarnEnabled()) {
                this.logger.warn(String.format("Configured prepared.connection.count %d for %s service has been increased to number of IO threads %d for extra efficiency", preparedConnectionCount, this.serviceCtx.getServiceType(), this.preparedConnectionCount, workerCount));
            }
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("%s service with thread alignment, using prepared.connection.count=%d", this.serviceCtx.getServiceType(), preparedConnectionCount));
        }
        this.heartbeatFilter = interval > 0 ? new HeartbeatFilter(interval) : null;
    }

    public void start() {
        Object tmpConnectListener = this.interval > 0 ? new IoFutureListener<ConnectFuture>(){

            public void operationComplete(ConnectFuture future) {
                ServiceConnectManager.this.heartbeatFilter.setServiceConnected(future.isConnected());
                ServiceConnectManager.this.updateConnectTimes(future.isConnected());
            }
        } : new IoFutureListener<ConnectFuture>(){

            public void operationComplete(ConnectFuture future) {
                ServiceConnectManager.this.updateConnectTimes(future.isConnected());
            }
        };
        Object connectListener = tmpConnectListener;
        Worker[] workers = this.tcpAcceptor.getWorkers();
        assert (this.preparedConnectionCount == 0 || this.preparedConnectionCount >= workers.length) : "Prepared connection count must be 0, or >= number of IO threads";
        int minCountPerThread = this.preparedConnectionCount / workers.length;
        int remainder = this.preparedConnectionCount % workers.length;
        for (Worker worker : workers) {
            int count = remainder-- > 0 ? minCountPerThread + 1 : minCountPerThread;
            FutureTask<ConnectionPool> startConnectionPoolTask = new FutureTask<ConnectionPool>(new Callable<ConnectionPool>((IoFutureListener)connectListener, count){
                final /* synthetic */ IoFutureListener val$connectListener;
                final /* synthetic */ int val$count;
                {
                    this.val$connectListener = ioFutureListener;
                    this.val$count = n;
                }

                @Override
                public ConnectionPool call() {
                    ConnectionPool currentPool = (ConnectionPool)ServiceConnectManager.this.connectionPool.get();
                    if (currentPool == null) {
                        currentPool = new ConnectionPool(ServiceConnectManager.this.serviceCtx, ServiceConnectManager.this.connectHandler, ServiceConnectManager.this.connectURI, ServiceConnectManager.this.heartbeatFilter, (IoFutureListener<ConnectFuture>)this.val$connectListener, this.val$count, true);
                        ServiceConnectManager.this.connectionPool.set(currentPool);
                    }
                    currentPool.start();
                    return currentPool;
                }
            });
            worker.executeInIoThread(startConnectionPoolTask);
        }
    }

    public ConnectFuture getNextConnectFuture(final IoSessionInitializer<ConnectFuture> connectInitializer) {
        ConnectionPool pool = this.connectionPool.get();
        if (pool == null) {
            FutureTask<ConnectFuture> delegateGetConnectFuture = new FutureTask<ConnectFuture>(new Callable<ConnectFuture>(){

                @Override
                public ConnectFuture call() {
                    ConnectionPool delegatePool = (ConnectionPool)ServiceConnectManager.this.connectionPool.get();
                    if (delegatePool != null) {
                        return delegatePool.getNextConnectFuture((IoSessionInitializer<ConnectFuture>)connectInitializer);
                    }
                    return null;
                }
            });
            Worker[] workers = this.tcpAcceptor.getWorkers();
            int randomWorker = (int)(Math.random() * (double)workers.length);
            workers[randomWorker].executeInIoThread(delegateGetConnectFuture);
            try {
                return delegateGetConnectFuture.get();
            }
            catch (ExecutionException executionEX) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(String.format("Failed to get connectFuture to %s from delegate connection pool due to exception", this.connectURI), (Throwable)executionEX);
                } else {
                    this.logger.warn(String.format("Failed to get ConnectFuture to %s from delegate connection pool due to exception %s", this.connectURI, executionEX));
                }
                return null;
            }
            catch (InterruptedException interruptedEx) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(String.format("Failed to get connectFuture to %s from delegate connection pool due to exception", this.connectURI), (Throwable)interruptedEx);
                } else {
                    this.logger.warn(String.format("Failed to get ConnectFuture to %s from delegate connection pool due to exception %s", this.connectURI, interruptedEx));
                }
                return null;
            }
        }
        return pool.getNextConnectFuture(connectInitializer);
    }

    int getPreparedConnectionCount() {
        return this.preparedConnectionCount;
    }

    public long getLastSuccessfulConnectTime() {
        return this.lastSuccessfulConnectTime.get();
    }

    public long getLastFailedConnectTime() {
        return this.lastFailedConnectTime.get();
    }

    public boolean getLastHeartbeatPingResult() {
        return this.heartbeatPingResult.get();
    }

    public long getLastHeartbeatPingTimestamp() {
        return this.heartbeatPingTimestamp;
    }

    public int getHeartbeatPingCount() {
        return this.heartbeatPingCount.get();
    }

    public int getHeartbeatPingSuccessesCount() {
        return this.heartbeatPingSuccesses.get();
    }

    public int getHeartbeatPingFailuresCount() {
        return this.heartbeatPingFailures.get();
    }

    public boolean isServiceConnected() {
        return this.serviceConnected.get();
    }

    public boolean isHeartbeatRunning() {
        if (this.heartbeat == null) {
            return false;
        }
        return this.heartbeat.heartbeatTask.get() != null;
    }

    private void updateConnectTimes(boolean connected) {
        if (connected) {
            this.lastSuccessfulConnectTime.set(System.currentTimeMillis());
        } else {
            this.lastFailedConnectTime.set(System.currentTimeMillis());
        }
    }

    private class HeartbeatHandler
    extends IoHandlerAdapter {
        private HeartbeatHandler() {
        }

        public void sessionOpened(IoSession session) throws Exception {
            session.close(false);
            ServiceConnectManager.this.start();
        }
    }

    private class ServiceHeartbeat
    implements Runnable {
        private final int interval;
        private final AtomicReference<ScheduledFuture<?>> heartbeatTask;
        private final HeartbeatHandler handler;
        private final AtomicInteger nextDelay;
        private final ScheduledExecutorService executor;

        private ServiceHeartbeat(int interval) {
            this.interval = interval;
            this.handler = new HeartbeatHandler();
            this.heartbeatTask = new AtomicReference();
            this.nextDelay = new AtomicInteger(interval);
            this.executor = ServiceConnectManager.this.schedulerProvider.getScheduler("ServiceHeartbeat", false);
        }

        private void schedule() {
            this.schedule(0);
        }

        private synchronized void schedule(int delay) {
            ScheduledFuture<?> future = this.heartbeatTask.get();
            if (future != null) {
                return;
            }
            if (delay >= this.interval) {
                delay = this.interval;
            }
            if (this.executor.isShutdown() || this.executor.isTerminated()) {
                return;
            }
            future = this.executor.schedule(this, (long)delay, TimeUnit.SECONDS);
            this.heartbeatTask.set(future);
        }

        private synchronized void cancel() {
            ScheduledFuture currentHeartbeatTask = this.heartbeatTask.getAndSet(null);
            if (currentHeartbeatTask != null) {
                currentHeartbeatTask.cancel(false);
            }
            if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                ServiceConnectManager.this.logger.trace(String.format("ServiceHeartBeat.cancel (%s): set nextDelay to 0", ServiceConnectManager.this.connectURI));
            }
            this.nextDelay.set(0);
        }

        @Override
        public synchronized void run() {
            ScheduledFuture<?> currentHeartbeatTask = this.heartbeatTask.get();
            if (currentHeartbeatTask != null) {
                if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                    ServiceConnectManager.this.logger.trace(String.format("ServiceHeartBeat.run: Current Heartbeat task is " + currentHeartbeatTask + "; starting to heartbeat-connect to %s", ServiceConnectManager.this.connectURI));
                }
                ConnectFuture connectFuture = null;
                try {
                    connectFuture = ServiceConnectManager.this.serviceCtx.connect(ServiceConnectManager.this.connectURI, (IoHandler)this.handler, null);
                    connectFuture.addListener(ServiceConnectManager.this.heartbeatFilter.getConnectListener());
                }
                catch (Exception ex) {
                    if (ServiceConnectManager.this.logger.isDebugEnabled()) {
                        ServiceConnectManager.this.logger.debug(String.format("ServiceHeartBeat.run: exception connecting to uri %s", ServiceConnectManager.this.connectURI), (Throwable)ex);
                    }
                    ServiceConnectManager.this.logger.info(String.format("ServiceHeartBeat.run: exception connecting to uri %s, %s", ServiceConnectManager.this.connectURI, ex));
                }
                int delay = this.nextDelay.get();
                if (delay == 0) {
                    this.nextDelay.compareAndSet(0, 1);
                } else if (delay < this.interval) {
                    this.nextDelay.compareAndSet(delay, delay * 2);
                }
                if (this.heartbeatTask.compareAndSet(currentHeartbeatTask, null)) {
                    if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                        ServiceConnectManager.this.logger.trace(String.format("ServiceHeartBeat.run adding listener to connect future to reschedule task", new Object[0]));
                    }
                    connectFuture.addListener((IoFutureListener)new IoFutureListener<ConnectFuture>(){

                        public void operationComplete(ConnectFuture future) {
                            Throwable t;
                            if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                                ServiceConnectManager.this.logger.trace(String.format("ServiceHeartBeat.run.connectFuture.operationComplete (%s): nextDelay is %d", ServiceConnectManager.this.connectURI, ServiceHeartbeat.this.nextDelay.get()));
                            }
                            if ((t = future.getException()) != null && t instanceof IllegalStateException) {
                                if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                                    ServiceConnectManager.this.logger.trace("ServiceHeartBeat.run.connectFuture.Completed: not rescheduling as connector is being shut down.", t);
                                }
                                return;
                            }
                            if (ServiceHeartbeat.this.nextDelay.get() != 0) {
                                ServiceHeartbeat.this.schedule(ServiceHeartbeat.this.nextDelay.get());
                            }
                        }
                    });
                }
                if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                    ServiceConnectManager.this.logger.trace(String.format("ServiceHeartBeat.run finished executing heartbeat", new Object[0]));
                }
            }
        }
    }

    class HeartbeatFilter
    extends IoFilterAdapter {
        private final AtomicInteger sessionCount = new AtomicInteger(0);
        private final IoFutureListener<ConnectFuture> connectListener = new IoFutureListener<ConnectFuture>(){

            public void operationComplete(ConnectFuture future) {
                HeartbeatFilter.this.setServiceConnected(future.isConnected(), true);
            }
        };

        private HeartbeatFilter(int interval) {
            ServiceConnectManager.this.heartbeat = new ServiceHeartbeat(interval);
            ServiceConnectManager.this.heartbeat.schedule(interval);
        }

        public void sessionClosed(IoFilter.NextFilter nextFilter, IoSession session) throws Exception {
            if (this.sessionCount.decrementAndGet() == 0) {
                ServiceConnectManager.this.heartbeat.schedule();
            }
            super.sessionClosed(nextFilter, session);
        }

        public void sessionOpened(IoFilter.NextFilter nextFilter, IoSession session) throws Exception {
            if (this.sessionCount.getAndIncrement() == 0) {
                if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                    ServiceConnectManager.this.logger.trace(String.format("First session connected to service, cancelling heartbeat for %s", ServiceConnectManager.this.connectURI));
                }
                this.cancelHeartbeat();
            }
            super.sessionOpened(nextFilter, session);
        }

        private IoFutureListener<ConnectFuture> getConnectListener() {
            return this.connectListener;
        }

        void cancelHeartbeat() {
            ServiceConnectManager.this.heartbeat.cancel();
        }

        void setServiceConnected(boolean successfullyConnected) {
            this.setServiceConnected(successfullyConnected, false);
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        void setServiceConnected(boolean successfullyConnected, boolean isHeartbeat) {
            if (successfullyConnected) {
                ServiceConnectManager.this.heartbeatPingSuccesses.incrementAndGet();
                ServiceConnectManager.this.heartbeatPingResult.set(true);
            } else {
                ServiceConnectManager.this.heartbeatPingFailures.incrementAndGet();
                ServiceConnectManager.this.heartbeatPingResult.set(false);
            }
            boolean changedConnectionState = ServiceConnectManager.this.serviceConnected.compareAndSet(!successfullyConnected, successfullyConnected);
            if (changedConnectionState) {
                if (!successfullyConnected) {
                    try {
                        if (ServiceConnectManager.this.logger.isInfoEnabled()) {
                            ServiceConnectManager.this.logger.info(String.format("Quiescing service with connect uri '%s'.", ServiceConnectManager.this.connectURI));
                        }
                        ServiceConnectManager.this.serviceCtx.getService().quiesce();
                        if (ServiceConnectManager.this.logger.isTraceEnabled()) {
                            ServiceConnectManager.this.logger.trace(String.format("Quiesced service with connect uri '%s'.", ServiceConnectManager.this.connectURI));
                        }
                    }
                    catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    ServiceConnectManager.this.heartbeat.schedule();
                    return;
                }
                if (this.sessionCount.get() > 0) {
                    ServiceConnectManager.this.heartbeat.cancel();
                }
                try {
                    if (ServiceConnectManager.this.logger.isInfoEnabled()) {
                        ServiceConnectManager.this.logger.info(String.format("Starting service with connect uri '%s'.", ServiceConnectManager.this.connectURI));
                    }
                    ServiceConnectManager.this.serviceCtx.getService().start();
                    if (!ServiceConnectManager.this.logger.isTraceEnabled()) return;
                    ServiceConnectManager.this.logger.trace(String.format("Started service with connect uri '%s'.", ServiceConnectManager.this.connectURI));
                    return;
                }
                catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            }
            if (!successfullyConnected || !isHeartbeat) return;
            ServiceConnectManager.this.start();
        }
    }
}

