/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.manage;

import com.alibaba.otter.shared.arbitrate.ArbitrateViewService;
import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.ArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.manage.NodeArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.manage.helper.ManagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.termin.ErrorTerminProcess;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.termin.WarningTerminProcess;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.model.statistics.stage.ProcessStat;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

public class ChannelArbitrateEvent
implements ArbitrateEvent {
    protected static final Logger logger = LoggerFactory.getLogger(ChannelArbitrateEvent.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();
    private ArbitrateViewService arbitrateViewService;
    private NodeArbitrateEvent nodeEvent;
    private ErrorTerminProcess errorTerminProcess;
    private WarningTerminProcess warningTerminProcess;
    private ExecutorService arbitrateExecutor;

    /**
     * Creates the ZooKeeper node that holds the channel's status, seeded with
     * {@link ChannelStatus#STOP}.
     *
     * <p>An already existing node is left untouched. When the direct create fails
     * because a node on the path is missing, the call falls back to
     * {@code createPersistent(path, data, true)} (presumably creating the missing
     * parents — confirm against ZkClientx).
     *
     * @param channelId id of the channel whose status node is created
     * @throws ArbitrateException if ZooKeeper rejects the create, on either the
     *                            direct attempt or the fallback
     */
    public void init(Long channelId) {
        String path = ManagePathUtils.getChannelByChannelId(channelId);
        byte[] data = JsonUtils.marshalToByte((Object)ChannelStatus.STOP);
        try {
            try {
                this.zookeeper.create(path, (Object)data, CreateMode.PERSISTENT);
            }
            catch (ZkNoNodeException e) {
                // The fallback runs inside the outer try so that its failures get
                // the same treatment as failures of the direct create.
                this.zookeeper.createPersistent(path, (Object)data, true);
            }
        }
        catch (ZkNodeExistsException ignored) {
            // The node is already there (possibly created concurrently): nothing to do.
        }
        catch (ZkException e) {
            throw new ArbitrateException("Channel_init", channelId.toString(), e);
        }
    }

    /**
     * Marks the channel as started by writing {@link ChannelStatus#START} to its
     * status node.
     *
     * @param channelId id of the channel to start
     */
    public void start(Long channelId) {
        final ChannelStatus target = ChannelStatus.START;
        updateStatus(channelId, target);
    }

    /**
     * Pauses the channel and also issues a ROLLBACK termin to its pipelines.
     *
     * @param channelId id of the channel to pause
     * @return the result of {@link #pause(Long, boolean)} with termin enabled
     */
    public boolean pause(Long channelId) {
        final boolean needTermin = true;
        return pause(channelId, needTermin);
    }

    /**
     * Pauses the channel, optionally sending a ROLLBACK termin to every pipeline.
     *
     * <p>The status node is only rewritten to {@link ChannelStatus#PAUSE} when the
     * channel is currently started. If the termin step fails, the status is forced
     * to PAUSE before the failure is rethrown.
     *
     * @param channelId  id of the channel to pause
     * @param needTermin whether to issue the ROLLBACK termin to the pipelines
     * @return {@code true} only when the channel was started (and is now paused)
     *         and, if a termin was requested, the termin reported success
     * @throws ArbitrateException if the channel has no status node, or if the
     *                            termin step or a status update fails
     */
    public boolean pause(Long channelId, boolean needTermin) {
        ChannelStatus currstatus = this.status(channelId);
        if (currstatus == null) {
            // status() returns null for a missing node; fail with a clear cause
            // instead of a bare NullPointerException.
            throw new ArbitrateException(new IllegalStateException("channel[" + channelId
                                                                   + "] has no status node, it may not be initialized"));
        }
        boolean status = false;
        boolean result = !needTermin;
        if (currstatus.isStart()) {
            this.updateStatus(channelId, ChannelStatus.PAUSE);
            status = true;
        }
        if (needTermin) {
            try {
                result |= this.termin(channelId, TerminEventData.TerminType.ROLLBACK).booleanValue();
            }
            catch (Throwable e) {
                // Termin failed: force the channel into PAUSE before reporting.
                this.updateStatus(channelId, ChannelStatus.PAUSE);
                throw new ArbitrateException(e);
            }
        }
        return result && status;
    }

    /**
     * Stops the channel and also issues a SHUTDOWN termin to its pipelines.
     *
     * @param channelId id of the channel to stop
     * @return the result of {@link #stop(Long, boolean)} with termin enabled
     */
    public boolean stop(Long channelId) {
        final boolean needTermin = true;
        return stop(channelId, needTermin);
    }

    /**
     * Writes {@link ChannelStatus#STOP} to the channel's status node and,
     * when requested, sends a SHUTDOWN termin to every pipeline.
     *
     * @param channelId  id of the channel to stop
     * @param needTermin whether to issue the SHUTDOWN termin to the pipelines
     * @return {@code true} when no termin was requested; otherwise the termin result
     * @throws ArbitrateException if the termin step or a status update fails
     */
    public boolean stop(Long channelId, boolean needTermin) {
        updateStatus(channelId, ChannelStatus.STOP);
        if (!needTermin) {
            // Nothing else to do: the status update alone counts as success.
            return true;
        }
        try {
            return termin(channelId, TerminEventData.TerminType.SHUTDOWN).booleanValue();
        }
        catch (Throwable failure) {
            // Write STOP once more before surfacing the failure.
            updateStatus(channelId, ChannelStatus.STOP);
            throw new ArbitrateException(failure);
        }
    }

    /**
     * Restarts the channel and also issues a RESTART termin to its pipelines.
     *
     * @param channelId id of the channel to restart
     * @return the result of {@link #restart(Long, boolean)} with termin enabled
     */
    public boolean restart(Long channelId) {
        final boolean needTermin = true;
        return restart(channelId, needTermin);
    }

    /**
     * Restarts the channel: pauses it (unless it is stopped), optionally sends a
     * RESTART termin to every pipeline, then schedules a delayed start on the
     * arbitrate executor.
     *
     * <p>The delayed task waits 5 seconds plus the value of
     * {@code RandomUtils.nextInt(2000)} milliseconds, re-reads the status and starts
     * the channel only if it has not been stopped in the meantime and
     * {@code canStart} accepts it. Failures inside the task are logged, because the
     * returned {@code Future} is not observed by anyone.
     *
     * @param channelId  id of the channel to restart
     * @param needTermin whether to issue the RESTART termin to the pipelines
     * @return {@code true} only when the channel was not stopped (and was paused
     *         here) and, if a termin was requested, the termin reported success
     * @throws ArbitrateException if the channel has no status node, or if the
     *                            termin step or a status update fails
     */
    public boolean restart(final Long channelId, boolean needTermin) {
        ChannelStatus current = this.status(channelId);
        if (current == null) {
            // status() returns null for a missing node; fail with a clear cause
            // instead of a bare NullPointerException.
            throw new ArbitrateException(new IllegalStateException("channel[" + channelId
                                                                   + "] has no status node, it may not be initialized"));
        }
        boolean result = !needTermin;
        boolean status = false;
        if (!current.isStop()) {
            this.updateStatus(channelId, ChannelStatus.PAUSE);
            status = true;
        }
        if (needTermin) {
            try {
                result |= this.termin(channelId, TerminEventData.TerminType.RESTART).booleanValue();
            }
            catch (Throwable e) {
                // Termin failed: force the channel into PAUSE before reporting.
                this.updateStatus(channelId, ChannelStatus.PAUSE);
                throw new ArbitrateException(e);
            }
        }
        if (status || result) {
            this.arbitrateExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    boolean interrupted = false;
                    try {
                        Thread.sleep(5000L + (long)RandomUtils.nextInt((int)2000));
                    }
                    catch (InterruptedException e) {
                        // Still attempt the start (as before), but remember the
                        // interrupt so it can be restored for the executor.
                        interrupted = true;
                    }
                    try {
                        Channel channel = ArbitrateConfigUtils.getChannelByChannelId(channelId);
                        ChannelStatus latest = ChannelArbitrateEvent.this.status(channel.getId());
                        if (latest == null) {
                            logger.warn("channel[{}] status node not found , restart is ignored", (Object)channel.getId());
                        } else if (latest.isStop()) {
                            logger.info("channel[{}] is already stop , restart is ignored", (Object)channel.getId());
                        } else if (ChannelArbitrateEvent.this.canStart(channel)) {
                            ChannelArbitrateEvent.this.start(channelId);
                        }
                    }
                    catch (RuntimeException e) {
                        // Nobody reads the Future of this task, so log here or the
                        // failure disappears silently.
                        logger.error("restart channel[" + channelId + "] failed", e);
                    }
                    finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }
        return result && status;
    }

    /**
     * Reads the channel's current status from its ZooKeeper node.
     *
     * @param channelId id of the channel to query
     * @return the deserialized status, or {@code null} when the node does not exist
     * @throws ArbitrateException if the read fails for any other ZooKeeper reason
     */
    public ChannelStatus status(Long channelId) {
        // NOTE(review): the other methods of this class resolve the path through
        // ManagePathUtils; presumably both helpers yield the same node — confirm.
        final String nodePath = StagePathUtils.getChannelByChannelId(channelId);
        final byte[] payload;
        try {
            payload = (byte[])this.zookeeper.readData(nodePath);
        }
        catch (ZkNoNodeException missing) {
            // A missing node is reported as "no status" rather than as an error.
            return null;
        }
        catch (ZkException failure) {
            throw new ArbitrateException("Channel_status", channelId.toString(), failure);
        }
        return (ChannelStatus)JsonUtils.unmarshalFromByte((byte[])payload, ChannelStatus.class);
    }

    /**
     * Deletes the channel's status node from ZooKeeper. A node that is already
     * absent is not treated as an error.
     *
     * @param channelId id of the channel whose status node is removed
     * @throws ArbitrateException if the delete fails for any other ZooKeeper reason
     */
    public void destory(Long channelId) {
        final String nodePath = ManagePathUtils.getChannelByChannelId(channelId);
        try {
            zookeeper.delete(nodePath);
        }
        catch (ZkNoNodeException alreadyGone) {
            // Nothing to delete.
        }
        catch (ZkException failure) {
            throw new ArbitrateException("Channel_destory", channelId.toString(), failure);
        }
    }

    /**
     * Sends a termin event of the given type to every pipeline of the channel, in
     * parallel on the arbitrate executor, and waits for all of them.
     *
     * <p>Each pipeline failure is reported through a warning message for that
     * pipeline; after all futures have been collected, the last failure seen is
     * rethrown. If the waiting thread is interrupted, its interrupt flag is
     * restored and the remaining futures are still visited.
     *
     * @param channelId id of the channel whose pipelines are terminated
     * @param type      termin type placed on every event (also used as its description)
     * @return {@code true} if at least one pipeline's termin process returned true
     * @throws Exception the last {@link ExecutionException} raised by a pipeline task
     */
    private Boolean termin(Long channelId, final TerminEventData.TerminType type) throws Exception {
        Channel channel = ArbitrateConfigUtils.getChannelByChannelId(channelId);
        // Typed list: iterating a raw List as Pipeline does not compile.
        List<Pipeline> pipelines = channel.getPipelines();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(pipelines.size());
        for (final Pipeline pipeline : pipelines) {
            futures.add(this.arbitrateExecutor.submit(new Callable<Boolean>(){

                @Override
                public Boolean call() {
                    TerminEventData data = new TerminEventData();
                    data.setPipelineId(pipeline.getId());
                    data.setType(type);
                    data.setCode("channel");
                    data.setDesc(type.toString());
                    return ChannelArbitrateEvent.this.errorTerminProcess.process(data);
                }
            }));
        }
        boolean result = false;
        ExecutionException exception = null;
        // futures.get(i) belongs to pipelines.get(i): both lists were filled in the same order.
        int index = 0;
        for (Future<Boolean> future : futures) {
            try {
                result |= future.get().booleanValue();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                this.sendWarningMessage(pipelines.get(index).getId(), e);
                exception = e;
            }
            ++index;
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }

    /**
     * Sends a warning for the pipeline whose message is the full stack trace of
     * the given exception.
     *
     * @param pipelineId id of the pipeline the warning is attached to
     * @param e          exception whose stack trace becomes the warning text
     */
    private void sendWarningMessage(Long pipelineId, Exception e) {
        final String stackTrace = ExceptionUtils.getFullStackTrace((Throwable)e);
        sendWarningMessage(pipelineId, stackTrace);
    }

    /**
     * Builds a WARNING termin event with code {@code "channel"} for the pipeline
     * and hands it to the warning termin process.
     *
     * @param pipelineId id of the pipeline the warning is attached to
     * @param message    text placed in the event description
     */
    private void sendWarningMessage(Long pipelineId, String message) {
        final TerminEventData warning = new TerminEventData();
        warning.setPipelineId(pipelineId);
        warning.setType(TerminEventData.TerminType.WARNING);
        warning.setCode("channel");
        warning.setDesc(message);
        warningTerminProcess.process(warning);
    }

    /**
     * Decides whether the channel may be started.
     *
     * <p>For every pipeline, at least one of its select, extract and load nodes
     * must be among the live nodes. In addition, a pipeline that still has listed
     * processes blocks the start unless the channel is currently started. Each
     * rejection is reported through a warning message for the affected pipeline.
     *
     * @param channel channel to check
     * @return {@code true} if every pipeline passes all checks
     */
    private boolean canStart(Channel channel) {
        List<Long> liveNodes = this.nodeEvent.liveNodes();
        for (Pipeline pipeline : channel.getPipelines()) {
            Long pipelineId = pipeline.getId();
            if (!this.hasLiveNode(liveNodes, pipeline.getSelectNodes(), "select", pipelineId)
                || !this.hasLiveNode(liveNodes, pipeline.getExtractNodes(), "extract", pipelineId)
                || !this.hasLiveNode(liveNodes, pipeline.getLoadNodes(), "load", pipelineId)) {
                return false;
            }
            List<ProcessStat> stats = this.arbitrateViewService.listProcesses(channel.getId(), pipelineId);
            if (stats == null || stats.isEmpty()) {
                continue;
            }
            // status() returns null for a missing node; treat that as "not started".
            ChannelStatus current = this.status(channel.getId());
            if (current != null && current.isStart()) {
                continue;
            }
            List<Long> processIds = new ArrayList<Long>(stats.size());
            for (ProcessStat stat : stats) {
                processIds.add(stat.getProcessId());
            }
            this.sendWarningMessage(pipelineId, "can't restart by exist process[" + StringUtils.join(processIds, ',') + "]");
            return false;
        }
        return true;
    }

    /**
     * Checks that at least one of the given stage nodes is live; logs an error
     * and sends a pipeline warning when none is.
     */
    private boolean hasLiveNode(List<Long> liveNodes, List<Node> stageNodes, String stage, Long pipelineId) {
        List<Long> nids = this.getNids(stageNodes);
        if (CollectionUtils.containsAny(liveNodes, nids)) {
            return true;
        }
        // The containsAny result is known to be false here, so it is not recomputed for the log.
        logger.error("current live nodes:{} , but " + stage + " nids:{} , result:{}", new Object[]{liveNodes, nids, Boolean.FALSE});
        this.sendWarningMessage(pipelineId, "can't restart by no " + stage + " live node");
        return false;
    }

    /**
     * Collects the ids of the given nodes, in iteration order.
     *
     * @param nodes nodes to read the ids from
     * @return a new list holding one id per node
     */
    private List<Long> getNids(List<Node> nodes) {
        final ArrayList<Long> ids = new ArrayList<Long>(nodes.size());
        for (Node each : nodes) {
            final Long nodeId = each.getId();
            ids.add(nodeId);
        }
        return ids;
    }

    /**
     * Overwrites the channel's status node with the given status.
     *
     * @param channelId id of the channel whose status node is written
     * @param status    status to serialize into the node
     * @throws ArbitrateException if ZooKeeper rejects the write
     */
    private void updateStatus(Long channelId, ChannelStatus status) {
        String path = ManagePathUtils.getChannelByChannelId(channelId);
        byte[] data = JsonUtils.marshalToByte((Object)status);
        try {
            this.zookeeper.writeData(path, (Object)data);
        }
        catch (ZkException e) {
            // Label follows the "Channel_<method>" pattern of the other methods; it
            // previously read "Channel_init", which pointed at the wrong operation.
            throw new ArbitrateException("Channel_updateStatus", channelId.toString(), e);
        }
    }

    /**
     * Injects the process that handles the per-pipeline termin events issued by
     * pause, stop and restart.
     *
     * @param errorTerminProcess termin process to use
     */
    public void setErrorTerminProcess(ErrorTerminProcess errorTerminProcess) {
        this.errorTerminProcess = errorTerminProcess;
    }

    /**
     * Injects the process that receives the WARNING events built by this class.
     *
     * @param warningTerminProcess warning process to use
     */
    public void setWarningTerminProcess(WarningTerminProcess warningTerminProcess) {
        this.warningTerminProcess = warningTerminProcess;
    }

    /**
     * Injects the executor that runs the per-pipeline termin tasks and the
     * delayed start scheduled by restart.
     *
     * @param arbitrateExecutor executor to use
     */
    public void setArbitrateExecutor(ExecutorService arbitrateExecutor) {
        this.arbitrateExecutor = arbitrateExecutor;
    }

    /**
     * Injects the view service used to list a pipeline's processes when deciding
     * whether a channel can be started.
     *
     * @param arbitrateViewService view service to use
     */
    public void setArbitrateViewService(ArbitrateViewService arbitrateViewService) {
        this.arbitrateViewService = arbitrateViewService;
    }

    /**
     * Injects the node event used to obtain the list of live nodes when deciding
     * whether a channel can be started.
     *
     * @param nodeEvent node event to use
     */
    public void setNodeEvent(NodeArbitrateEvent nodeEvent) {
        this.nodeEvent = nodeEvent;
    }
}

