/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.monitor;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.Monitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.MonitorScheduler;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.listener.MainstemListener;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.MainStemEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.lock.BooleanMutex;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang.ClassUtils;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;

public class MainstemMonitor
extends ArbitrateLifeCycle
implements Monitor {
    private static final Logger logger = LoggerFactory.getLogger(MainstemMonitor.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();
    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
    private int delayTime = 5;
    private volatile MainStemEventData activeData;
    private IZkDataListener dataListener;
    private BooleanMutex mutex = new BooleanMutex(Boolean.valueOf(false));
    private volatile boolean release = false;
    private List<MainstemListener> listeners = Collections.synchronizedList(new ArrayList());

    public MainstemMonitor(Long pipelineId) {
        super(pipelineId);
        this.dataListener = new IZkDataListener(){

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put((String)"otter", (String)String.valueOf(MainstemMonitor.this.getPipelineId()));
                MainStemEventData mainStemData = (MainStemEventData)JsonUtils.unmarshalFromByte((byte[])((byte[])data), MainStemEventData.class);
                if (!MainstemMonitor.this.isMine(mainStemData.getNid())) {
                    MainstemMonitor.this.mutex.set(Boolean.valueOf(false));
                }
                if (!mainStemData.isActive() && MainstemMonitor.this.isMine(mainStemData.getNid())) {
                    MainstemMonitor.this.release = true;
                    MainstemMonitor.this.releaseMainstem();
                }
                MainstemMonitor.this.activeData = mainStemData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put((String)"otter", (String)String.valueOf(MainstemMonitor.this.getPipelineId()));
                MainstemMonitor.this.mutex.set(Boolean.valueOf(false));
                if (!MainstemMonitor.this.release && MainstemMonitor.this.isMine(MainstemMonitor.this.activeData.getNid())) {
                    MainstemMonitor.this.initMainstem();
                } else {
                    MainstemMonitor.this.delayExector.schedule(new Runnable(){

                        @Override
                        public void run() {
                            MainstemMonitor.this.initMainstem();
                        }
                    }, (long)MainstemMonitor.this.delayTime, TimeUnit.SECONDS);
                }
            }
        };
        String path = StagePathUtils.getMainStem(this.getPipelineId());
        this.zookeeper.subscribeDataChanges(path, this.dataListener);
        MonitorScheduler.register(this, 300000L, 300000L);
    }

    @Override
    public void reload() {
        if (logger.isDebugEnabled()) {
            logger.debug("## reload mainstem pipeline[{}]", (Object)this.getPipelineId());
        }
        try {
            this.initMainstem();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public void initMainstem() {
        block5: {
            if (this.isStop()) {
                return;
            }
            PermitMonitor permitMonitor = ArbitrateFactory.getInstance(this.getPipelineId(), PermitMonitor.class);
            ChannelStatus status = permitMonitor.getChannelPermit(true);
            if (status.isStop()) {
                return;
            }
            Long nid = ArbitrateConfigUtils.getCurrentNid();
            String path = StagePathUtils.getMainStem(this.getPipelineId());
            MainStemEventData data = new MainStemEventData();
            data.setStatus(MainStemEventData.Status.TAKEING);
            data.setPipelineId(this.getPipelineId());
            data.setNid(nid);
            byte[] bytes = JsonUtils.marshalToByte((Object)data);
            try {
                this.mutex.set(Boolean.valueOf(false));
                this.zookeeper.create(path, (Object)bytes, CreateMode.EPHEMERAL);
                this.activeData = data;
                this.processActiveEnter();
                this.mutex.set(Boolean.valueOf(true));
            }
            catch (ZkNodeExistsException e) {
                bytes = (byte[])this.zookeeper.readData(path, true);
                if (bytes == null) {
                    this.initMainstem();
                }
                this.activeData = (MainStemEventData)JsonUtils.unmarshalFromByte((byte[])bytes, MainStemEventData.class);
                if (!nid.equals(this.activeData.getNid())) break block5;
                this.mutex.set(Boolean.valueOf(true));
            }
        }
    }

    @Override
    public void destory() {
        super.destory();
        String path = StagePathUtils.getMainStem(this.getPipelineId());
        this.zookeeper.unsubscribeDataChanges(path, this.dataListener);
        this.delayExector.shutdownNow();
        this.releaseMainstem();
        MonitorScheduler.unRegister(this);
    }

    public boolean releaseMainstem() {
        if (this.check()) {
            String path = StagePathUtils.getMainStem(this.getPipelineId());
            this.zookeeper.delete(path);
            this.mutex.set(Boolean.valueOf(false));
            this.processActiveExit();
            return true;
        }
        return false;
    }

    public MainStemEventData getCurrentActiveData() {
        return this.activeData;
    }

    public void waitForActive() throws InterruptedException {
        this.initMainstem();
        this.mutex.get();
    }

    public boolean check() {
        String path = StagePathUtils.getMainStem(this.getPipelineId());
        try {
            MainStemEventData eventData;
            byte[] bytes = (byte[])this.zookeeper.readData(path);
            Long nid = ArbitrateConfigUtils.getCurrentNid();
            this.activeData = eventData = (MainStemEventData)JsonUtils.unmarshalFromByte((byte[])bytes, MainStemEventData.class);
            boolean result = nid.equals(eventData.getNid());
            if (!result) {
                logger.warn("mainstem is running in node[{}] , but not in node[{}]", (Object)eventData.getNid(), (Object)nid);
            }
            return result;
        }
        catch (ZkNoNodeException e) {
            logger.warn("mainstem is not run any in node");
            return false;
        }
        catch (ZkInterruptedException e) {
            logger.warn("mainstem check is interrupt");
            Thread.interrupted();
            return this.check();
        }
        catch (ZkException e) {
            logger.warn("mainstem check is failed");
            return false;
        }
    }

    public void single(MainStemEventData data) {
        Assert.notNull((Object)data);
        Long nid = ArbitrateConfigUtils.getCurrentNid();
        if (!this.check()) {
            return;
        }
        data.setNid(nid);
        String path = StagePathUtils.getMainStem(data.getPipelineId());
        byte[] bytes = JsonUtils.marshalToByte((Object)data);
        try {
            this.zookeeper.writeData(path, (Object)bytes);
        }
        catch (ZkException e) {
            throw new ArbitrateException("mainStem_single", data.toString(), e);
        }
        this.activeData = data;
    }

    private boolean isMine(Long targetNid) {
        return targetNid.equals(ArbitrateConfigUtils.getCurrentNid());
    }

    public void addListener(MainstemListener listener) {
        if (logger.isDebugEnabled()) {
            logger.debug("## pipeline[{}] add listener [{}]", (Object)ClassUtils.getShortClassName(listener.getClass()));
        }
        this.listeners.add(listener);
    }

    public void removeListener(MainstemListener listener) {
        if (logger.isDebugEnabled()) {
            logger.debug("## remove listener [{}]", (Object)ClassUtils.getShortClassName(listener.getClass()));
        }
        this.listeners.remove(listener);
    }

    private void processActiveEnter() {
        for (MainstemListener listener : Lists.newArrayList(this.listeners)) {
            try {
                listener.processActiveEnter();
            }
            catch (Exception e) {
                logger.error("processSwitchActive failed", (Throwable)e);
            }
        }
    }

    private void processActiveExit() {
        for (MainstemListener listener : Lists.newArrayList(this.listeners)) {
            try {
                listener.processActiveExit();
            }
            catch (Exception e) {
                logger.error("processSwitchActive failed", (Throwable)e);
            }
        }
    }

    public void setDelayTime(int delayTime) {
        this.delayTime = delayTime;
    }
}

