package com.alibaba.otter.shared.arbitrate.impl.setl.rpc;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ExtractArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.lb.LoadBalanceFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/alibaba/otter/shared/arbitrate/impl/setl/rpc/ExtractRpcArbitrateEvent.class */
public class ExtractRpcArbitrateEvent implements ExtractArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(ExtractRpcArbitrateEvent.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();
    private RpcStageEventDispatcher rpcStageEventDispatcher;

    @Override // com.alibaba.otter.shared.arbitrate.impl.setl.ExtractArbitrateEvent
    public EtlEventData await(Long l) throws InterruptedException {
        Assert.notNull(l);
        PermitMonitor permitMonitor = (PermitMonitor) ArbitrateFactory.getInstance(l, PermitMonitor.class);
        permitMonitor.waitForPermit();
        RpcStageController rpcStageController = (RpcStageController) ArbitrateFactory.getInstance(l, RpcStageController.class);
        Long waitForProcess = rpcStageController.waitForProcess(StageType.EXTRACT);
        ChannelStatus channelPermit = permitMonitor.getChannelPermit();
        if (!channelPermit.isStart() && !channelPermit.isPause()) {
            logger.warn("pipelineId[{}] extract ignore processId[{}] by status[{}]", new Object[]{l, waitForProcess, channelPermit});
            this.zookeeper.exists(StagePathUtils.getProcess(l, waitForProcess));
            return await(l);
        }
        EtlEventData lastData = rpcStageController.getLastData(waitForProcess);
        Node nextTransformNode = LoadBalanceFactory.getNextTransformNode(l);
        if (nextTransformNode == null) {
            throw new ArbitrateException("Extract_single", "no next node");
        }
        lastData.setNextNid(nextTransformNode.getId());
        return lastData;
    }

    @Override // com.alibaba.otter.shared.arbitrate.impl.setl.ExtractArbitrateEvent
    public void single(EtlEventData etlEventData) {
        Assert.notNull(etlEventData);
        this.rpcStageEventDispatcher.single(StageType.EXTRACT, etlEventData);
    }

    public void setRpcStageEventDispatcher(RpcStageEventDispatcher rpcStageEventDispatcher) {
        this.rpcStageEventDispatcher = rpcStageEventDispatcher;
    }
}
