/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper;

import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.SelectArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.lb.LoadBalanceFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.SelectStageListener;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.arbitrate.model.ProcessNodeEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.model.config.pipeline.PipelineParameter;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import java.util.Date;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * {@link SelectArbitrateEvent} implementation that coordinates the select stage through ZooKeeper nodes.
 *
 * <p>{@link #await(Long)} hands out the next process id for a pipeline and marks its process node as
 * used; {@link #single(EtlEventData)} publishes the select-stage node for a process.
 */
public class SelectZooKeeperArbitrateEvent
implements SelectArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(SelectZooKeeperArbitrateEvent.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();

    /**
     * Waits until a process id is available for the given pipeline and returns the event data for it.
     *
     * <p>Each attempt waits on the pipeline's {@link PermitMonitor} and {@link SelectStageListener},
     * then re-reads the channel status. When the channel is started, the event data is populated
     * (pipeline id, process id, start time, next extract node id), the process node is marked as used
     * and the data is returned. When the channel is not started, the process node is deleted and the
     * wait starts over. When the process node has disappeared while being marked, the wait also
     * starts over.
     *
     * <p>Retries are performed in a loop rather than by recursion, so an arbitrarily long sequence of
     * retries does not grow the call stack.
     *
     * @param pipelineId id of the pipeline; must not be {@code null}
     * @return the event data for the acquired process; this is the only normal exit of the method
     * @throws InterruptedException propagated from the blocking waits
     * @throws ArbitrateException if no next extract node is available, or a ZooKeeper error other
     *         than a missing node occurs while marking the process as used
     */
    @Override
    public EtlEventData await(Long pipelineId) throws InterruptedException {
        Assert.notNull(pipelineId, "pipelineId must not be null");
        while (true) {
            // The monitors are looked up on every attempt, exactly as each recursive call used to do.
            PermitMonitor permitMonitor = ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
            permitMonitor.waitForPermit();
            SelectStageListener selectStageListener = ArbitrateFactory.getInstance(pipelineId, SelectStageListener.class);
            Long processId = selectStageListener.waitForProcess();

            // Re-check the status after the waits; it is read again here before the process id is used.
            ChannelStatus status = permitMonitor.getChannelPermit();
            if (!status.isStart()) {
                logger.warn("pipelineId[{}] select ignore processId[{}] by status[{}]", new Object[]{pipelineId, processId, status});
                // Drop the process node that will not be used, then wait for the next one.
                String path = StagePathUtils.getProcess(pipelineId, processId);
                this.zookeeper.delete(path);
                continue;
            }

            try {
                EtlEventData eventData = new EtlEventData();
                eventData.setPipelineId(pipelineId);
                eventData.setProcessId(processId);
                eventData.setStartTime(System.currentTimeMillis());
                Node node = LoadBalanceFactory.getNextExtractNode(pipelineId);
                if (node == null) {
                    // Raised from await(), so it carries the await error code like the handler below.
                    throw new ArbitrateException("Select_await", "no next node");
                }
                eventData.setNextNid(node.getId());
                this.markUsed(eventData);
                return eventData;
            }
            catch (ZkNoNodeException e) {
                // The process node no longer exists; fall through to the next loop iteration.
                logger.error("pipeline[{}] processId[{}] is invalid , retry again", pipelineId, processId);
            }
            catch (ZkException e) {
                throw new ArbitrateException("Select_await", e.getMessage(), e);
            }
        }
    }

    /**
     * Publishes the select-stage node for the process described by {@code data}.
     *
     * <p>The current node id is stored on {@code data} before it is serialized (with class names) and
     * written as a persistent ZooKeeper node. A missing parent node or an already existing node is
     * logged and ignored.
     *
     * @param data event data identifying the pipeline and process; must not be {@code null}
     * @throws ArbitrateException if any other ZooKeeper error occurs while creating the node
     */
    @Override
    public void single(EtlEventData data) {
        Assert.notNull(data, "data must not be null");
        String path = StagePathUtils.getSelectStage(data.getPipelineId(), data.getProcessId());
        data.setCurrNid(ArbitrateConfigUtils.getCurrentNid());
        byte[] bytes = JsonUtils.marshalToByte(data, new SerializerFeature[]{SerializerFeature.WriteClassName});
        try {
            this.zookeeper.create(path, bytes, CreateMode.PERSISTENT);
        }
        catch (ZkNoNodeException e) {
            // Parent path is gone: the signal is dropped on purpose, only a warning is emitted.
            logger.warn("pipelineId[{}] select ignore processId[{}] single by data:{}", new Object[]{data.getPipelineId(), data.getProcessId(), data});
        }
        catch (ZkNodeExistsException e) {
            // Node already present: the signal is dropped on purpose, only a warning is emitted.
            logger.warn("pipelineId[{}] select ignore processId[{}] single by data:{}", new Object[]{data.getPipelineId(), data.getProcessId(), data});
        }
        catch (ZkException e) {
            throw new ArbitrateException("Select_single", e.getMessage(), e);
        }
    }

    /**
     * Overwrites the process node of {@code data} with a record carrying the current node id,
     * status {@code USED} and arbitrate mode {@code ZOOKEEPER}.
     *
     * @param data event data whose pipeline id and process id identify the process node
     * @throws ZkNoNodeException propagated from the write when the process node does not exist
     * @throws ZkException propagated from the write on any other ZooKeeper failure
     */
    private void markUsed(EtlEventData data) throws ZkNoNodeException, ZkException {
        String path = StagePathUtils.getProcess(data.getPipelineId(), data.getProcessId());
        ProcessNodeEventData eventData = new ProcessNodeEventData();
        Long nid = ArbitrateConfigUtils.getCurrentNid();
        eventData.setNid(nid);
        eventData.setStatus(ProcessNodeEventData.Status.USED);
        eventData.setMode(PipelineParameter.ArbitrateMode.ZOOKEEPER);
        byte[] bytes = JsonUtils.marshalToByte(eventData);
        this.zookeeper.writeData(path, bytes);
    }
}

