/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor;

import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.MainstemMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.listener.MainstemListener;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.listener.PermitListener;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.AbstractStageListener;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.StageListener;
import com.alibaba.otter.shared.arbitrate.model.MainStemEventData;
import com.alibaba.otter.shared.arbitrate.model.ProcessNodeEventData;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import java.util.List;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.CreateMode;

public class SelectStageListener
extends AbstractStageListener
implements StageListener,
PermitListener,
MainstemListener {
    private volatile boolean isPermit = true;
    private PermitMonitor permitMonitor;
    private MainstemMonitor mainstemMonitor;

    public SelectStageListener(Long pipelineId) {
        super(pipelineId);
        this.permitMonitor = ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
        this.mainstemMonitor = ArbitrateFactory.getInstance(pipelineId, MainstemMonitor.class);
        this.permitMonitor.addListener(this);
        this.mainstemMonitor.addListener(this);
        this.recovery(this.getPipelineId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processChanged(List<Long> processIds) {
        block10: {
            super.processChanged(processIds);
            for (Long processId : processIds) {
                if (this.replyProcessIds.contains(processId)) continue;
                logger.warn("process is not in order, please check processId:{}", (Object)processId);
                this.addReply(processId);
            }
            try {
                String path = StagePathUtils.getProcessRoot(this.getPipelineId());
                int size = ArbitrateConfigUtils.getParallelism(this.getPipelineId()) - processIds.size();
                if (size <= 0) break block10;
                PermitMonitor permit = ArbitrateFactory.getInstance(this.getPipelineId(), PermitMonitor.class);
                if (!permit.isPermit()) {
                    return;
                }
                String mainStemPath = StagePathUtils.getMainStem(this.getPipelineId());
                byte[] bytes = (byte[])this.zookeeper.readData(mainStemPath, true);
                if (bytes == null) {
                    return;
                }
                MainStemEventData eventData = (MainStemEventData)JsonUtils.unmarshalFromByte((byte[])bytes, MainStemEventData.class);
                if (!eventData.getNid().equals(ArbitrateConfigUtils.getCurrentNid())) {
                    return;
                }
                SelectStageListener selectStageListener = this;
                synchronized (selectStageListener) {
                    List currentProcesses = this.zookeeper.getChildren(path);
                    size = ArbitrateConfigUtils.getParallelism(this.getPipelineId()) - currentProcesses.size();
                    if (size > 0) {
                        ProcessNodeEventData nodeData = new ProcessNodeEventData();
                        nodeData.setStatus(ProcessNodeEventData.Status.UNUSED);
                        nodeData.setNid(ArbitrateConfigUtils.getCurrentNid());
                        byte[] nodeBytes = JsonUtils.marshalToByte((Object)nodeData);
                        String processPath = this.zookeeper.create(path + "/", (Object)nodeBytes, CreateMode.PERSISTENT_SEQUENTIAL);
                        String processNode = StringUtils.substringAfterLast((String)processPath, (String)"/");
                        Long processId = StagePathUtils.getProcessId(processNode);
                        this.addReply(processId);
                    }
                }
            }
            catch (ZkException e) {
                this.recovery(this.getPipelineId());
                logger.error("SelectStageListener", (Throwable)e);
            }
        }
    }

    @Override
    public void processChanged(boolean isPermit) {
        if (this.isPermit != isPermit && isPermit) {
            this.stageMonitor.reload();
        }
        this.isPermit = isPermit;
    }

    private void recovery(Long pipelineId) {
        List<Long> currentProcessIds = this.stageMonitor.getCurrentProcessIds(false);
        for (Long processId : currentProcessIds) {
            String path = StagePathUtils.getProcess(pipelineId, processId);
            try {
                byte[] bytes = (byte[])this.zookeeper.readData(path);
                ProcessNodeEventData nodeData = (ProcessNodeEventData)JsonUtils.unmarshalFromByte((byte[])bytes, ProcessNodeEventData.class);
                if (!nodeData.getStatus().isUnUsed()) continue;
                this.addReply(processId);
            }
            catch (ZkException e) {
                logger.error("SelectStageListener", (Throwable)e);
            }
        }
    }

    @Override
    public void processActiveEnter() {
        this.recovery(this.getPipelineId());
        this.stageMonitor.reload();
    }

    @Override
    public void processActiveExit() {
        ArbitrateFactory.destory(this.getPipelineId(), this.getClass());
    }

    @Override
    public void destory() {
        this.permitMonitor.removeListener(this);
        this.mainstemMonitor.removeListener(this);
        super.destory();
    }
}

