/*
 * Decompiled with CFR 0.152.
 */
package org.kaazing.gateway.service.broadcast;

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Resource;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IoSession;
import org.kaazing.gateway.service.Service;
import org.kaazing.gateway.service.ServiceContext;
import org.kaazing.gateway.service.ServiceProperties;
import org.kaazing.gateway.service.broadcast.BroadcastServiceHandler;
import org.kaazing.gateway.util.Utils;
import org.kaazing.gateway.util.scheduler.SchedulerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BroadcastService
implements Service {
    private final Logger logger = LoggerFactory.getLogger((String)"service.broadcast");
    private final Logger gatewayLogger = LoggerFactory.getLogger((String)"org.kaazing.gateway.server.Gateway");
    private static final String BROADCAST_SERVICE_MAXIMUM_PENDING_BYTES = "org.kaazing.gateway.server.service.broadcast.MAXIMUM_PENDING_BYTES";
    private static final String BROADCAST_SERVICE_DISCONNECT_CLIENTS_ON_RECONNECT = "org.kaazing.gateway.server.service.broadcast.DISCONNECT_CLIENTS_ON_RECONNECT";
    private static final String ON_CLIENT_MESSAGE = "on.client.message";
    private ScheduledExecutorService scheduler;
    private final AtomicBoolean reconnect = new AtomicBoolean(false);
    private BroadcastServiceHandler handler;
    private ServiceContext serviceContext;
    private Properties configuration;
    private String connectURI;
    private int reconnectDelay;
    private final ConnectTask connectTask = new ConnectTask();

    @Resource(name="configuration")
    public void setConfiguration(Properties configuration) {
        this.configuration = configuration;
    }

    @Resource(name="schedulerProvider")
    public void setSchedulerProvider(SchedulerProvider provider) {
        this.scheduler = provider.getScheduler("broadcast_reconnect", false);
    }

    public String getType() {
        return "broadcast";
    }

    public void init(ServiceContext serviceContext) throws Exception {
        this.serviceContext = serviceContext;
        boolean disconnectClientsOnReconnect = Utils.parseBoolean((String)BROADCAST_SERVICE_DISCONNECT_CLIENTS_ON_RECONNECT, (String)this.configuration.getProperty(BROADCAST_SERVICE_DISCONNECT_CLIENTS_ON_RECONNECT), (boolean)false);
        long maximumScheduledWriteBytes = Utils.parsePositiveInteger((String)BROADCAST_SERVICE_MAXIMUM_PENDING_BYTES, (String)this.configuration.getProperty(BROADCAST_SERVICE_MAXIMUM_PENDING_BYTES), (long)Long.MAX_VALUE);
        OnClientMessage onClientMessage = OnClientMessage.fromString(serviceContext.getProperties().get(ON_CLIENT_MESSAGE));
        if (maximumScheduledWriteBytes != Long.MAX_VALUE) {
            this.gatewayLogger.info(String.format("Broadcast service: limiting maximum scheduled write bytes to %d", maximumScheduledWriteBytes));
        }
        this.handler = new BroadcastServiceHandler(disconnectClientsOnReconnect, maximumScheduledWriteBytes, onClientMessage, serviceContext.getLogger());
        Collection connectURIs = serviceContext.getConnects();
        ServiceProperties properties = serviceContext.getProperties();
        String reconnectDelay = properties.get("reconnect.delay");
        if (connectURIs == null || connectURIs.isEmpty()) {
            throw new IllegalArgumentException("Missing required connect");
        }
        this.connectURI = (String)connectURIs.iterator().next();
        this.reconnectDelay = reconnectDelay != null ? Integer.parseInt(reconnectDelay) : 3000;
    }

    public void start() throws Exception {
        this.reconnect.set(true);
        this.serviceContext.bind(this.serviceContext.getAccepts(), (IoHandler)this.handler);
        this.serviceContext.bindConnectsIfNecessary(this.serviceContext.getConnects());
        try {
            if (this.connectURI != null) {
                this.scheduler.schedule(this.connectTask, 0L, TimeUnit.MILLISECONDS);
            }
        }
        catch (Exception e) {
            this.logger.error("Unable to configure connectURI scheduler: " + e);
        }
    }

    public void stop() throws Exception {
        this.quiesce();
        this.serviceContext.unbindConnectsIfNecessary(this.serviceContext.getConnects());
        if (this.serviceContext != null) {
            for (IoSession session : this.serviceContext.getActiveSessions()) {
                session.close(true);
            }
        }
        this.connectTask.stop();
    }

    public void quiesce() throws Exception {
        this.reconnect.set(false);
        if (this.serviceContext != null) {
            this.serviceContext.unbind(this.serviceContext.getAccepts(), (IoHandler)this.handler);
        }
    }

    public void destroy() throws Exception {
        this.scheduler.shutdownNow();
    }

    public static enum OnClientMessage {
        NOOP("noop"),
        BROADCAST("broadcast");

        private final String type;

        private OnClientMessage(String type) {
            this.type = type;
        }

        static OnClientMessage fromString(String str) throws Exception {
            if (str == null) {
                return NOOP;
            }
            for (OnClientMessage e : OnClientMessage.values()) {
                if (!e.type.equalsIgnoreCase(str)) continue;
                return e;
            }
            throw new Exception(String.format("%s type not valid Enum type for %s", str, OnClientMessage.class));
        }
    }

    private final class ConnectTask
    implements Runnable {
        private final AtomicReference<IoSession> session = new AtomicReference();

        public void stop() {
            IoSession connection = this.session.get();
            if (connection != null && !connection.isClosing()) {
                connection.close(true);
            }
        }

        @Override
        public void run() {
            BroadcastService.this.serviceContext.connect(BroadcastService.this.connectURI, BroadcastService.this.handler.getListenHandler(), null).addListener((IoFutureListener)new IoFutureListener<ConnectFuture>(){

                public void operationComplete(ConnectFuture future) {
                    if (future.isConnected()) {
                        IoSession newSession = future.getSession();
                        newSession.getCloseFuture().addListener((IoFutureListener)new IoFutureListener<CloseFuture>(){

                            public void operationComplete(CloseFuture future) {
                                ConnectTask.this.session.set(null);
                                if (BroadcastService.this.reconnect.get()) {
                                    BroadcastService.this.scheduler.schedule(BroadcastService.this.connectTask, (long)BroadcastService.this.reconnectDelay, TimeUnit.MILLISECONDS);
                                }
                            }
                        });
                        ConnectTask.this.session.set(newSession);
                    } else {
                        BroadcastService.this.scheduler.schedule(BroadcastService.this.connectTask, (long)BroadcastService.this.reconnectDelay, TimeUnit.MILLISECONDS);
                    }
                }
            });
        }
    }
}

