/*
 * Decompiled with CFR 0.152.
 */
package org.apache.uima.ducc.rm;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.rm.NodeStability;
import org.apache.uima.ducc.rm.scheduler.IJobManager;
import org.apache.uima.ducc.rm.scheduler.IRmJob;
import org.apache.uima.ducc.rm.scheduler.ISchedulerMain;
import org.apache.uima.ducc.rm.scheduler.JobManagerUpdate;
import org.apache.uima.ducc.rm.scheduler.Machine;
import org.apache.uima.ducc.rm.scheduler.ResourceClass;
import org.apache.uima.ducc.rm.scheduler.RmJob;
import org.apache.uima.ducc.rm.scheduler.SchedConstants;
import org.apache.uima.ducc.rm.scheduler.SchedulingException;
import org.apache.uima.ducc.rm.scheduler.Share;
import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccReservation;
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkExecutable;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
import org.apache.uima.ducc.transport.event.common.IProcessState;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.rm.IRmJobState;
import org.apache.uima.ducc.transport.event.rm.Resource;
import org.apache.uima.ducc.transport.event.rm.RmJobState;

public class JobManagerConverter
implements IJobManager,
SchedConstants {
    // Logger for this class, obtained under the "RM" component name.
    DuccLogger logger = DuccLogger.getLogger(JobManagerConverter.class, (String)"RM");
    // Scheduler that receives the lookups and signals issued by this converter; assigned in the constructor.
    ISchedulerMain scheduler;
    // Assigned in the constructor. NOTE(review): not referenced in the part of the class reviewed here.
    NodeStability nodeStability = null;
    // Local copy of the work accepted so far; incoming Orchestrator maps are diffed against it in eventArrives().
    DuccWorkMap localMap = null;
    // Replaced with a fresh instance in eventArrives() whenever the scheduler reports that it must recover.
    JobManagerUpdate lastJobManagerUpdate = new JobManagerUpdate();
    // Jobs refused through refuse(); refuse() adds entries while holding this map's monitor.
    Map<IRmJob, IRmJob> refusedJobs = new HashMap<IRmJob, IRmJob>();
    // Per-job state recorded by blacklistJob() when blacklisted processes are to be evicted.
    Map<DuccId, IRmJobState> blacklistedResources = new HashMap<DuccId, IRmJobState>();
    // Overwritten in the constructor from the "ducc.rm.fast.recovery" property (default true); when set,
    // eventArrives() starts the scheduler right after state has been recovered from the Orchestrator.
    boolean recovery = false;
    // True until the first Orchestrator state has been handled; set back to true when the scheduler must recover.
    boolean first_or_state = true;
    // NOTE(review): not referenced in the part of the class reviewed here; purpose to be confirmed.
    Map<DuccId, IRmJobState> previousJobState = new HashMap<DuccId, IRmJobState>();

    public JobManagerConverter(ISchedulerMain scheduler, NodeStability ns) {
        this.scheduler = scheduler;
        this.localMap = new DuccWorkMap();
        this.nodeStability = ns;
        DuccLogger.setUnthreaded();
        this.recovery = SystemPropertyResolver.getBooleanProperty((String)"ducc.rm.fast.recovery", (boolean)true);
    }

    /**
     * Parses {@code s} as a decimal {@code int}, falling back to a default.
     *
     * The default is returned when {@code s} is {@code null}, empty or not a
     * valid integer, and also when it parses to zero (zero is treated as
     * "not set").
     *
     * @param s     text to parse; may be {@code null}
     * @param deflt value returned when {@code s} is unusable or zero
     * @return the parsed non-zero value, otherwise {@code deflt}
     */
    int toInt(String s, int deflt) {
        try {
            int val = Integer.parseInt(s);
            return val == 0 ? deflt : val;
        }
        catch (NumberFormatException ignored) {
            // Covers null, empty and malformed input. Errors such as
            // OutOfMemoryError are deliberately no longer swallowed here.
            return deflt;
        }
    }

    /**
     * Parses {@code s} as a decimal {@code long}, falling back to a default.
     *
     * The default is returned when {@code s} is {@code null}, empty or not a
     * valid number, and also when it parses to zero (zero is treated as
     * "not set").
     *
     * @param s     text to parse; may be {@code null}
     * @param deflt value returned when {@code s} is unusable or zero
     * @return the parsed non-zero value, otherwise {@code deflt}
     */
    long toLong(String s, long deflt) {
        try {
            long val = Long.parseLong(s);
            return val == 0L ? deflt : val;
        }
        catch (NumberFormatException ignored) {
            // Covers null, empty and malformed input. Errors such as
            // OutOfMemoryError are deliberately no longer swallowed here.
            return deflt;
        }
    }

    /**
     * Marks a job as refused and remembers it in {@code refusedJobs}.
     *
     * @param j      job being refused
     * @param reason text passed on to the job's own {@code refuse} call
     */
    void refuse(IRmJob j, String reason) {
        j.refuse(reason);
        // The map is guarded by its own monitor.
        synchronized (refusedJobs) {
            refusedJobs.put(j, j);
        }
    }

    /**
     * Walks the processes of an executable work item and black- or whitelists
     * each one on the machine it is attached to.
     *
     * A process that is active, or whose state is {@code Undefined}, is
     * blacklisted on its machine; any other process is whitelisted. Processes
     * whose machine is unknown to the scheduler are skipped. When
     * {@code evict} is set and at least one process was blacklisted, an
     * {@code RmJobState} listing those processes is stored in
     * {@code blacklistedResources} under the job id.
     *
     * @param job    work item; must be castable to {@code IDuccWorkExecutable}
     * @param memory value handed to {@code Machine.blacklist}
     * @param evict  whether to record the blacklisted processes for eviction
     */
    void blacklistJob(IDuccWork job, long memory, boolean evict) {
        final String methodName = "blacklistJob";

        // The share maps are only needed when an eviction record will be built.
        LinkedHashMap<DuccId, Resource> allShares = evict ? new LinkedHashMap<DuccId, Resource>() : null;
        LinkedHashMap<DuccId, Resource> shrunkenShares = evict ? new LinkedHashMap<DuccId, Resource>() : null;
        LinkedHashMap expandedShares = evict ? new LinkedHashMap() : null;

        IDuccProcessMap processes = ((IDuccWorkExecutable)job).getProcessMap();
        // Ends up holding the quantum of the last machine that could be resolved.
        int quantum = 0;
        for (IDuccProcess proc : processes.values()) {
            NodeIdentity node = proc.getNodeIdentity();
            Machine machine = scheduler.getMachine(node);
            if (machine == null) {
                continue;
            }
            quantum = machine.getQuantum();

            boolean holdsResources = proc.isActive() || proc.getProcessState() == IProcessState.ProcessState.Undefined;
            if (!holdsResources) {
                logger.info(methodName, job.getDuccId(), new Object[]{"whitelist", proc.getDuccId(), "state", proc.getProcessState(), "isActive", proc.isActive(), "isComplete", proc.isComplete()});
                machine.whitelist(proc.getDuccId());
                continue;
            }

            logger.info(methodName, job.getDuccId(), new Object[]{"blacklist", proc.getDuccId(), "state", proc.getProcessState(), "isActive", proc.isActive(), "isComplete", proc.isComplete()});
            machine.blacklist(job.getDuccId(), proc.getDuccId(), memory);
            if (evict) {
                int shareOrder = machine.getShareOrder();
                Resource res = new Resource(proc.getDuccId(), proc.getNode(), false, shareOrder, 0L);
                allShares.put(proc.getDuccId(), res);
                shrunkenShares.put(proc.getDuccId(), res);
            }
        }

        if (!evict || shrunkenShares.size() <= 0) {
            return;
        }
        RmJobState evictionState = new RmJobState(job.getDuccId(), quantum, allShares, shrunkenShares, expandedShares);
        evictionState.setDuccType(job.getDuccType());
        blacklistedResources.put(job.getDuccId(), (IRmJobState)evictionState);
    }

    /**
     * Blacklists every reservation of a reservation-type work item on the
     * machine it is attached to.
     *
     * Reservations whose machine is unknown to the scheduler are logged and
     * skipped. The memory value passed to {@code Machine.blacklist} is always
     * {@code -1}.
     *
     * @param job work item; must be castable to {@code IDuccWorkReservation}
     */
    void blacklistReservation(IDuccWork job) {
        String methodName = "blacklistReservation";
        this.logger.trace(methodName, job.getDuccId(), new Object[]{"enter"});
        IDuccReservationMap drm = ((IDuccWorkReservation)job).getReservationMap();
        for (IDuccReservation idr : drm.values()) {
            NodeIdentity ni = idr.getNodeIdentity();
            Machine m = this.scheduler.getMachine(ni);
            if (m == null) {
                // The message used to say "whitelisting", copied from whitelist().
                this.logger.warn(methodName, job.getDuccId(), new Object[]{"Problem blacklisting: cannot find machine", ni.getName()});
                continue;
            }
            m.blacklist(job.getDuccId(), idr.getDuccId(), -1L);
        }
    }

    /**
     * Blacklists the resources of a work item, dispatching on its type.
     *
     * Jobs are blacklisted with eviction. Services and Pops are blacklisted
     * with eviction for the {@code uima} and {@code custom} deployment types
     * and without eviction for {@code other}; any other deployment type is
     * left untouched. Reservations go through
     * {@code blacklistReservation}. Unknown work types are logged as errors.
     *
     * @param job    work item to blacklist
     * @param memory value forwarded to {@code blacklistJob}
     */
    void blacklist(IDuccWork job, int memory) {
        final String methodName = "blacklist";
        logger.trace(methodName, job.getDuccId(), new Object[]{"enter"});

        switch (job.getDuccType()) {
            case Reservation:
                blacklistReservation(job);
                return;

            case Job:
                blacklistJob(job, memory, true);
                return;

            case Service:
            case Pop: {
                IDuccWorkService service = (IDuccWorkService)job;
                switch (service.getServiceDeploymentType()) {
                    case other:
                        blacklistJob(job, memory, false);
                        return;
                    case uima:
                    case custom:
                        blacklistJob(job, memory, true);
                        return;
                }
                // Remaining deployment types: nothing to do.
                return;
            }

            default:
                logger.error(methodName, job.getDuccId(), new Object[]{"Unknown job type", job.getDuccType(), "ignoring in blacklist."});
        }
    }

    /**
     * Whitelists every process or reservation of a work item on the machine
     * it is attached to.
     *
     * Jobs, Services and Pops are walked through their process map,
     * Reservations through their reservation map. Entries whose machine is
     * unknown to the scheduler are logged and skipped. Unknown work types are
     * logged as errors.
     *
     * @param job work item whose resources are to be whitelisted
     */
    void whitelist(IDuccWork job) {
        String methodName = "whitelist";
        switch (job.getDuccType()) {
            case Job: 
            case Service: 
            case Pop: {
                for (IDuccProcess idp : ((IDuccWorkJob)job).getProcessMap().values()) {
                    NodeIdentity ni = idp.getNodeIdentity();
                    Machine m = this.scheduler.getMachine(ni);
                    if (m == null) {
                        this.logger.warn(methodName, job.getDuccId(), new Object[]{"Problem whitelisting: cannot find machine", ni.getName()});
                        continue;
                    }
                    m.whitelist(idp.getDuccId());
                }
                break;
            }
            case Reservation: {
                for (IDuccReservation idp : ((IDuccWorkReservation)job).getReservationMap().values()) {
                    NodeIdentity ni = idp.getNodeIdentity();
                    Machine m = this.scheduler.getMachine(ni);
                    if (m == null) {
                        this.logger.warn(methodName, job.getDuccId(), new Object[]{"Problem whitelisting: cannot find machine", ni.getName()});
                        continue;
                    }
                    m.whitelist(idp.getDuccId());
                }
                break;
            }
            default: {
                // The message used to say "blacklist", copied from blacklist().
                this.logger.error(methodName, job.getDuccId(), new Object[]{"Unknown job type", job.getDuccType(), "ignoring in whitelist."});
            }
        }
    }

    /**
     * Returns the window's {@code getDiff()} text, or {@code "0"} when there
     * is no window.
     *
     * @param w time window; may be {@code null}
     * @return {@code w.getDiff()}, or {@code "0"} for a {@code null} window
     */
    String getElapsedTime(ITimeWindow w) {
        return (w == null) ? "0" : w.getDiff();
    }

    /**
     * Tells whether an arriving work item already holds resources and so has
     * to be recovered rather than scheduled as new.
     *
     * That is the case when its process map (Job, Service, Pop) or
     * reservation map (Reservation) is non-empty and the work is not
     * completed.
     *
     * @param job arriving work item
     * @return {@code true} when the work holds resources and is not completed
     * @throws IllegalStateException for any other work type
     */
    boolean isRecovered(IDuccWork job) {
        switch (job.getDuccType()) {
            case Reservation: {
                IDuccReservationMap reservations = ((IDuccWorkReservation)job).getReservationMap();
                if (reservations.size() <= 0) {
                    return false;
                }
                return !job.isCompleted();
            }
            case Job:
            case Service:
            case Pop: {
                IDuccProcessMap processes = ((IDuccWorkExecutable)job).getProcessMap();
                if (processes.size() <= 0) {
                    return false;
                }
                return !job.isCompleted();
            }
            default:
                throw new IllegalStateException("Cannot recognize job type for " + job.getDuccId() + ": found " + job.getDuccType());
        }
    }

    /**
     * Applies the scheduling figures of an updated work item to the RM job
     * the scheduler holds for it.
     *
     * Does nothing when the scheduler has no job for the id. Otherwise it
     * refreshes the work counts and mean work-item time (the skewed mean wins
     * when it is positive), adjusts max shares if the incoming value differs,
     * and updates the init-wait flag. For {@code IDuccWorkJob} instances the
     * scheduler is signalled when {@code setInitWait} returns true, and the
     * job is marked complete when the work reports it is completing.
     *
     * @param state previous state object, used only for logging
     * @param job   incoming version of the work item
     */
    void jobUpdate(Object state, IDuccWork job) {
        final String methodName = "jobUpdate";
        IDuccSchedulingInfo schedInfo = job.getSchedulingInfo();
        DuccId jobid = job.getDuccId();
        IRmJob rmJob = scheduler.getJob(jobid);
        if (rmJob == null) {
            return;
        }

        int totalWork = toInt(schedInfo.getWorkItemsTotal(), scheduler.getDefaultNTasks());
        int completedWork = toInt(schedInfo.getWorkItemsCompleted(), 0) + toInt(schedInfo.getWorkItemsError(), 0);
        int incomingMaxShares = toInt(schedInfo.getProcessesMax(), Integer.MAX_VALUE);
        int currentMaxShares = rmJob.getMaxShares();
        int remainingWork = Math.max(totalWork - completedWork, 0);

        IDuccPerWorkItemStatistics stats = schedInfo.getPerWorkItemStatistics();
        double arithMean = (stats == null) ? Double.NaN : stats.getMean();
        double skewedMean = schedInfo.getAvgTimeForWorkItemsSkewedByActive();

        // Logged before the skewed mean is allowed to replace the arithmetic one.
        logger.info(methodName, job.getDuccId(), new Object[]{String.format("tot: %d %s -> %s compl: %s err: %s rem: %d mean: %f skew: %f", totalWork, state, job.getStateObject(), schedInfo.getWorkItemsCompleted(), schedInfo.getWorkItemsError(), remainingWork, arithMean, skewedMean)});

        double effectiveMean = (skewedMean > 0.0) ? skewedMean : arithMean;

        if (incomingMaxShares != currentMaxShares) {
            rmJob.setMaxShares(incomingMaxShares);
            logger.info(methodName, job.getDuccId(), new Object[]{"Max shares adjusted from", currentMaxShares, "to", incomingMaxShares, "(incoming)", schedInfo.getProcessesMax()});
        }
        rmJob.setNQuestions(totalWork, remainingWork, effectiveMean);

        if (!(job instanceof IDuccWorkJob)) {
            rmJob.setInitWait(true);
            return;
        }

        IDuccWorkJob workJob = (IDuccWorkJob)job;
        if (rmJob.setInitWait(workJob.isRunnable())) {
            logger.info(methodName, jobid, new Object[]{"Set Initialized."});
            scheduler.signalInitialized(rmJob);
        }
        if (workJob.isCompleting()) {
            rmJob.markComplete();
        }
    }

    /**
     * Hands an executable work item (Job, Service, Pop) to the scheduler,
     * either as new work or for recovery.
     *
     * In recovery mode every process that is not complete gets a
     * {@code Share} on its machine, which is attached to the job and updated
     * from the process. The job is refused and {@code false} returned as soon
     * as a process refers to a machine the scheduler does not know.
     *
     * @param j           RM job being built
     * @param job         incoming work; must be castable to {@code IDuccWorkExecutable}
     * @param mustRecover whether existing processes have to be recovered
     * @return {@code false} when the job was refused, {@code true} otherwise
     */
    private boolean receiveExecutable(IRmJob j, IDuccWork job, boolean mustRecover) {
        final String methodName = "receiveExecutable";
        IDuccWorkExecutable executable = (IDuccWorkExecutable)job;
        IDuccProcessMap processes = executable.getProcessMap();

        if (!mustRecover) {
            logger.info(methodName, j.getId(), new Object[]{"Scheduling as new."});
            scheduler.signalNewWork(j);
            return true;
        }

        for (IDuccProcess proc : processes.values()) {
            IProcessState.ProcessState procState = proc.getProcessState();
            String pid = proc.getPID();
            NodeIdentity node = proc.getNodeIdentity();

            if (proc.isComplete()) {
                logger.debug(methodName, j.getId(), new Object[]{"Skipping process", pid, "on", node.getName(), "beacause state is", procState});
                continue;
            }

            Machine machine = scheduler.getMachine(node);
            if (machine == null) {
                refuse(j, "Cannot restore job because node " + node.getName() + " is unknown.");
                return false;
            }

            DuccId shareId = proc.getDuccId();
            Share share = new Share(shareId, machine, j, machine.getShareOrder());
            long residentMemory = proc.getResidentMemory();
            long investment = proc.getWiMillisInvestment();
            logger.info(methodName, j.getId(), new Object[]{"Assigning share in state", procState, "pid", pid, "for recovery", share.toString()});
            j.recoverShare(share);
            share.update(j.getId(), residentMemory, investment, procState, proc.getTimeWindowInit(), pid);
        }

        logger.info(methodName, j.getId(), new Object[]{"Scheduling for recovery."});
        scheduler.signalRecovery(j);
        return true;
    }

    /**
     * Hands a reservation to the scheduler, either as new work or for
     * recovery.
     *
     * The job is always flagged as a reservation first. In recovery mode each
     * reservation entry gets a fixed {@code Share} on its machine, which is
     * attached to the job. The job is refused and {@code false} returned as
     * soon as an entry refers to a machine the scheduler does not know.
     *
     * @param j           RM job being built
     * @param job         incoming work; must be castable to {@code IDuccWorkReservation}
     * @param mustRecover whether existing reservations have to be recovered
     * @return {@code false} when the job was refused, {@code true} otherwise
     */
    private boolean receiveReservation(IRmJob j, IDuccWork job, boolean mustRecover) {
        final String methodName = "receiveReservation";
        j.setReservation();
        IDuccWorkReservation reservation = (IDuccWorkReservation)job;
        IDuccReservationMap entries = reservation.getReservationMap();

        if (!mustRecover) {
            logger.info(methodName, j.getId(), new Object[]{"Scheduling as new."});
            scheduler.signalNewWork(j);
            return true;
        }

        for (IDuccReservation entry : entries.values()) {
            NodeIdentity node = entry.getNodeIdentity();
            Machine machine = scheduler.getMachine(node);
            if (machine == null) {
                refuse(j, "Cannot restore reservation because node " + node.getName() + " is unknown.");
                return false;
            }

            DuccId shareId = entry.getDuccId();
            Share share = new Share(shareId, machine, j, machine.getShareOrder());
            share.setFixed();
            j.recoverShare(share);
            logger.debug(methodName, j.getId(), new Object[]{"Assigning share for recovery", share.toString()});
        }

        logger.info(methodName, j.getId(), new Object[]{"Scheduling for recovery."});
        scheduler.signalRecovery(j);
        return true;
    }

    /**
     * Converts a newly seen work item into an {@code RmJob} and hands it to
     * the scheduler, either as new work or for recovery.
     *
     * The RM job is populated from the work's scheduling and standard info
     * (user, name, service id, threads, priority, work counts, memory,
     * scheduling class, submission timestamp), its resource class is resolved
     * and authorisation is checked, and max shares are set according to work
     * type and class policy before the work is passed to
     * {@code receiveExecutable} or {@code receiveReservation}.
     *
     * @param job incoming work item
     * @return {@code false} when the work was rejected on one of the early
     *         return paths or by the receive* helper; {@code true} otherwise
     */
    boolean jobArrives(IDuccWork job) {
        String methodName = "jobArrives";
        this.logger.trace(methodName, job.getDuccId(), new Object[]{"Job arives"});
        this.logger.trace(methodName, job.getDuccId(), new Object[]{"Job is of type", job.getDuccType()});
        RmJob j = new RmJob(job.getDuccId());
        // Work that already holds processes/reservations must be recovered, not scheduled as new.
        boolean mustRecover = this.isRecovered(job);
        IDuccSchedulingInfo si = job.getSchedulingInfo();
        IDuccStandardInfo sti = job.getStandardInfo();
        String name = sti.getDescription();
        if (name == null) {
            name = "A Job With No Name.";
        }
        // NOTE(review): a null user here would raise a NullPointerException on trim().
        String user_name = sti.getUser().trim();
        j.setUserName(user_name);
        j.setJobName(name);
        j.setServiceId(this.toLong(job.getServiceId(), 0L));
        // Unparseable or zero values fall back to the given defaults (see toInt).
        int threads = this.toInt(si.getThreadsPerProcess(), this.scheduler.getDefaultNThreads());
        int user_priority = this.toInt(si.getSchedulingPriority(), 100);
        int total_work = this.toInt(si.getWorkItemsTotal(), this.scheduler.getDefaultNTasks());
        int completed_work = this.toInt(si.getWorkItemsCompleted(), 0);
        // Unlike jobUpdate(), errors are not counted as completed here and the floor is 1, not 0.
        int remaining_work = Math.max(total_work - completed_work, 1);
        this.logger.info(methodName, job.getDuccId(), new Object[]{"total_work", total_work, "completed_work", completed_work, "remaining_work", remaining_work});
        int memory = this.toInt(si.getMemorySizeRequested(), this.scheduler.getDefaultMemory());
        String className = si.getSchedulingClass();
        if (className == null) {
            // No class requested: pick the scheduler's default for this work type.
            switch (job.getDuccType()) {
                case Job: {
                    className = this.scheduler.getDefaultFairShareName();
                    break;
                }
                case Service: 
                case Pop: 
                case Reservation: {
                    className = this.scheduler.getDefaultReserveName();
                }
            }
            if (className == null) {
                // NOTE(review): this calls j.refuse() directly instead of this.refuse(j, ...), so the job is
                // not added to refusedJobs as on the other refusal paths in this method - confirm intended.
                j.refuse("No scheduling class defined and no default class configured.");
                return false;
            }
        }
        j.setThreads(threads);
        j.setUserPriority(user_priority);
        j.setNQuestions(total_work, remaining_work, 0.0);
        j.setClassName(className);
        switch (si.getMemoryUnits()) {
            case GB: {
                break;
            }
            default: {
                // NOTE(review): the message says "Job returned." but processing continues past this switch.
                this.logger.warn(methodName, job.getDuccId(), new Object[]{"Memory units other than GB are not currently supported.  Job returned."});
            }
        }
        j.setMemory(memory);
        j.init();
        // NOTE(review): Long.parseLong throws NumberFormatException on a missing or malformed submission date;
        // nothing in this method catches it.
        j.setTimestamp(Long.parseLong(sti.getDateOfSubmission()));
        if (job instanceof IDuccWorkJob) {
            j.setInitWait(((IDuccWorkJob)job).isRunnable());
        } else {
            j.setInitWait(true);
        }
        j.setDuccType(job.getDuccType());
        // Services/Pops deployed as "other" are flagged as arbitrary processes.
        switch (job.getDuccType()) {
            case Service: 
            case Pop: {
                if (((IDuccWorkService)job).getServiceDeploymentType() != IDuccWorkService.ServiceDeploymentType.other) break;
                j.setArbitraryProcess();
                break;
            }
        }
        boolean status = true;
        int max_processes = 0;
        ResourceClass rescl = this.scheduler.getResourceClass(className);
        if (rescl == null) {
            // Unknown class: refuse, and blacklist whatever the work currently holds.
            this.refuse(j, "Cannot find priority class " + className + " for job");
            this.blacklist(job, memory);
            return false;
        }
        if (!rescl.authorized(user_name)) {
            this.refuse(j, "User '" + user_name + "' not authorized to use class '" + className + "'");
            // A refused job that must be recovered is still processed below; only new work returns here.
            if (!mustRecover) {
                return false;
            }
        }
        j.setResourceClass(rescl);
        switch (job.getDuccType()) {
            case Service: 
            case Pop: {
                switch (rescl.getPolicy()) {
                    case FAIR_SHARE: {
                        // NOTE(review): after this refusal control still reaches receiveExecutable() below.
                        this.refuse(j, "Services and managed reservations are not allowed to be FAIR_SHARE");
                        break;
                    }
                    case FIXED_SHARE: {
                        j.setMaxShares(1);
                        break;
                    }
                    case RESERVE: {
                        j.setMaxShares(1);
                    }
                }
                status = this.receiveExecutable(j, job, mustRecover);
                this.logger.trace(methodName, j.getId(), new Object[]{"Serivce, or Pop arrives, accepted:", status});
                break;
            }
            case Job: {
                // All three listed policies apply the same max-shares value; other policies leave it unset.
                max_processes = this.toInt(si.getProcessesMax(), 10);
                switch (rescl.getPolicy()) {
                    case FAIR_SHARE: {
                        j.setMaxShares(max_processes);
                        break;
                    }
                    case FIXED_SHARE: {
                        j.setMaxShares(max_processes);
                        break;
                    }
                    case RESERVE: {
                        j.setMaxShares(max_processes);
                    }
                }
                status = this.receiveExecutable(j, job, mustRecover);
                this.logger.trace(methodName, j.getId(), new Object[]{"Job arrives, accepted:", status});
                break;
            }
            case Reservation: {
                j.setMaxShares(1);
                status = this.receiveReservation(j, job, mustRecover);
                this.logger.trace(methodName, j.getId(), new Object[]{"Reservation arrives, accepted:", status});
                break;
            }
            default: {
                this.refuse(j, "Unknown job type: " + job.getDuccType());
                status = false;
            }
        }
        return status;
    }

    /**
     * Signals completion of the given work to the scheduler and drops it from
     * the local work map.
     *
     * @param id id of the work that has gone away
     */
    void jobRemoved(DuccId id) {
        final String location = "jobRemoved";

        logger.trace(location, id, new Object[]{"Signalling removal"});
        // Scheduler first, then the local copy.
        scheduler.signalCompletion(id);
        localMap.removeDuccWork(id);
        logger.trace(location, id, new Object[]{"Remove signalled"});
    }

    /**
     * Reconciles the process map of an incoming work item ({@code l}) against
     * the locally held version ({@code r}) and forwards the differences to the
     * scheduler.
     *
     * Three groups are handled in turn:
     * <ul>
     *   <li>processes only in {@code l}: the matching share, if any, is
     *       updated from the process;</li>
     *   <li>processes only in {@code r}: the share is signalled as complete;
     *       for work that is not of type Job the job is also marked
     *       complete;</li>
     *   <li>processes in both: active ones update their share, complete ones
     *       signal completion, others are logged and ignored.</li>
     * </ul>
     *
     * A process in the second group whose share can no longer be found is
     * logged and skipped instead of failing with a NullPointerException,
     * matching how the other two groups treat a missing share.
     *
     * @param jobid id of the work being reconciled
     * @param l     incoming version; must be castable to {@code IDuccWorkJob}
     * @param r     locally held version; must be castable to {@code IDuccWorkJob}
     * @throws SchedulingException when a share belongs to a different job, or
     *         when a completion arrives for a job the scheduler does not know
     */
    public void reconcileProcesses(DuccId jobid, IDuccWork l, IDuccWork r) {
        String methodName = "reconcileProcess";
        IDuccProcessMap lpm = ((IDuccWorkJob)l).getProcessMap();
        IDuccProcessMap rpm = ((IDuccWorkJob)r).getProcessMap();
        DuccCollectionUtils.DuccMapDifference diffmap = DuccCollectionUtils.difference((Map)lpm, (Map)rpm);

        // Group 1: processes present only in the incoming map.
        Map lproc = diffmap.getLeft();
        for (Object o : lproc.values()) {
            IDuccProcess p = (IDuccProcess)o;
            Share s = this.scheduler.getShare(p.getDuccId());
            if (s != null) {
                long mem = p.getResidentMemory();
                long investment = p.getWiMillisInvestment();
                IProcessState.ProcessState state = p.getProcessState();
                String pid = p.getPID();
                this.logger.info(methodName, jobid, new Object[]{"New process ", s.toString(), mem, state, pid});
                if (s.update(jobid, mem, investment, state, p.getTimeWindowInit(), pid)) continue;
                throw new SchedulingException(jobid, "Process assignemnt arrives for share " + s.toString() + " but jobid " + jobid + " does not match share " + s.getJob().getId());
            }
            this.logger.warn(methodName, jobid, p.getDuccId(), new Object[]{"share not found?"});
        }

        // Group 2: processes present only in the local map.
        Map rproc = diffmap.getRight();
        for (Object o : rproc.values()) {
            IDuccProcess p = (IDuccProcess)o;
            Share s = this.scheduler.getShare(p.getDuccId());
            String shareName = s == null ? "<none>" : s.toString();
            IRmJob j = this.scheduler.getJob(jobid);
            if (j == null) {
                throw new SchedulingException(jobid, "Process completion arrives for share " + shareName + " but job " + jobid + " cannot be found.");
            }
            switch (l.getDuccType()) {
                case Job: {
                    break;
                }
                default: {
                    j.markComplete();
                }
            }
            if (s == null) {
                // Nothing to release; previously this path ended in a NullPointerException.
                this.logger.warn(methodName, jobid, p.getDuccId(), new Object[]{"share not found?"});
                continue;
            }
            this.scheduler.signalCompletion(j, s);
            this.logger.info(methodName, jobid, new Object[]{String.format("Process %5s", p.getPID()), "Completion:", shareName});
        }

        // Group 3: processes present in both maps but differing.
        for (Object o : diffmap) {
            DuccCollectionUtils.DuccMapValueDifference pd = (DuccCollectionUtils.DuccMapValueDifference)o;
            IDuccProcess pl = (IDuccProcess)pd.getLeft();
            IDuccProcess pr = (IDuccProcess)pd.getRight();
            Share sl = this.scheduler.getShare(pl.getDuccId());
            Share sr = this.scheduler.getShare(pr.getDuccId());
            String shareL = sl == null ? "<none>" : sl.toString();
            String shareR = sr == null ? "<none>" : sr.toString();
            ITimeWindow initL = pl.getTimeWindowInit();
            ITimeWindow initR = pr.getTimeWindowInit();
            long init_timeL = initL == null ? 0L : initL.getElapsedMillis();
            long init_timeR = initR == null ? 0L : initR.getElapsedMillis();
            if (this.logger.isTrace()) {
                this.logger.trace(methodName, jobid, new Object[]{"\n\tReconciling. incoming.(did, pid, mem, state, share, initTime, investment)", pl.getDuccId(), pl.getPID(), pl.getResidentMemory(), pl.getProcessState(), shareL, init_timeL, pl.getWiMillisInvestment(), "\n\tReconciling. existing.(did, pid, mem, state, share, initTime, investment)", pr.getDuccId(), pr.getPID(), pr.getResidentMemory(), pr.getProcessState(), shareR, init_timeR, pr.getWiMillisInvestment()});
            } else {
                if (pr.getPID() == null && pl.getPID() != null) {
                    this.logger.trace(methodName, jobid, new Object[]{String.format("Process %5s", pl.getPID()), "PID assignement for share", shareL});
                }
                if (pl.getProcessState() != pr.getProcessState()) {
                    this.logger.info(methodName, jobid, new Object[]{String.format("Process %5s", pl.getPID()), shareL, "State:", pr.getProcessState(), "->", pl.getProcessState(), this.getElapsedTime(pr.getTimeWindowInit()), this.getElapsedTime(pr.getTimeWindowRun())});
                }
            }
            long mem = pl.getResidentMemory();
            long investment = pl.getWiMillisInvestment();
            IProcessState.ProcessState state = pl.getProcessState();
            String pid = pl.getPID();
            // Same share that was looked up above for the incoming process.
            Share s = sl;
            if (pl.isActive()) {
                if (s == null) {
                    this.logger.warn(methodName, jobid, new Object[]{"Update for share from process", pl.getPID(), pl.getDuccId(), "but cannot find share."});
                    continue;
                }
                if (s.update(jobid, mem, investment, state, pl.getTimeWindowInit(), pid)) continue;
                throw new SchedulingException(jobid, "Process update arrives for share " + s.toString() + " but jobid " + jobid + " does not match job in share " + s.getJob().getId());
            }
            if (pl.isComplete()) {
                // NOTE(review): j is not null-checked on this path (unlike group 2); behaviour left as it was.
                IRmJob j = this.scheduler.getJob(jobid);
                if (s != null) {
                    this.scheduler.signalCompletion(j, s);
                    this.logger.info(methodName, jobid, new Object[]{"Process", pl.getPID(), " completed due to state", state});
                }
                switch (l.getDuccType()) {
                    case Job: {
                        break;
                    }
                    default: {
                        j.markComplete();
                        break;
                    }
                }
                continue;
            }
            this.logger.info(methodName, jobid, new Object[]{"Process", pl.getPID(), "ignoring update because of state", state});
        }
    }

    /**
     * Handles a state publication from the Orchestrator.
     *
     * Empty maps are ignored, as is anything that arrives before the
     * scheduler is initialized. When the scheduler reports that it must
     * recover, all locally held state is reset. The first publication handled
     * after such a reset (or after construction) is passed to
     * {@code recoverFromOrchestrator}; when fast recovery is enabled the
     * scheduler is started afterwards. Once the scheduler is ready, the
     * incoming map is diffed against the local map:
     * <ul>
     *   <li>work only in the incoming map is passed to {@code jobArrives} if
     *       schedulable, otherwise whitelisted;</li>
     *   <li>work only in the local map is removed;</li>
     *   <li>work in both is removed if it is no longer schedulable, otherwise
     *       stored, signalled to the scheduler and reconciled according to
     *       its type.</li>
     * </ul>
     *
     * {@code refusedJobs} is cleared under the same monitor that
     * {@code refuse()} holds when adding to it.
     *
     * @param jobMap work map published by the Orchestrator
     * @throws SchedulingException when work of type Undefined is found among
     *         the changed entries, or when raised by {@code reconcileProcesses}
     */
    public void eventArrives(IDuccWorkMap jobMap) {
        String methodName = "eventArrives";
        if (jobMap.size() == 0) {
            this.logger.debug(methodName, null, new Object[]{"No state from Orchestrator"});
            return;
        }
        if (!this.scheduler.isInitialized()) {
            return;
        }
        if (this.scheduler.mustRecover()) {
            // Start over: forget everything known about the Orchestrator's state.
            this.localMap = new DuccWorkMap();
            this.lastJobManagerUpdate = new JobManagerUpdate();
            this.blacklistedResources.clear();
            synchronized (this.refusedJobs) {
                this.refusedJobs.clear();
            }
            this.first_or_state = true;
        }
        if (this.first_or_state) {
            this.first_or_state = false;
            this.scheduler.setRecovery(false);
            if (!this.recoverFromOrchestrator(jobMap)) {
                this.logger.info(methodName, null, new Object[]{"There are no active jobs in map so can't build up state. Waiting for init stability."});
                return;
            }
            if (this.recovery) {
                this.logger.info(methodName, null, new Object[]{"Fast recovery is enabled: Recovered state from Orchestrator, starting scheduler."});
                this.scheduler.start();
            }
        }
        if (!this.scheduler.ready()) {
            this.logger.info(methodName, null, new Object[]{"Orchestrator event is discarded: scheduler is waiting for init stability or is paused for reconfig.."});
            return;
        }
        DuccCollectionUtils.DuccMapDifference diffmap = DuccCollectionUtils.difference((Map)jobMap, (Map)this.localMap);
        for (Object o : jobMap.values()) {
            IDuccWork w = (IDuccWork)o;
            this.logger.trace(methodName, w.getDuccId(), new Object[]{"Arrives in JmStateEvent state =", w.getStateObject()});
        }

        // Work seen for the first time.
        Map jobs = diffmap.getLeft();
        for (Object o : jobs.values()) {
            IDuccWork w = (IDuccWork)o;
            if (!w.isSchedulable()) {
                this.logger.info(methodName, w.getDuccId(), new Object[]{"Received non-schedulable job, state = ", w.getStateObject()});
                this.whitelist(w);
                continue;
            }
            this.logger.info(methodName, w.getDuccId(), new Object[]{"Incoming, state = ", w.getStateObject()});
            try {
                // Only work that was accepted is remembered locally.
                if (this.jobArrives(w)) {
                    this.localMap.addDuccWork(w);
                }
            }
            catch (Exception e) {
                this.logger.error(methodName, w.getDuccId(), new Object[]{"Can't receive job because of exception", e});
            }
        }

        // Work that has disappeared from the Orchestrator's map.
        jobs = diffmap.getRight();
        for (Object o : jobs.values()) {
            IDuccWork w = (IDuccWork)o;
            this.logger.info(methodName, w.getDuccId(), new Object[]{"Gone"});
            this.jobRemoved(w.getDuccId());
        }

        // Work present on both sides but changed.
        for (Object o : diffmap) {
            DuccCollectionUtils.DuccMapValueDifference jd = (DuccCollectionUtils.DuccMapValueDifference)o;
            IDuccWork r = (IDuccWork)jd.getRight();
            IDuccWork l = (IDuccWork)jd.getLeft();
            if (!l.isSchedulable()) {
                this.logger.info(methodName, l.getDuccId(), new Object[]{"Removing unschedulable:", r.getStateObject(), "->", l.getStateObject()});
                this.jobRemoved(r.getDuccId());
                continue;
            }
            this.localMap.addDuccWork(l);
            this.scheduler.signalState(l.getDuccId(), l.getStateObject().toString());
            switch (l.getDuccType()) {
                case Job: {
                    this.jobUpdate(r.getStateObject(), l);
                    this.reconcileProcesses(l.getDuccId(), l, r);
                    break;
                }
                case Service: 
                case Pop: {
                    // "other" deployments are always reconciled; the rest only on a state change.
                    if (((IDuccWorkService)l).getServiceDeploymentType() == IDuccWorkService.ServiceDeploymentType.other) {
                        this.logger.info(methodName, l.getDuccId(), new Object[]{"[P] State: ", r.getStateObject(), "->", l.getStateObject()});
                        this.reconcileProcesses(l.getDuccId(), l, r);
                        break;
                    }
                    if (r.getStateObject() == l.getStateObject()) break;
                    this.logger.info(methodName, l.getDuccId(), new Object[]{"[S] State: ", r.getStateObject(), "->", l.getStateObject()});
                    this.reconcileProcesses(l.getDuccId(), l, r);
                    break;
                }
                case Reservation: {
                    if (r.getStateObject() == l.getStateObject()) break;
                    this.logger.info(methodName, l.getDuccId(), new Object[]{"[R] State: ", r.getStateObject(), "->", l.getStateObject()});
                    break;
                }
                case Undefined: {
                    throw new SchedulingException(l.getDuccId(), "Work arrives as type Undefined - should have been filtered out by now.");
                }
            }
        }
        this.logger.trace(methodName, null, new Object[]{"Done with JmStateDuccEvent with some jobs processed"});
    }

    /**
     * Collects the shares assigned to a job that are not yet reflected in the
     * locally cached work item for that job.
     *
     * <p>The work item is looked up in {@code localMap} by the job's id. For a
     * Job or Service the share id is looked up in the work item's process map;
     * for a Reservation it is looked up in the reservation map. A share with no
     * entry there, and which is not a key of {@code expanded}, is logged at
     * warn level and added to the result. Other work types yield an empty
     * result.
     *
     * @param j        the job whose shares are checked
     * @param shares   the shares currently assigned to the job; may be {@code null}
     * @param expanded shares being newly handed out for this job; may be {@code null}
     * @return {@code null} if the job has no entry in {@code localMap} or
     *         {@code shares} is {@code null}; otherwise a (possibly empty) map
     *         of the shares to redrive, each mapped to itself
     */
    Map<Share, Share> sanityCheckForOrchestrator(IRmJob j, Map<Share, Share> shares, Map<Share, Share> expanded) {
        String methodName = "sanityCheckForOrchestrator";
        IDuccWork work = this.localMap.findDuccWork(j.getId());
        if (work == null || shares == null) {
            return null;
        }

        Map<Share, Share> redrive = new HashMap<Share, Share>();
        switch (work.getDuccType()) {
            case Job:
            case Service: {
                IDuccWorkExecutable executable = (IDuccWorkExecutable)work;
                IDuccProcessMap processes = executable.getProcessMap();
                for (Share share : shares.values()) {
                    IDuccProcess process = (IDuccProcess)processes.get((Object)share.getId());
                    if (process == null) {
                        // Only consult 'expanded' when the share has no process entry.
                        boolean beingExpanded = expanded != null && expanded.containsKey(share);
                        if (!beingExpanded) {
                            this.logger.warn(methodName, j.getId(), new Object[]{"Redrive share assignment: ", share});
                            redrive.put(share, share);
                        }
                    }
                }
                break;
            }
            case Reservation: {
                IDuccWorkReservation reservation = (IDuccWorkReservation)work;
                IDuccReservationMap reservations = reservation.getReservationMap();
                for (Share share : shares.values()) {
                    IDuccReservation entry = (IDuccReservation)reservations.get((Object)share.getId());
                    if (entry == null) {
                        // Only consult 'expanded' when the share has no reservation entry.
                        boolean beingExpanded = expanded != null && expanded.containsKey(share);
                        if (!beingExpanded) {
                            this.logger.warn(methodName, j.getId(), new Object[]{"Redrive share assignment:", share});
                            redrive.put(share, share);
                        }
                    }
                }
                break;
            }
            default: {
                // Any other work type: nothing to check, result stays empty.
                break;
            }
        }
        return redrive;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds the {@code RmStateDuccEvent} that carries the resource manager's
     * per-job state.
     *
     * <p>When {@code jmu} is {@code null}, the map stored in
     * {@code previousJobState} is wrapped as-is. Otherwise a fresh map is
     * assembled from, in this order:
     * <ol>
     *   <li>the jobs pending in {@code refusedJobs}, which are drained while
     *       holding that map's monitor;</li>
     *   <li>every job in {@code jmu.getAllJobs()} - refused jobs get a
     *       refusal state and are passed to {@code jobRemoved}; all others get
     *       a state listing their assigned, shrunken and expanded shares;</li>
     *   <li>the entries of {@code blacklistedResources}, which is cleared
     *       afterwards.</li>
     * </ol>
     * The fresh map then replaces {@code previousJobState}.
     *
     * @param jmu the scheduler's update, or {@code null} to re-send the
     *            previously built state
     * @return a new event wrapping the job-state map
     */
    public RmStateDuccEvent createState(JobManagerUpdate jmu) {
        String methodName = "createState";
        Map<DuccId, IRmJobState> rmJobState = null;
        if (jmu == null) {
            rmJobState = this.previousJobState;
        } else {
            rmJobState = new HashMap<DuccId, IRmJobState>();

            // Snapshot and drain the refused jobs under the map's own lock so
            // the remainder of the work happens outside the monitor.
            Map<IRmJob, IRmJob> refused = new HashMap<IRmJob, IRmJob>();
            synchronized (this.refusedJobs) {
                refused.putAll(this.refusedJobs);
                this.refusedJobs.clear();
            }
            for (IRmJob j : refused.values()) {
                RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
                rjs.setDuccType(j.getDuccType());
                rmJobState.put(j.getId(), (IRmJobState)rjs);
            }

            Map<DuccId, IRmJob> jobs = jmu.getAllJobs();
            Map<DuccId, HashMap<Share, Share>> shrunken = jmu.getShrunkenShares();
            Map<DuccId, HashMap<Share, Share>> expanded = jmu.getExpandedShares();
            for (IRmJob j : jobs.values()) {
                if (j.isRefused()) {
                    RmJobState rjs = new RmJobState(j.getId(), j.getRefusalReason());
                    rjs.setDuccType(j.getDuccType());
                    rmJobState.put(j.getId(), (IRmJobState)rjs);
                    this.jobRemoved(j.getId());
                    this.logger.warn(methodName, j.getId(), new Object[]{"Refusal: ", j.getRefusalReason()});
                    continue;
                }

                LinkedHashMap<DuccId, Resource> all_shares = new LinkedHashMap<DuccId, Resource>();
                LinkedHashMap<DuccId, Resource> shrunken_shares = new LinkedHashMap<DuccId, Resource>();
                LinkedHashMap<DuccId, Resource> expanded_shares = new LinkedHashMap<DuccId, Resource>();

                Map<Share, Share> redrive = null;
                Map<Share, Share> assigned = j.getAssignedShares();
                if (assigned != null) {
                    // Insert in sorter order; the LinkedHashMap keeps that order.
                    ArrayList<Share> sorted = new ArrayList<Share>(assigned.values());
                    Collections.sort(sorted, new RmJob.ShareByInvestmentSorter());
                    for (Share s : sorted) {
                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), s.getInitializationTime());
                        all_shares.put(s.getId(), r);
                    }
                    redrive = this.sanityCheckForOrchestrator(j, assigned, expanded.get(j.getId()));
                }

                Map<Share, Share> shrunkenForJob = shrunken.get(j.getId());
                if (shrunkenForJob != null) {
                    for (Share s : shrunkenForJob.values()) {
                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0L);
                        shrunken_shares.put(s.getId(), r);
                    }
                }

                Map<Share, Share> expandedForJob = expanded.get(j.getId());
                if (expandedForJob != null) {
                    for (Share s : expandedForJob.values()) {
                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0L);
                        expanded_shares.put(s.getId(), r);
                    }
                }

                // Shares flagged by the sanity check are reported again as expanded.
                if (redrive != null) {
                    for (Share s : redrive.values()) {
                        Resource r = new Resource(s.getId(), s.getNode(), s.isPurged(), s.getShareOrder(), 0L);
                        expanded_shares.put(s.getId(), r);
                    }
                }

                // NOTE(review): the ">> 20" presumably rescales the share quantum
                // (e.g. bytes to MB) before multiplying by the order - confirm units.
                RmJobState rjs = new RmJobState(j.getId(), (j.getShareQuantum() >> 20) * j.getShareOrder(), all_shares, shrunken_shares, expanded_shares);
                rjs.setDuccType(j.getDuccType());
                rjs.setReason(j.getReason());
                rmJobState.put(j.getId(), (IRmJobState)rjs);
            }

            rmJobState.putAll(this.blacklistedResources);
            this.blacklistedResources.clear();
            this.previousJobState = rmJobState;
        }

        RmStateDuccEvent response = new RmStateDuccEvent(rmJobState);
        try {
            this.logger.info(methodName, null, new Object[]{"Schedule sent to Orchestrator"});
            this.logger.info(methodName, null, new Object[]{response.toString()});
        }
        catch (Exception e) {
            // A failure while rendering the log line must not prevent the
            // response from being returned.
            this.logger.error(methodName, null, (Throwable)e, new Object[0]);
        }
        return response;
    }

    /**
     * Recovers the set of nodes referenced by work in an Orchestrator work map
     * and reports each of them to {@code nodeStability}.
     *
     * <p>Completed work is logged and ignored. For a Job or Service every
     * process in the process map is inspected; for a Reservation every entry in
     * the reservation map is inspected. Entries without a node are only logged.
     * Work of any other type is logged and otherwise ignored. Nodes are
     * collected in a {@code HashMap} keyed on the node itself, so a node
     * referenced several times is reported once.
     *
     * @param jobmap the work map to scan
     * @return {@code true} if at least one node was recovered
     */
    boolean recoverFromOrchestrator(IDuccWorkMap jobmap) {
        String methodName = "recoverFromOrchestrator";
        Map<Node, Node> nodes = new HashMap<Node, Node>();
        for (Object o : jobmap.values()) {
            IDuccWork w = (IDuccWork)o;

            // Short tag used in the "Receive:" log lines below; stays "?" for
            // any type not listed here.
            String prefix = "?";
            switch (w.getDuccType()) {
                case Job: {
                    prefix = "JOB";
                    break;
                }
                case Service: {
                    prefix = "SVC";
                    break;
                }
                case Reservation: {
                    prefix = "RES";
                    break;
                }
                default: {
                    break;
                }
            }

            if (w.isCompleted()) {
                this.logger.info(methodName, w.getDuccId(), new Object[]{"Ignoring completed work:", w.getDuccType(), ":", w.getStateObject()});
                continue;
            }

            switch (w.getDuccType()) {
                case Job:
                case Service: {
                    IDuccWorkExecutable de = (IDuccWorkExecutable)w;
                    IDuccProcessMap pm = de.getProcessMap();
                    this.logger.info(methodName, w.getDuccId(), new Object[]{"Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", pm.size(), "] Completed:", w.isCompleted()});
                    for (IDuccProcess proc : pm.values()) {
                        String pid = proc.getPID();
                        IProcessState.ProcessState state = proc.getProcessState();
                        Node n = proc.getNode();
                        if (n == null) {
                            // No node recorded for this process: nothing to recover.
                            this.logger.info(methodName, w.getDuccId(), new Object[]{"   Process[", pid, "] state [", state, "] is complete[", proc.isComplete(), "] Node [N/A] mem[N/A"});
                            continue;
                        }
                        long mem = n.getNodeMetrics().getNodeMemory().getMemTotal();
                        this.logger.info(methodName, w.getDuccId(), new Object[]{"   Process[", pid, "] state [", state, "] is complete [", proc.isComplete(), "] Node [", n.getNodeIdentity().getName() + "." + proc.getDuccId(), "] mem [", mem, "]"});
                        this.logger.info(methodName, w.getDuccId(), new Object[]{"      Recover node[", n.getNodeIdentity().getName()});
                        nodes.put(n, n);
                    }
                    break;
                }
                case Reservation: {
                    // Declared as the reservation type so getReservationMap()
                    // resolves, matching sanityCheckForOrchestrator.
                    IDuccWorkReservation de = (IDuccWorkReservation)w;
                    IDuccReservationMap rm = de.getReservationMap();
                    this.logger.info(methodName, w.getDuccId(), new Object[]{"Receive:", prefix, w.getDuccType(), w.getStateObject(), "processes[", rm.size(), "] Completed:", w.isCompleted()});
                    for (IDuccReservation r : rm.values()) {
                        Node n = r.getNode();
                        if (n == null) {
                            // No node recorded for this reservation: nothing to recover.
                            this.logger.info(methodName, w.getDuccId(), new Object[]{"    Node [N/A] mem[N/A"});
                            continue;
                        }
                        long mem = n.getNodeMetrics().getNodeMemory().getMemTotal();
                        this.logger.info(methodName, w.getDuccId(), new Object[]{"   Node[", n.getNodeIdentity().getName(), "] mem[", mem, "]"});
                        nodes.put(n, n);
                    }
                    break;
                }
                default: {
                    this.logger.info(methodName, w.getDuccId(), new Object[]{"Received work of type ?", w.getDuccType()});
                    break;
                }
            }
        }

        this.logger.info(methodName, null, new Object[]{"Recovered[", nodes.size(), "] nodes from OR state."});
        for (Node n : nodes.values()) {
            this.nodeStability.nodeArrives(n);
        }
        return !nodes.isEmpty();
    }
}

