/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl;

import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.common.config.NodeTaskListener;
import com.alibaba.otter.node.common.config.NodeTaskService;
import com.alibaba.otter.node.common.config.model.NodeTask;
import com.alibaba.otter.node.etl.OtterContextLocator;
import com.alibaba.otter.node.etl.OtterControllerMBean;
import com.alibaba.otter.node.etl.common.datasource.DataSourceService;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.jmx.StageAggregationCollector;
import com.alibaba.otter.node.etl.common.task.GlobalTask;
import com.alibaba.otter.node.etl.extract.ExtractTask;
import com.alibaba.otter.node.etl.load.LoadTask;
import com.alibaba.otter.node.etl.select.SelectTask;
import com.alibaba.otter.node.etl.transform.TransformTask;
import com.alibaba.otter.shared.arbitrate.ArbitrateEventService;
import com.alibaba.otter.shared.arbitrate.ArbitrateManageService;
import com.alibaba.otter.shared.arbitrate.impl.manage.NodeSessionExpired;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.SessionExpiredNotification;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.common.model.config.ConfigException;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.AddressUtils;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.version.VersionInfo;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker;
import com.google.common.collect.OtterMigrateMap;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Node-side controller that owns the stage tasks (select / extract / transform / load) of the
 * pipelines handled by this node.
 *
 * <p>It registers itself as a {@link NodeTaskListener} on {@link #start()}, reacts to incoming
 * {@link NodeTask} notifications by starting or stopping the matching {@link GlobalTask}s, and
 * exposes the management operations declared by {@link OtterControllerMBean}.
 *
 * <p>Collaborators are injected through the setters at the bottom of the class.
 */
public class OtterController implements NodeTaskListener, OtterControllerMBean {
    private static final Logger logger = LoggerFactory.getLogger(OtterController.class);

    /**
     * Stage tasks per pipeline id. Built as a computing map: {@link #startPipeline(NodeTask)}
     * relies on {@code get()} handing back a (possibly freshly created) per-pipeline map.
     * Read-only callers must therefore go through {@link #findTasks(Long)} so that a plain
     * query does not register a pipeline as a side effect.
     */
    private final Map<Long, Map<StageType, GlobalTask>> controllers = OtterMigrateMap.makeComputingMap((Function)new Function<Long, Map<StageType, GlobalTask>>(){

        public Map<StageType, GlobalTask> apply(Long pipelineId) {
            return new MapMaker().makeMap();
        }
    });
    private ConfigClientService configClientService;
    private ArbitrateManageService arbitrateManageService;
    private NodeTaskService nodeTaskService;
    private DataSourceService dataSourceService;
    private DbDialectFactory dbDialectFactory;
    private ArbitrateEventService arbitrateEventService;
    private ExecutorService executorService;
    private StageAggregationCollector stageAggregationCollector;

    /**
     * Validates and announces the node id, then subscribes to node task notifications.
     *
     * @throws Throwable whatever the node id initialisation or the listener registration throws
     */
    public void start() throws Throwable {
        initNid();
        this.nodeTaskService.addListener(this);
    }

    /**
     * Shuts down every known task and then releases the node level resources.
     *
     * <p>Each step is best-effort: a failure is logged and the next step still runs, so that one
     * broken component cannot prevent the remaining ones from being released.
     *
     * @throws Throwable whatever the final {@code ZooKeeperClient.destory()} call throws
     */
    public void stop() throws Throwable {
        for (Map<StageType, GlobalTask> pipelineTasks : this.controllers.values()) {
            for (GlobalTask task : pipelineTasks.values()) {
                try {
                    task.shutdown();
                }
                catch (Exception e) {
                    logger.error("##shutdown task error!", e);
                }
            }
        }
        try {
            Long nid = this.configClientService.currentNode().getId();
            this.arbitrateManageService.nodeEvent().destory(nid);
        }
        catch (Exception e) {
            logger.error("##destory node error!", e);
        }
        try {
            this.arbitrateEventService.toolEvent().release();
        }
        catch (Exception e) {
            logger.error("##destory arbitrate error!", e);
        }
        try {
            this.nodeTaskService.stopNode();
        }
        catch (Exception e) {
            logger.error("##stop node error!", e);
        }
        try {
            OtterContextLocator.close();
        }
        catch (Exception e) {
            logger.error("##cloes spring error!", e);
        }
        ZooKeeperClient.destory();
    }

    /**
     * Applies a batch of node task changes: a task flagged as shutdown stops its whole pipeline,
     * any other task (re)starts or stops individual stages via {@link #startPipeline(NodeTask)}.
     *
     * @param nodeTasks the changes to apply; {@code null} or empty is a no-op
     * @return always {@code true}
     */
    public boolean process(List<NodeTask> nodeTasks) {
        if (nodeTasks == null || nodeTasks.isEmpty()) {
            return true;
        }
        for (NodeTask nodeTask : nodeTasks) {
            if (!nodeTask.isShutdown()) {
                startPipeline(nodeTask);
                continue;
            }
            Long pipelineId = nodeTask.getPipeline().getId();
            // remove() does not compute, so an unknown pipeline yields null here
            Map<StageType, GlobalTask> tasks = this.controllers.remove(pipelineId);
            if (tasks == null) {
                logger.info("INFO ## this pipeline id = {} is not start sync", pipelineId);
                continue;
            }
            logger.info("INFO ## shutdown this pipeline sync ,the pipelineId = {} and tasks = {}", pipelineId, tasks.keySet());
            stopPipeline(pipelineId, tasks);
        }
        return true;
    }

    /**
     * Releases the pipeline's cached data sources / dialects and then walks the stage list of
     * the node task, starting a stage when its event is a create event and stopping it otherwise.
     *
     * @param nodeTask the task description; stage {@code i} is paired with event {@code i}
     */
    public void startPipeline(NodeTask nodeTask) {
        Pipeline pipeline = nodeTask.getPipeline();
        Long pipelineId = pipeline.getId();
        releasePipeline(pipelineId);
        // computing map: creates the per-pipeline task map on first access
        Map<StageType, GlobalTask> tasks = this.controllers.get(pipelineId);
        List<StageType> stages = nodeTask.getStage();
        List<NodeTask.TaskEvent> events = nodeTask.getEvent();
        for (int i = 0; i < stages.size(); ++i) {
            StageType stageType = stages.get(i);
            NodeTask.TaskEvent taskEvent = events.get(i);
            if (taskEvent.isCreate()) {
                startTask(pipeline, tasks, stageType);
            } else {
                stopTask(tasks, stageType);
            }
        }
    }

    /**
     * Starts the task for one stage unless a live task for that stage is already registered.
     *
     * <p>Skipping in the "already started" case matters: starting a second task would overwrite
     * the map entry and leave the first task running with nobody able to shut it down.
     */
    private void startTask(Pipeline pipeline, Map<StageType, GlobalTask> tasks, StageType taskType) {
        GlobalTask existing = tasks.get(taskType);
        if (existing != null && existing.isAlive()) {
            logger.warn("WARN ## this task = {} has started", taskType);
            return;
        }
        GlobalTask task = createTask(pipeline, taskType);
        if (task == null) {
            // stage type without a task implementation: nothing to start
            return;
        }
        OtterContextLocator.autowire(task);
        task.start();
        tasks.put(taskType, task);
        logger.info("INFO ## start this task = {} success", taskType.toString());
    }

    /**
     * Instantiates the task implementation matching the stage type.
     *
     * @return the new, not yet started task, or {@code null} when the stage type is none of
     *         select / extract / transform / load
     */
    private GlobalTask createTask(Pipeline pipeline, StageType taskType) {
        if (taskType.isSelect()) {
            return new SelectTask(pipeline.getId());
        }
        if (taskType.isExtract()) {
            return new ExtractTask(pipeline.getId());
        }
        if (taskType.isTransform()) {
            return new TransformTask(pipeline.getId());
        }
        if (taskType.isLoad()) {
            return new LoadTask(pipeline.getId());
        }
        return null;
    }

    /** Unregisters the task of one stage and shuts it down if it was registered. */
    private void stopTask(Map<StageType, GlobalTask> tasks, StageType taskType) {
        GlobalTask task = tasks.remove(taskType);
        if (task == null) {
            logger.info("INFo ## taskName = {} is not started", taskType);
            return;
        }
        task.shutdown();
        logger.info("INFO ## taskName = {} has shutdown", taskType);
    }

    /**
     * Shuts down every task of a pipeline, waits one second, then releases the pipeline's data
     * sources, dialects and arbitrate resources.
     *
     * @param pipelineId the pipeline being stopped
     * @param tasks      the pipeline's task map; it is emptied by this call
     */
    private void stopPipeline(Long pipelineId, Map<StageType, GlobalTask> tasks) {
        for (GlobalTask task : tasks.values()) {
            try {
                task.shutdown();
            }
            catch (Exception e) {
                logger.error("## stop s/e/t/l task error!", e);
            }
        }
        // The map is keyed by StageType, so removing by task instance never matched anything;
        // clear it once all tasks have been asked to shut down.
        tasks.clear();
        try {
            // NOTE(review): presumably gives the stopped tasks time to let go of their
            // resources before they are released below - confirm
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            logger.error("ERROR ## ", e);
        }
        releasePipeline(pipelineId);
        this.arbitrateEventService.toolEvent().release(pipelineId);
    }

    /** Destroys the data sources and db dialects held for the pipeline. */
    private void releasePipeline(Long pipelineId) {
        this.dataSourceService.destroy(pipelineId);
        this.dbDialectFactory.destory(pipelineId);
    }

    /**
     * Reads the node id from the {@code nid} system property, validates it against the node
     * configuration, initialises the node event with it and registers a session-expired
     * notification for the node event.
     *
     * @throws ConfigException if the {@code nid} system property is missing or empty
     */
    private void initNid() {
        String nid = System.getProperty("nid");
        if (StringUtils.isEmpty(nid)) {
            throw new ConfigException("nid is not set!");
        }
        logger.info("INFO ## the nodeId = {}", nid);
        checkNidVaild(nid);
        this.arbitrateManageService.nodeEvent().init(Long.valueOf(nid));
        NodeSessionExpired sessionExpired = new NodeSessionExpired();
        sessionExpired.setNodeEvent(this.arbitrateManageService.nodeEvent());
        ZooKeeperClient.registerNotification(sessionExpired);
    }

    /**
     * Verifies that the ip configured for the current node is accepted by
     * {@code AddressUtils.isHostIp}.
     *
     * @param nid the node id, used for the error message only
     * @throws IllegalArgumentException if the configured node ip is rejected
     */
    private void checkNidVaild(String nid) {
        Node node = this.configClientService.currentNode();
        String hostIp = AddressUtils.getHostIp();
        String nodeIp = node.getIp();
        int nodePort = node.getPort().intValue();
        if (!AddressUtils.isHostIp(nodeIp)) {
            throw new IllegalArgumentException(String.format("node[%s] ip[%s] port[%s] , but your host ip[%s] is not matched!", nid, nodeIp, nodePort, hostIp));
        }
    }

    /** @return the current JVM heap memory usage, marshalled by {@code JsonUtils} */
    @Override
    public String getHeapMemoryUsage() {
        MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return JsonUtils.marshalToString(memoryUsage);
    }

    /** @return one line with OS name, version, architecture, core count and system load average */
    @Override
    public String getNodeSystemInfo() {
        OperatingSystemMXBean mbean = ManagementFactory.getOperatingSystemMXBean();
        StringBuilder buf = new StringBuilder();
        buf.append(mbean.getName()).append(' ').append(mbean.getVersion()).append(' ').append(mbean.getArch());
        buf.append(" @ ").append(mbean.getAvailableProcessors()).append(" cores");
        buf.append(" , \u3010 load average:").append(mbean.getSystemLoadAverage()).append(" \u3011");
        return buf.toString();
    }

    /** @return version, revision and date as reported by {@code VersionInfo} */
    @Override
    public String getNodeVersionInfo() {
        return VersionInfo.getVersion() + " [ r" + VersionInfo.getRevision() + " ] @ " + VersionInfo.getDate();
    }

    /** @return the number of pipelines currently registered in this controller */
    @Override
    public int getRunningPipelineCount() {
        return this.controllers.size();
    }

    /** @return a snapshot copy of the ids of the pipelines currently registered */
    @Override
    public List<Long> getRunningPipelines() {
        return new ArrayList<Long>(this.controllers.keySet());
    }

    /** @return the active thread count of the executor, or 0 if it is not a {@link ThreadPoolExecutor} */
    @Override
    public int getThreadActiveSize() {
        if (this.executorService instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor)this.executorService).getActiveCount();
        }
        return 0;
    }

    /** @return the core pool size of the executor, or 0 if it is not a {@link ThreadPoolExecutor} */
    @Override
    public int getThreadPoolSize() {
        if (this.executorService instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor)this.executorService).getCorePoolSize();
        }
        return 0;
    }

    /**
     * Sets both the core and the maximum pool size of the executor to {@code size}; does nothing
     * if the executor is not a {@link ThreadPoolExecutor}.
     *
     * @param size the new fixed pool size
     * @throws IllegalArgumentException if the executor rejects the size (e.g. not positive)
     */
    @Override
    public void setThreadPoolSize(int size) {
        if (!(this.executorService instanceof ThreadPoolExecutor)) {
            return;
        }
        ThreadPoolExecutor pool = (ThreadPoolExecutor)this.executorService;
        // The executor enforces core <= max on each setter call, so the order has to follow
        // the direction of the change: raise max first when growing, lower core first when
        // shrinking.
        if (size > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(size);
            pool.setCorePoolSize(size);
        } else {
            pool.setCorePoolSize(size);
            pool.setMaximumPoolSize(size);
        }
    }

    /** Switches profiling of the stage aggregation collector on or off. */
    @Override
    public void setProfile(boolean profile) {
        this.stageAggregationCollector.setProfiling(profile);
    }

    /** @return {@code true} if a select task is registered for the pipeline */
    @Override
    public boolean isSelectRunning(Long pipelineId) {
        return isStageRunning(pipelineId, StageType.SELECT);
    }

    /** @return {@code true} if an extract task is registered for the pipeline */
    @Override
    public boolean isExtractRunning(Long pipelineId) {
        return isStageRunning(pipelineId, StageType.EXTRACT);
    }

    /** @return {@code true} if a transform task is registered for the pipeline */
    @Override
    public boolean isTransformRunning(Long pipelineId) {
        return isStageRunning(pipelineId, StageType.TRANSFORM);
    }

    /** @return {@code true} if a load task is registered for the pipeline */
    @Override
    public boolean isLoadRunning(Long pipelineId) {
        return isStageRunning(pipelineId, StageType.LOAD);
    }

    /** @return the select stage histogram of the pipeline as produced by the collector */
    @Override
    public String selectStageAggregation(Long pipelineId) {
        return this.stageAggregationCollector.histogram(pipelineId, StageType.SELECT);
    }

    /** @return the extract stage histogram of the pipeline as produced by the collector */
    @Override
    public String extractStageAggregation(Long pipelineId) {
        return this.stageAggregationCollector.histogram(pipelineId, StageType.EXTRACT);
    }

    /** @return the transform stage histogram of the pipeline as produced by the collector */
    @Override
    public String transformStageAggregation(Long pipelineId) {
        return this.stageAggregationCollector.histogram(pipelineId, StageType.TRANSFORM);
    }

    /** @return the load stage histogram of the pipeline as produced by the collector */
    @Override
    public String loadStageAggregation(Long pipelineId) {
        return this.stageAggregationCollector.histogram(pipelineId, StageType.LOAD);
    }

    /** @return a description of the pending processes of the pipeline's select task */
    @Override
    public String selectPendingProcess(Long pipelineId) {
        return pendingProcess(pipelineId, StageType.SELECT);
    }

    /** @return a description of the pending processes of the pipeline's extract task */
    @Override
    public String extractPendingProcess(Long pipelineId) {
        return pendingProcess(pipelineId, StageType.EXTRACT);
    }

    /** @return a description of the pending processes of the pipeline's transform task */
    @Override
    public String transformPendingProcess(Long pipelineId) {
        return pendingProcess(pipelineId, StageType.TRANSFORM);
    }

    /** @return a description of the pending processes of the pipeline's load task */
    @Override
    public String loadPendingProcess(Long pipelineId) {
        return pendingProcess(pipelineId, StageType.LOAD);
    }

    /**
     * Formats the pending processes of one stage task as a comma separated list, or reports
     * that the stage has no task on this node.
     */
    private String pendingProcess(Long pipelineId, StageType stage) {
        Map<StageType, GlobalTask> tasks = findTasks(pipelineId);
        GlobalTask task = tasks == null ? null : tasks.get(stage);
        if (task == null) {
            return "node don't running stage:" + stage;
        }
        return "stage:" + stage + " , pending:[" + StringUtils.join(task.getPendingProcess(), ',') + "]";
    }

    /** Tells whether a task for the given stage is registered for the pipeline. */
    private boolean isStageRunning(Long pipelineId, StageType stage) {
        Map<StageType, GlobalTask> tasks = findTasks(pipelineId);
        return tasks != null && tasks.containsKey(stage);
    }

    /**
     * Read-only lookup of a pipeline's task map.
     *
     * <p>Going through {@code containsKey} first keeps a query for an unknown (or {@code null})
     * pipeline id from reaching {@code get()} on the computing map, which would otherwise
     * register an empty entry and make the pipeline show up in
     * {@link #getRunningPipelines()} / {@link #getRunningPipelineCount()}.
     *
     * @return the task map, or {@code null} if the pipeline is not registered
     */
    private Map<StageType, GlobalTask> findTasks(Long pipelineId) {
        if (pipelineId == null || !this.controllers.containsKey(pipelineId)) {
            return null;
        }
        return this.controllers.get(pipelineId);
    }

    // ================== dependency injection setters ==================

    public void setNodeTaskService(NodeTaskService nodeTaskService) {
        this.nodeTaskService = nodeTaskService;
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }

    public void setArbitrateManageService(ArbitrateManageService arbitrateManageService) {
        this.arbitrateManageService = arbitrateManageService;
    }

    public void setDataSourceService(DataSourceService dataSourceService) {
        this.dataSourceService = dataSourceService;
    }

    public void setDbDialectFactory(DbDialectFactory dbDialectFactory) {
        this.dbDialectFactory = dbDialectFactory;
    }

    public void setArbitrateEventService(ArbitrateEventService arbitrateEventService) {
        this.arbitrateEventService = arbitrateEventService;
    }

    public void setStageAggregationCollector(StageAggregationCollector stageAggregationCollector) {
        this.stageAggregationCollector = stageAggregationCollector;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }
}

