/*
 * Decompiled with CFR 0.152.
 */
package org.apache.uima.ducc.rm;

import java.util.Timer;
import java.util.TimerTask;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
import org.apache.uima.ducc.common.admin.event.RmAdminQLoad;
import org.apache.uima.ducc.common.admin.event.RmAdminQOccupancy;
import org.apache.uima.ducc.common.admin.event.RmAdminReply;
import org.apache.uima.ducc.common.admin.event.RmAdminVaryOff;
import org.apache.uima.ducc.common.admin.event.RmAdminVaryOn;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.rm.JobManagerConverter;
import org.apache.uima.ducc.rm.NodeStability;
import org.apache.uima.ducc.rm.ResourceManager;
import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate;
import org.apache.uima.ducc.rm.scheduler.SchedConstants;
import org.apache.uima.ducc.rm.scheduler.Scheduler;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.apache.uima.ducc.transport.event.DuccEvent;
import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;

public class ResourceManagerComponent
extends AbstractDuccComponent
implements ResourceManager,
SchedConstants,
Runnable {
    /** Logger shared by this component and its inner classes; registered under the component id "RM". */
    private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerComponent.class, (String)"RM");
    /** Loaded in start() from "ducc.rm.node.stability" (default 5). Not read anywhere else in this class. */
    int nodeStability;
    /** Loaded in start() from "ducc.rm.init.stability" (default 3); number of stability-timer ticks counted before the scheduler is started. */
    int initStability;
    /** Loaded in start() from "ducc.agent.node.metrics.publish.rate" (default 60000); used as the delay, in milliseconds, between stability-timer ticks. */
    int nodeMetricsUpdateRate;
    /** Set to true as the last step of start(). */
    boolean schedulerReady = false;
    /** Scheduler implementation; created in the constructor. */
    ISchedulerMain scheduler;
    /** Converts between scheduler updates and RM state events; created in start(). */
    JobManagerConverter converter;
    /** Overwritten in start() from "ducc.rm.state.publish.ratio" (default 4). Not read anywhere else in this class. */
    int schedulingRatio = 6;
    /** Overwritten in start() from "ducc.rm.state.publish.rate" (default 60000); sleep time, in milliseconds, between scheduling passes in runScheduler(). */
    int schedulingEpoch = 60000;
    /** Dispatcher used by runScheduler() to publish RM state; supplied via setTransportConfiguration(). */
    DuccEventDispatcher eventDispatcher;
    /** Endpoint that RM state is published to; supplied via setTransportConfiguration(). */
    String stateEndpoint;
    /** Supplied via setNodeStability(); handed to the JobManagerConverter when start() creates it. */
    NodeStability stabilityManager = null;
    /** Number of scheduling passes begun by runScheduler(); used only in log output. */
    long epoch_counter = 0L;
    /** Number of stability-timer ticks seen so far by StabilityTask. */
    int stabilityCount = 0;
    /** Timer driving StabilityTask; set to null by StabilityTask once the initial stability count is reached. */
    Timer stabilityTimer = new Timer();

    /**
     * Creates the component under the name "ResourceManager" and instantiates
     * its scheduler.
     *
     * @param context Camel context passed through to the superclass
     */
    public ResourceManagerComponent(CamelContext context) {
        super("ResourceManager", context);
        scheduler = new Scheduler();
    }

    /**
     * @return the scheduler instance created by this component's constructor
     */
    public ISchedulerMain getScheduler() {
        return scheduler;
    }

    /**
     * @return {@code true} once {@code start()} has run to its last statement,
     *         which is where the flag is raised
     */
    public boolean isSchedulerReady() {
        return schedulerReady;
    }

    /**
     * Records the node stability manager. {@code start()} passes whatever value
     * is set at that moment to the {@code JobManagerConverter} it creates.
     *
     * @param ns the node stability manager to use
     */
    public void setNodeStability(NodeStability ns) {
        stabilityManager = ns;
    }

    /**
     * @return the class-wide logger of the resource manager component
     */
    public DuccLogger getLogger() {
        return ResourceManagerComponent.logger;
    }

    /**
     * Registers the Camel route that receives RM admin requests.
     *
     * Messages arriving on {@code endpoint} are unmarshalled with XStream and
     * handed to an {@code RmAdminEventProcessor}; any exception raised on the
     * route is marked handled and passed to the component's error processor.
     *
     * @param endpoint Camel endpoint URI to consume admin requests from
     * @param delegate component handed to the admin event processor
     * @throws Exception if the route cannot be added to the Camel context
     */
    private void startRmAdminChannel(final String endpoint, final AbstractDuccComponent delegate) throws Exception {
        RouteBuilder adminRoute = new RouteBuilder() {

            public void configure() {
                System.out.println("Configuring RM Admin Channel on Endpoint:" + endpoint);
                onException(Exception.class)
                    .handled(true)
                    .process(new AbstractDuccComponent.ErrorProcessor(ResourceManagerComponent.this));
                from(endpoint)
                    .routeId("RMAdminRoute")
                    .unmarshal().xstream()
                    .process(new RmAdminEventProcessor(delegate));
            }
        };
        getContext().addRoutes(adminRoute);

        if (logger != null) {
            logger.info("startRMAdminChannel", null, "Admin Channel Activated on endpoint:" + endpoint);
        }
    }

    /**
     * Boots the resource manager.
     *
     * In order: creates the job manager converter, runs the superclass start-up,
     * records the daemon's JMX URL, loads tuning values from system properties,
     * opens the admin channel if "ducc.rm.admin.endpoint" is set, initializes
     * the scheduler, starts the stability timer, and launches the scheduling
     * loop on a daemon thread. The ready flag is raised last.
     *
     * @param service the hosting service, passed to the superclass
     * @param args    command-line arguments, passed to the superclass
     * @throws Exception if any start-up step fails
     */
    public void start(DuccService service, String[] args) throws Exception {
        final String location = "start";

        // The converter captures the scheduler and stability manager as they are right now.
        converter = new JobManagerConverter(scheduler, stabilityManager);
        super.start(service, args);
        DuccDaemonRuntimeProperties.getInstance().boot(DuccDaemonRuntimeProperties.DaemonName.ResourceManager, super.getProcessJmxUrl());

        // Tuning values; the second argument is the default used when the property is absent.
        initStability = SystemPropertyResolver.getIntProperty("ducc.rm.init.stability", 3);
        nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", 5);
        nodeMetricsUpdateRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", 60000);
        schedulingRatio = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 4);
        schedulingEpoch = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", 60000);

        String adminEndpoint = System.getProperty("ducc.rm.admin.endpoint");
        if (adminEndpoint != null) {
            startRmAdminChannel(adminEndpoint, this);
        } else {
            logger.warn(location, null, "No admin endpoint configured.  Not starting admin channel.");
        }

        scheduler.init();
        startStabilityTimer();

        // The scheduling loop (see run()) lives on its own daemon thread.
        Thread schedulingThread = new Thread(this);
        schedulingThread.setDaemon(true);
        schedulingThread.start();

        schedulerReady = true;
    }

    /**
     * Runs one scheduling pass on demand and converts its result into an RM
     * state event.
     *
     * Failures in either step are logged rather than propagated.
     *
     * @return the state event, or {@code null} when the scheduler produced no
     *         update, the scheduler failed, or the conversion failed
     */
    @Override
    public RmStateDuccEvent getState() throws Exception {
        final String location = "getState";

        JobManagerUpdate update = null;
        try {
            logger.info(location, null, "-------------------- Entering scheduling loop --------------------");
            update = scheduler.schedule();
            logger.info(location, null, "-------------------- Scheduling loop returns  --------------------");
        } catch (Exception schedulingFailure) {
            logger.error(location, null, "Error running scheduler:", schedulingFailure);
        }

        // Nothing to convert: either the scheduler had no update or it failed above.
        if (update == null) {
            return null;
        }

        try {
            return converter.createState(update);
        } catch (Exception conversionFailure) {
            logger.error(location, null, "Error converting state for Orchestrator", conversionFailure);
            return null;
        }
    }

    /**
     * Supplies the dispatcher and endpoint that {@code runScheduler()} uses to
     * publish RM state.
     *
     * @param eventDispatcher dispatcher used to send state events
     * @param endpoint        endpoint the state events are sent to
     */
    public void setTransportConfiguration(DuccEventDispatcher eventDispatcher, String endpoint) {
        this.stateEndpoint = endpoint;
        this.eventDispatcher = eventDispatcher;
    }

    /**
     * Thread entry point: keeps invoking the scheduling loop for as long as the
     * thread lives.
     */
    @Override
    public void run() {
        for (;;) {
            runScheduler();
        }
    }

    /**
     * Scheduling loop. Repeats forever and does not return normally.
     *
     * Each iteration sleeps for {@code schedulingEpoch} milliseconds, runs one
     * scheduling pass, and, when the scheduler produces an update, converts it
     * to an RM state event and dispatches it to {@code stateEndpoint}.
     * Anything thrown by a pass is logged as fatal and the loop carries on with
     * the next epoch.
     */
    public void runScheduler() {
        String methodName = "runScheduler";
        while (true) {
            try {
                Thread.sleep(this.schedulingEpoch);
            } catch (InterruptedException e) {
                // An interrupt is treated as a request to run a pass right away.
                // The interrupt flag is deliberately NOT restored: doing so would
                // make every following sleep() return immediately and spin the loop.
                logger.info(methodName, null, "Scheduling wait interrupted, executing out-of-band epoch.");
            }

            try {
                logger.info(methodName, null, "--------", ++this.epoch_counter, "------- Entering scheduling loop --------------------");
                JobManagerUpdate jobManagerUpdate = this.scheduler.schedule();
                if (jobManagerUpdate != null) {
                    RmStateDuccEvent ev = this.converter.createState(jobManagerUpdate);
                    this.eventDispatcher.dispatch(this.stateEndpoint, ev, "");
                }
                logger.info(methodName, null, "--------", this.epoch_counter, "------- Scheduling loop returns  --------------------");
            } catch (Throwable e1) {
                // Keep the loop alive whatever a single pass throws.
                logger.fatal(methodName, null, e1);
            }
        }
    }

    /**
     * Schedules the first stability tick, {@code nodeMetricsUpdateRate}
     * milliseconds from now. Each tick is a one-shot {@code StabilityTask}.
     */
    protected void startStabilityTimer() {
        final String location = "startStabilityTimer";
        logger.info(location, null, "Starting stability timer[", nodeMetricsUpdateRate, "] init stability[", initStability, "]");

        StabilityTask firstTick = new StabilityTask();
        stabilityTimer.schedule(firstTick, nodeMetricsUpdateRate);
    }

    /**
     * Receives an Orchestrator state publication and forwards it to the
     * converter. Anything thrown while processing is logged and suppressed.
     *
     * @param map the work map published by the Orchestrator
     */
    @Override
    public void onOrchestratorStateUpdate(DuccWorkMap map) {
        // Logged under the legacy location name "onJobManagerStateUpdate".
        final String location = "onJobManagerStateUpdate";
        try {
            logger.info(location, null, "-------> OR state arrives");
            converter.eventArrives(map);
        } catch (Throwable failure) {
            logger.error(location, null, "Excepton processing Orchestrator event:", failure);
        }
    }

    /**
     * One tick of the start-up stability countdown.
     *
     * Each run increments {@code stabilityCount}. While the count is below
     * {@code initStability} the task schedules a fresh copy of itself
     * {@code nodeMetricsUpdateRate} milliseconds later; once the count is
     * reached it shuts the timer down and starts the scheduler.
     */
    private class StabilityTask
    extends TimerTask {
        private StabilityTask() {
        }

        @Override
        public void run() {
            ResourceManagerComponent rm = ResourceManagerComponent.this;
            if (++rm.stabilityCount < rm.initStability) {
                logger.info("NodeStability", null, "NodeStability wait:  Countdown", rm.stabilityCount, ":", rm.initStability);
                rm.stabilityTimer.schedule(new StabilityTask(), rm.nodeMetricsUpdateRate);
            } else {
                // Cancel before dropping the reference: merely nulling the field leaves
                // the timer's (non-daemon) thread alive until the Timer happens to be
                // garbage collected. Cancelling from inside a task of this timer is
                // explicitly supported and guarantees this is its last execution.
                rm.stabilityTimer.cancel();
                rm.stabilityTimer = null;
                rm.scheduler.start();
                logger.info("NodeStability", null, "Initial node stability reached: scheduler started.");
            }
        }
    }

    /**
     * Camel processor for the RM admin route.
     *
     * Replaces the incoming message body with an {@code RmAdminReply}:
     * vary-off and vary-on requests are executed only after
     * {@code validateAdministrator} accepts the event; load and occupancy
     * queries are answered without that check; anything else (including an
     * empty message) is answered with "Unrecognized RM admin request.".
     */
    class RmAdminEventProcessor
    implements Processor {
        final AbstractDuccComponent delegate;

        public RmAdminEventProcessor(AbstractDuccComponent delegate) {
            this.delegate = delegate;
        }

        /**
         * Handles one admin message and writes the reply back into the exchange.
         *
         * @param exchange the Camel exchange carrying the admin request
         * @throws Exception if validation or the scheduler call fails
         */
        public void process(Exchange exchange) throws Exception {
            String methodName = "RmAdminEventProcessor.process";
            Object body = exchange.getIn().getBody();
            // Guard the type lookup: an empty message must not fail with a NullPointerException.
            String bodyType = (body == null) ? "null" : body.getClass().getName();
            logger.info(methodName, null, "Received Admin Message of Type:", bodyType);

            RmAdminReply reply = replyFor(body);
            if (reply == null) {
                // Never hand a null reply back to the requester.
                logger.warn(methodName, null, "Unrecognized RM admin request of type:", bodyType);
                reply = new RmAdminReply("Unrecognized RM admin request.");
            }
            exchange.getIn().setBody(reply);
        }

        /** Builds the reply for a recognized admin request, or returns null when the request is not recognized. */
        private RmAdminReply replyFor(Object body) {
            if (!(body instanceof DuccAdminEvent)) {
                return null;
            }
            DuccAdminEvent dae = (DuccAdminEvent)body;
            if (body instanceof RmAdminVaryOff) {
                if (!ResourceManagerComponent.this.validateAdministrator(dae)) {
                    return new RmAdminReply("Not authorized");
                }
                RmAdminVaryOff vo = (RmAdminVaryOff)body;
                return new RmAdminReply(ResourceManagerComponent.this.scheduler.varyoff(vo.getNodes()));
            }
            if (body instanceof RmAdminVaryOn) {
                if (!ResourceManagerComponent.this.validateAdministrator(dae)) {
                    return new RmAdminReply("Not authorized");
                }
                RmAdminVaryOn vo = (RmAdminVaryOn)body;
                return new RmAdminReply(ResourceManagerComponent.this.scheduler.varyon(vo.getNodes()));
            }
            if (body instanceof RmAdminQLoad) {
                return ResourceManagerComponent.this.scheduler.queryLoad();
            }
            if (body instanceof RmAdminQOccupancy) {
                return ResourceManagerComponent.this.scheduler.queryOccupancy();
            }
            return null;
        }
    }
}

