package com.alibaba.otter.node.etl;

import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.common.config.NodeTaskListener;
import com.alibaba.otter.node.common.config.NodeTaskService;
import com.alibaba.otter.node.common.config.model.NodeTask;
import com.alibaba.otter.node.etl.common.datasource.DataSourceService;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.jmx.StageAggregationCollector;
import com.alibaba.otter.node.etl.common.task.GlobalTask;
import com.alibaba.otter.node.etl.extract.ExtractTask;
import com.alibaba.otter.node.etl.load.LoadTask;
import com.alibaba.otter.node.etl.select.SelectTask;
import com.alibaba.otter.node.etl.transform.TransformTask;
import com.alibaba.otter.shared.arbitrate.ArbitrateEventService;
import com.alibaba.otter.shared.arbitrate.ArbitrateManageService;
import com.alibaba.otter.shared.arbitrate.impl.manage.NodeSessionExpired;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.common.model.config.ConfigException;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.AddressUtils;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.version.VersionInfo;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker;
import com.google.common.collect.OtterMigrateMap;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/node/etl/OtterController.class */
/**
 * Controller that starts and stops the per-pipeline stage tasks
 * (select / extract / transform / load) on the current node.
 *
 * <p>It is registered as a {@link NodeTaskListener} in {@link #start()} and reacts to the
 * {@link NodeTask} notifications delivered to {@link #process(List)}. It also implements
 * {@link OtterControllerMBean} to expose runtime information about the node.
 */
public class OtterController implements NodeTaskListener, OtterControllerMBean {

    private static final Logger logger = LoggerFactory.getLogger(OtterController.class);

    /**
     * pipelineId -> (stage -> task). Built with {@code makeComputingMap}, so {@code get()} on an
     * unknown id is expected to materialize a fresh empty stage map; read-only callers must use
     * {@link #findTasks(Long)} instead of {@code get()} to avoid creating entries.
     */
    private final Map<Long, Map<StageType, GlobalTask>> controllers = OtterMigrateMap.makeComputingMap(new Function<Long, Map<StageType, GlobalTask>>() {

        @Override
        public Map<StageType, GlobalTask> apply(Long pipelineId) {
            return new MapMaker().makeMap();
        }
    });

    private ConfigClientService configClientService;
    private ArbitrateManageService arbitrateManageService;
    private NodeTaskService nodeTaskService;
    private DataSourceService dataSourceService;
    private DbDialectFactory dbDialectFactory;
    private ArbitrateEventService arbitrateEventService;
    private ExecutorService executorService;
    private StageAggregationCollector stageAggregationCollector;

    /**
     * Initializes the node id and subscribes this controller to node task notifications.
     *
     * @throws Throwable if the node id is missing/invalid or the initialization fails
     */
    public void start() throws Throwable {
        initNid();
        this.nodeTaskService.addListener(this);
    }

    /**
     * Shuts down every registered task and then releases the node-level resources.
     *
     * <p>Each step except the final {@code ZooKeeperClient.destory()} is best effort: an
     * {@link Exception} is logged and the next step still runs.
     *
     * @throws Throwable if the final ZooKeeper client teardown fails
     */
    public void stop() throws Throwable {
        for (Map<StageType, GlobalTask> tasks : this.controllers.values()) {
            for (GlobalTask task : tasks.values()) {
                try {
                    task.shutdown();
                } catch (Exception e) {
                    logger.error("##shutdown task error!", e);
                }
            }
        }

        try {
            this.arbitrateManageService.nodeEvent().destory(Long.valueOf(this.configClientService.currentNode().getId().longValue()));
        } catch (Exception e) {
            logger.error("##destory node error!", e);
        }

        try {
            this.arbitrateEventService.toolEvent().release();
        } catch (Exception e) {
            logger.error("##destory arbitrate error!", e);
        }

        try {
            this.nodeTaskService.stopNode();
        } catch (Exception e) {
            logger.error("##stop node error!", e);
        }

        try {
            OtterContextLocator.close();
        } catch (Exception e) {
            logger.error("##cloes spring error!", e);
        }

        ZooKeeperClient.destory();
    }

    /**
     * Applies a batch of node task changes: a task flagged as shutdown stops the whole pipeline,
     * any other task is handed to {@link #startPipeline(NodeTask)}.
     *
     * @param list the task changes to apply; {@code null} or empty is a no-op
     * @return always {@code true}
     */
    public boolean process(List<NodeTask> list) {
        if (list == null || list.isEmpty()) {
            return true;
        }

        for (NodeTask nodeTask : list) {
            Long pipelineId = nodeTask.getPipeline().getId();
            if (!nodeTask.isShutdown()) {
                startPipeline(nodeTask);
                continue;
            }

            Map<StageType, GlobalTask> tasks = this.controllers.remove(pipelineId);
            if (tasks != null) {
                logger.info("INFO ## shutdown this pipeline sync ,the pipelineId = {} and tasks = {}", pipelineId, tasks.keySet());
                stopPipeline(pipelineId, tasks);
            } else {
                logger.info("INFO ## this pipeline id = {} is not start sync", pipelineId);
            }
        }
        return true;
    }

    /**
     * Releases the pipeline's data source / dialect resources and then walks the task's stage
     * list, starting the stage when the event at the same index is a create event and stopping
     * it otherwise.
     *
     * @param nodeTask the task description carrying the pipeline, stages and per-stage events
     */
    public void startPipeline(NodeTask nodeTask) {
        Long pipelineId = nodeTask.getPipeline().getId();
        releasePipeline(pipelineId);

        // The computing map hands back (and registers) an empty stage map for a new pipeline.
        Map<StageType, GlobalTask> tasks = this.controllers.get(pipelineId);
        List<?> stages = nodeTask.getStage();
        List<?> events = nodeTask.getEvent();
        for (int i = 0; i < stages.size(); i++) {
            StageType stageType = (StageType) stages.get(i);
            NodeTask.TaskEvent taskEvent = (NodeTask.TaskEvent) events.get(i);
            if (taskEvent.isCreate()) {
                startTask(nodeTask.getPipeline(), tasks, stageType);
            } else {
                stopTask(tasks, stageType);
            }
        }
    }

    /**
     * Creates, autowires, starts and registers the task for one stage.
     *
     * <p>If an alive task is already registered for the stage it is shut down before the new
     * instance replaces it; otherwise the old instance would keep running with no handle left
     * to stop it. Unknown stage types are ignored.
     */
    private void startTask(Pipeline pipeline, Map<StageType, GlobalTask> tasks, StageType stageType) {
        GlobalTask previous = tasks.get(stageType);
        boolean previousAlive = previous != null && previous.isAlive();
        if (previousAlive) {
            logger.warn("WARN ## this task = {} has started", stageType);
        }

        GlobalTask task = null;
        if (stageType.isSelect()) {
            task = new SelectTask(pipeline.getId());
        } else if (stageType.isExtract()) {
            task = new ExtractTask(pipeline.getId());
        } else if (stageType.isTransform()) {
            task = new TransformTask(pipeline.getId());
        } else if (stageType.isLoad()) {
            task = new LoadTask(pipeline.getId());
        }
        if (task == null) {
            return;
        }

        if (previousAlive) {
            // Best effort: a failure to stop the stale instance must not block the new one.
            try {
                previous.shutdown();
                logger.warn("WARN ## shutdown previous task = {} before restart", stageType);
            } catch (Exception e) {
                logger.error("## shutdown previous task error!", e);
            }
        }

        OtterContextLocator.autowire(task);
        task.start();
        tasks.put(stageType, task);
        logger.info("INFO ## start this task = {} success", stageType.toString());
    }

    /** Unregisters the task of one stage and shuts it down if it was registered. */
    private void stopTask(Map<StageType, GlobalTask> tasks, StageType stageType) {
        GlobalTask task = tasks.remove(stageType);
        if (task == null) {
            logger.info("INFo ## taskName = {} is not started", stageType);
            return;
        }
        task.shutdown();
        logger.info("INFO ## taskName = {} has shutdown", stageType);
    }

    /**
     * Shuts down and unregisters every task of a pipeline, waits one second, then releases the
     * pipeline's data source / dialect resources and its arbitrate tool-event resources.
     */
    private void stopPipeline(Long pipelineId, Map<StageType, GlobalTask> tasks) {
        // Remove through the iterator: Map.remove(task) would treat the task as a key and
        // never remove anything from a StageType-keyed map.
        Iterator<GlobalTask> it = tasks.values().iterator();
        while (it.hasNext()) {
            GlobalTask task = it.next();
            try {
                task.shutdown();
            } catch (Exception e) {
                logger.error("## stop s/e/t/l task error!", e);
            } finally {
                it.remove();
            }
        }

        // NOTE(review): presumably a grace period for the tasks to wind down - confirm.
        boolean interrupted = false;
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            logger.error("ERROR ## ", e);
            interrupted = true;
        }

        try {
            releasePipeline(pipelineId);
            this.arbitrateEventService.toolEvent().release(pipelineId);
        } finally {
            // Restore the interrupt status only after cleanup so the release calls are not disturbed.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Destroys the data source and db dialect resources held for the pipeline. */
    private void releasePipeline(Long pipelineId) {
        this.dataSourceService.destroy(pipelineId);
        this.dbDialectFactory.destory(pipelineId);
    }

    /**
     * Reads the node id from the {@code OtterConstants.NID_NAME} system property, validates it,
     * initializes the node event with it and registers a {@link NodeSessionExpired}
     * notification on the ZooKeeper client.
     *
     * @throws ConfigException if the property is missing or empty
     */
    private void initNid() {
        String nid = System.getProperty(OtterConstants.NID_NAME);
        if (StringUtils.isEmpty(nid)) {
            throw new ConfigException("nid is not set!");
        }

        logger.info("INFO ## the nodeId = {}", nid);
        checkNidVaild(nid);
        this.arbitrateManageService.nodeEvent().init(Long.valueOf(nid));

        NodeSessionExpired sessionExpired = new NodeSessionExpired();
        sessionExpired.setNodeEvent(this.arbitrateManageService.nodeEvent());
        ZooKeeperClient.registerNotification(sessionExpired);
    }

    /**
     * Verifies that the ip configured for the current node is an ip of this host.
     *
     * @param nid the node id, used only in the error message
     * @throws IllegalArgumentException if the configured ip is not a host ip
     */
    private void checkNidVaild(String nid) {
        Node node = this.configClientService.currentNode();
        String hostIp = AddressUtils.getHostIp();
        String nodeIp = node.getIp();
        int nodePort = node.getPort().intValue();
        if (!AddressUtils.isHostIp(nodeIp)) {
            throw new IllegalArgumentException(String.format("node[%s] ip[%s] port[%s] , but your host ip[%s] is not matched!", nid, nodeIp, Integer.valueOf(nodePort), hostIp));
        }
    }

    /** @return the JVM heap memory usage marshalled through {@code JsonUtils.marshalToString} */
    @Override
    public String getHeapMemoryUsage() {
        return JsonUtils.marshalToString(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage());
    }

    /** @return OS name, version, architecture, processor count and system load average */
    @Override
    public String getNodeSystemInfo() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        StringBuilder info = new StringBuilder();
        info.append(os.getName()).append(' ').append(os.getVersion()).append(' ').append(os.getArch());
        info.append(" @ ").append(os.getAvailableProcessors()).append(" cores");
        info.append(" , 【 load average:").append(os.getSystemLoadAverage()).append(" 】");
        return info.toString();
    }

    /** @return version, revision and date as reported by {@link VersionInfo} */
    @Override
    public String getNodeVersionInfo() {
        return VersionInfo.getVersion() + " [ r" + VersionInfo.getRevision() + " ] @ " + VersionInfo.getDate();
    }

    /** @return the number of pipelines currently registered in this controller */
    @Override
    public int getRunningPipelineCount() {
        return this.controllers.size();
    }

    /** @return a snapshot copy of the ids of the pipelines currently registered */
    @Override
    public List<Long> getRunningPipelines() {
        return new ArrayList<Long>(this.controllers.keySet());
    }

    /** @return the active thread count of the executor, or 0 if it is not a {@link ThreadPoolExecutor} */
    @Override
    public int getThreadActiveSize() {
        if (this.executorService instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) this.executorService).getActiveCount();
        }
        return 0;
    }

    /** @return the core pool size of the executor, or 0 if it is not a {@link ThreadPoolExecutor} */
    @Override
    public int getThreadPoolSize() {
        if (this.executorService instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) this.executorService).getCorePoolSize();
        }
        return 0;
    }

    /**
     * Sets both the core and the maximum pool size of the executor to {@code i}; does nothing
     * if the executor is not a {@link ThreadPoolExecutor}.
     *
     * @param i the new pool size
     */
    @Override
    public void setThreadPoolSize(int i) {
        if (!(this.executorService instanceof ThreadPoolExecutor)) {
            return;
        }

        ThreadPoolExecutor pool = (ThreadPoolExecutor) this.executorService;
        // core must never exceed max at any point (the setters reject it), so the
        // order of the two calls depends on whether the pool grows or shrinks.
        if (i > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(i);
            pool.setCorePoolSize(i);
        } else {
            pool.setCorePoolSize(i);
            pool.setMaximumPoolSize(i);
        }
    }

    /** @param z the profiling flag forwarded to the stage aggregation collector */
    @Override
    public void setProfile(boolean z) {
        this.stageAggregationCollector.setProfiling(z);
    }

    /**
     * @param l the pipeline id
     * @return {@code true} if a SELECT task is registered for the pipeline
     */
    @Override
    public boolean isSelectRunning(Long l) {
        return isStageRegistered(l, StageType.SELECT);
    }

    /**
     * @param l the pipeline id
     * @return {@code true} if an EXTRACT task is registered for the pipeline
     */
    @Override
    public boolean isExtractRunning(Long l) {
        return isStageRegistered(l, StageType.EXTRACT);
    }

    /**
     * @param l the pipeline id
     * @return {@code true} if a TRANSFORM task is registered for the pipeline
     */
    @Override
    public boolean isTransformRunning(Long l) {
        return isStageRegistered(l, StageType.TRANSFORM);
    }

    /**
     * @param l the pipeline id
     * @return {@code true} if a LOAD task is registered for the pipeline
     */
    @Override
    public boolean isLoadRunning(Long l) {
        return isStageRegistered(l, StageType.LOAD);
    }

    /** @param l the pipeline id; @return the collector's histogram for the SELECT stage */
    @Override
    public String selectStageAggregation(Long l) {
        return this.stageAggregationCollector.histogram(l, StageType.SELECT);
    }

    /** @param l the pipeline id; @return the collector's histogram for the EXTRACT stage */
    @Override
    public String extractStageAggregation(Long l) {
        return this.stageAggregationCollector.histogram(l, StageType.EXTRACT);
    }

    /** @param l the pipeline id; @return the collector's histogram for the TRANSFORM stage */
    @Override
    public String transformStageAggregation(Long l) {
        return this.stageAggregationCollector.histogram(l, StageType.TRANSFORM);
    }

    /** @param l the pipeline id; @return the collector's histogram for the LOAD stage */
    @Override
    public String loadStageAggregation(Long l) {
        return this.stageAggregationCollector.histogram(l, StageType.LOAD);
    }

    /** @param l the pipeline id; @return a description of the SELECT task's pending processes */
    @Override
    public String selectPendingProcess(Long l) {
        return pendingProcess(l, StageType.SELECT);
    }

    /** @param l the pipeline id; @return a description of the EXTRACT task's pending processes */
    @Override
    public String extractPendingProcess(Long l) {
        return pendingProcess(l, StageType.EXTRACT);
    }

    /** @param l the pipeline id; @return a description of the TRANSFORM task's pending processes */
    @Override
    public String transformPendingProcess(Long l) {
        return pendingProcess(l, StageType.TRANSFORM);
    }

    /** @param l the pipeline id; @return a description of the LOAD task's pending processes */
    @Override
    public String loadPendingProcess(Long l) {
        return pendingProcess(l, StageType.LOAD);
    }

    /**
     * Describes the pending processes of one stage task, or reports that the stage is not
     * registered on this node.
     */
    private String pendingProcess(Long pipelineId, StageType stageType) {
        Map<StageType, GlobalTask> tasks = findTasks(pipelineId);
        GlobalTask task = (tasks == null) ? null : tasks.get(stageType);
        if (task == null) {
            return "node don't running stage:" + stageType;
        }
        return "stage:" + stageType + " , pending:[" + StringUtils.join(task.getPendingProcess(), ',') + "]";
    }

    /** Tells whether a task is registered for the given pipeline and stage, without side effects. */
    private boolean isStageRegistered(Long pipelineId, StageType stageType) {
        Map<StageType, GlobalTask> tasks = findTasks(pipelineId);
        return tasks != null && tasks.containsKey(stageType);
    }

    /**
     * Read-only lookup of a pipeline's stage map.
     *
     * <p>Checks {@code containsKey} first so that querying an unknown pipeline does not make the
     * computing map register an empty entry, which would otherwise show up in
     * {@link #getRunningPipelineCount()} and {@link #getRunningPipelines()}.
     *
     * @return the stage map, or {@code null} if the id is {@code null} or not registered
     */
    private Map<StageType, GlobalTask> findTasks(Long pipelineId) {
        if (pipelineId == null || !this.controllers.containsKey(pipelineId)) {
            return null;
        }
        return this.controllers.get(pipelineId);
    }

    public void setNodeTaskService(NodeTaskService nodeTaskService) {
        this.nodeTaskService = nodeTaskService;
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }

    public void setArbitrateManageService(ArbitrateManageService arbitrateManageService) {
        this.arbitrateManageService = arbitrateManageService;
    }

    public void setDataSourceService(DataSourceService dataSourceService) {
        this.dataSourceService = dataSourceService;
    }

    public void setDbDialectFactory(DbDialectFactory dbDialectFactory) {
        this.dbDialectFactory = dbDialectFactory;
    }

    public void setArbitrateEventService(ArbitrateEventService arbitrateEventService) {
        this.arbitrateEventService = arbitrateEventService;
    }

    public void setStageAggregationCollector(StageAggregationCollector stageAggregationCollector) {
        this.stageAggregationCollector = stageAggregationCollector;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }
}
