/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.rpc;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ExtractArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.lb.LoadBalanceFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.RpcStageController;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.RpcStageEventDispatcher;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public class ExtractRpcArbitrateEvent
implements ExtractArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(ExtractRpcArbitrateEvent.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();
    private RpcStageEventDispatcher rpcStageEventDispatcher;

    @Override
    public EtlEventData await(Long pipelineId) throws InterruptedException {
        Assert.notNull((Object)pipelineId);
        PermitMonitor permitMonitor = ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
        permitMonitor.waitForPermit();
        RpcStageController stageController = ArbitrateFactory.getInstance(pipelineId, RpcStageController.class);
        Long processId = stageController.waitForProcess(StageType.EXTRACT);
        ChannelStatus status = permitMonitor.getChannelPermit();
        if (status.isStart() || status.isPause()) {
            EtlEventData eventData = stageController.getLastData(processId);
            Node node = LoadBalanceFactory.getNextTransformNode(pipelineId);
            if (node == null) {
                throw new ArbitrateException("Extract_single", "no next node");
            }
            eventData.setNextNid(node.getId());
            return eventData;
        }
        logger.warn("pipelineId[{}] extract ignore processId[{}] by status[{}]", new Object[]{pipelineId, processId, status});
        String path = StagePathUtils.getProcess(pipelineId, processId);
        this.zookeeper.exists(path);
        return this.await(pipelineId);
    }

    @Override
    public void single(EtlEventData data) {
        Assert.notNull((Object)data);
        this.rpcStageEventDispatcher.single(StageType.EXTRACT, data);
    }

    public void setRpcStageEventDispatcher(RpcStageEventDispatcher rpcStageEventDispatcher) {
        this.rpcStageEventDispatcher = rpcStageEventDispatcher;
    }
}

