/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor;

import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.ReplyProcessQueue;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.StageListener;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.StageMonitor;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * Base class for stage listeners bound to a single pipeline.
 *
 * <p>On construction the listener registers itself with the pipeline's {@link StageMonitor}
 * and keeps a {@link ReplyProcessQueue} of process ids ("reply ids") that callers consume
 * through {@link #waitForProcess()}. Subclasses are expected to call {@link #addReply(Long)}
 * when a process becomes ready for their stage.
 *
 * <p>Mutations of the reply queue made through {@link #addReply(Long)},
 * {@link #compareReply(List)} and {@link #processTermined(Long)} are serialized on this
 * instance's monitor.
 */
public abstract class AbstractStageListener
extends ArbitrateLifeCycle
implements StageListener {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractStageListener.class);

    /** Reply queue capacity is the pipeline parallelism multiplied by this factor. */
    private static final int QUEUE_SIZE_FACTOR = 10;
    /** Lower bound applied to the reply queue capacity. */
    private static final int MIN_QUEUE_SIZE = 100;

    protected ZkClientx zookeeper = ZooKeeperClient.getInstance();
    /** Process ids waiting to be handed out by {@link #waitForProcess()}. */
    protected ReplyProcessQueue replyProcessIds;
    /** Not used by this class itself; kept for subclasses (presumably for their own locking). */
    protected ReentrantLock lock = new ReentrantLock();
    protected StageMonitor stageMonitor;

    /**
     * Creates the listener, sizes its reply queue and registers it with the stage monitor.
     *
     * <p>NOTE(review): {@code this} is handed to the monitor and {@code reload()} is invoked
     * before any subclass constructor body has run, so overridden callbacks may presumably be
     * invoked on a partially initialized subclass - confirm against {@code StageMonitor}.
     *
     * @param pipelineId id of the pipeline this listener belongs to
     */
    public AbstractStageListener(Long pipelineId) {
        super(pipelineId);
        int size = Math.max(ArbitrateConfigUtils.getParallelism(pipelineId) * QUEUE_SIZE_FACTOR, MIN_QUEUE_SIZE);
        replyProcessIds = new ReplyProcessQueue(size);
        stageMonitor = ArbitrateFactory.getInstance(pipelineId, StageMonitor.class);
        stageMonitor.addListener(this);
        stageMonitor.reload();
    }

    /**
     * Reconciles the reply queue with the latest list of process ids.
     *
     * @param processIds current process ids as reported by the monitor
     */
    @Override
    public void processChanged(List<Long> processIds) {
        compareReply(processIds);
    }

    /**
     * No-op by default; subclasses override to react to stage node changes.
     *
     * @param processId process whose stage nodes changed
     * @param stageNode current stage nodes of that process
     */
    @Override
    public void stageChannged(Long processId, List<String> stageNode) {
    }

    /**
     * Removes the given process id from the reply queue.
     *
     * @param processId id of the process to drop
     */
    @Override
    public synchronized void processTermined(Long processId) {
        logger.info("## {} remove reply id [{}]", ClassUtils.getShortClassName(getClass()), processId);
        replyProcessIds.remove(processId);
    }

    /**
     * Takes the next process id from the reply queue.
     *
     * @return the process id obtained from the queue
     * @throws InterruptedException propagated from the queue's {@code take()} (presumably when
     *         the calling thread is interrupted while waiting)
     */
    public Long waitForProcess() throws InterruptedException {
        Long processId = replyProcessIds.take();
        logger.debug("## {} get reply id [{}]", ClassUtils.getShortClassName(getClass()), processId);
        return processId;
    }

    /**
     * Offers a process id to the reply queue and logs the outcome.
     *
     * <p>A rejected offer is logged as a duplicate; whether the queue can also reject for
     * other reasons (e.g. capacity) is not visible here - verify against
     * {@code ReplyProcessQueue}.
     *
     * @param processId id of the process to enqueue
     */
    protected synchronized void addReply(Long processId) {
        String listenerName = ClassUtils.getShortClassName(getClass());
        if (replyProcessIds.offer(processId)) {
            logger.debug("## {} add reply id [{}]", listenerName, processId);
        } else {
            logger.warn("## {} dup reply id [{}]", listenerName, processId);
        }
    }

    /**
     * Drops queued reply ids that are no longer part of {@code processIds}.
     *
     * <p>A queued id is removed only when it is absent from {@code processIds} AND the first
     * element of {@code processIds} is greater than it. Queued ids that are absent but not
     * smaller than the first element are kept. (Presumably {@code processIds} is sorted
     * ascending and such ids belong to processes newer than this snapshot - TODO confirm.)
     *
     * <p>A {@code null} or empty list removes nothing.
     *
     * @param processIds current process ids; may be {@code null} or empty
     */
    protected synchronized void compareReply(List<Long> processIds) {
        // Guard first: with nothing to compare against no reply may be removed, and checking
        // here avoids dereferencing a null list inside the loop.
        if (CollectionUtils.isEmpty(processIds)) {
            return;
        }
        Long firstProcessId = processIds.get(0);
        // Iterate over a snapshot because processTermined() mutates the queue.
        for (Object queued : replyProcessIds.toArray()) {
            Long replyId = (Long) queued;
            if (processIds.contains(replyId)) {
                continue;
            }
            if (firstProcessId > replyId) {
                processTermined(replyId);
            }
        }
    }

    /**
     * Tears down the listener: runs the parent teardown, clears the reply queue and
     * unregisters from the stage monitor.
     */
    @Override
    public void destory() {
        super.destory();
        logger.info("## destory pipeline[{}] , Listener[{}]", getPipelineId(), ClassUtils.getShortClassName(getClass()));
        replyProcessIds.clear();
        stageMonitor.removeListener(this);
    }
}

