package com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ExtractArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.lb.LoadBalanceFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.ExtractStageListener;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/alibaba/otter/shared/arbitrate/impl/setl/zookeeper/ExtractZooKeeperArbitrateEvent.class */
/**
 * ZooKeeper-backed implementation of {@link ExtractArbitrateEvent}.
 *
 * <p>{@link #await(Long)} blocks until a process id is handed out for the extract stage and returns the
 * event data read from that process's select-stage node; {@link #single(EtlEventData)} writes the event
 * data to the process's extract-stage node.
 */
public class ExtractZooKeeperArbitrateEvent implements ExtractArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(ExtractZooKeeperArbitrateEvent.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();

    /**
     * Blocks until a process of the given pipeline is ready for the extract stage and returns its event data.
     *
     * <p>Each attempt waits for the pipeline permit, waits for a process id from the
     * {@link ExtractStageListener}, and then re-checks the channel status. If the channel is not started,
     * the process node is deleted and the wait starts over. If the select-stage node of the process no
     * longer exists, the wait also starts over.
     *
     * @param l the pipeline id; must not be {@code null}
     * @return the event data read from the select-stage node, with its next node id set to the id of the
     *         node returned by {@link LoadBalanceFactory#getNextTransformNode(Long)}
     * @throws InterruptedException if the thread is interrupted while waiting for the permit or a process
     * @throws ArbitrateException if no next transform node is available, or on any ZooKeeper failure other
     *         than a missing select-stage node
     */
    @Override
    public EtlEventData await(Long l) throws InterruptedException {
        Assert.notNull(l);
        final Long pipelineId = l;

        // Retry in a loop rather than by recursion: a channel that stays in a non-start state (or select
        // nodes that keep disappearing) could otherwise grow the call stack without bound.
        while (true) {
            PermitMonitor permitMonitor = (PermitMonitor) ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
            permitMonitor.waitForPermit();

            ExtractStageListener stageListener =
                (ExtractStageListener) ArbitrateFactory.getInstance(pipelineId, ExtractStageListener.class);
            Long processId = stageListener.waitForProcess();

            // Re-read the status after the blocking waits above; it is checked again because time has passed.
            ChannelStatus status = permitMonitor.getChannelPermit();
            if (!status.isStart()) {
                logger.warn("pipelineId[{}] extract ignore processId[{}] by status[{}]", new Object[]{pipelineId, processId, status});
                // Drop the process node before waiting again.
                // NOTE(review): presumably this releases the process id so later stages are not left
                // waiting on it - confirm against the load stage.
                this.zookeeper.delete(StagePathUtils.getProcess(pipelineId, processId));
                continue;
            }

            try {
                byte[] data = (byte[]) this.zookeeper.readData(StagePathUtils.getSelectStage(pipelineId, processId));
                EtlEventData eventData = (EtlEventData) JsonUtils.unmarshalFromByte(data, EtlEventData.class);
                Node nextNode = LoadBalanceFactory.getNextTransformNode(pipelineId);
                if (nextNode == null) {
                    // NOTE(review): the error code says "Extract_single" although this is await();
                    // left unchanged because callers may match on it.
                    throw new ArbitrateException("Extract_single", "no next node");
                }
                eventData.setNextNid(nextNode.getId());
                return eventData;
            } catch (ZkNoNodeException e) {
                // The select-stage node is gone, so this process id is no longer usable; wait for another.
                logger.error("pipeline[{}] processId[{}] is invalid , retry again", pipelineId, processId);
            } catch (ZkException e) {
                throw new ArbitrateException("Extract_await", e.getMessage(), e);
            }
        }
    }

    /**
     * Publishes the extract-stage result by creating the persistent extract-stage node of the process.
     *
     * <p>The current node id is stored on {@code etlEventData} before it is serialized. An already existing
     * node, a missing parent node, and an interrupt during the create call are tolerated and do not raise
     * an exception.
     *
     * @param etlEventData the event data to publish; must not be {@code null}
     * @throws ArbitrateException on any other ZooKeeper failure
     */
    @Override
    public void single(EtlEventData etlEventData) {
        Assert.notNull(etlEventData);
        String extractStage = StagePathUtils.getExtractStage(etlEventData.getPipelineId(), etlEventData.getProcessId());
        etlEventData.setCurrNid(ArbitrateConfigUtils.getCurrentNid());
        byte[] payload = JsonUtils.marshalToByte(etlEventData, new SerializerFeature[]{SerializerFeature.WriteClassName});
        // The specific ZkException subclasses must be caught before ZkException itself; otherwise the
        // general handler shadows them and the tolerated cases would be rethrown.
        try {
            this.zookeeper.create(extractStage, payload, CreateMode.PERSISTENT);
        } catch (ZkNoNodeException e) {
            // Parent (process) node does not exist.
            // NOTE(review): presumably caused by a rollback/shutdown that removed the process - confirm.
            logger.warn("pipelineId[{}] extract ignore processId[{}] single by data:{}", new Object[]{etlEventData.getPipelineId(), etlEventData.getProcessId(), etlEventData});
        } catch (ZkNodeExistsException e) {
            // Extract-stage node was already created.
            // NOTE(review): presumably a retried create after a connection loss - confirm.
            logger.warn("pipelineId[{}] extract ignore processId[{}] single by data:{}", new Object[]{etlEventData.getPipelineId(), etlEventData.getProcessId(), etlEventData});
        } catch (ZkInterruptedException ignored) {
            // Deliberately ignored: an interrupt during create is treated as best-effort.
            // NOTE(review): confirm the thread's interrupt status is still visible to the caller.
        } catch (ZkException e) {
            throw new ArbitrateException("Extract_single", e.getMessage(), e);
        }
    }
}
