/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.LoadArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.TerminZooKeeperArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.monitor.LoadStageListener;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import java.util.Date;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * ZooKeeper-backed implementation of {@link LoadArbitrateEvent}.
 *
 * <p>{@link #await(Long)} blocks until a process id is available for the load stage and returns
 * the event data stored at that process's transform-stage node. {@link #single(EtlEventData)}
 * turns a finished load into a {@link TerminEventData} and forwards it to the configured
 * {@link TerminZooKeeperArbitrateEvent}.
 */
public class LoadZooKeeperArbitrateEvent implements LoadArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(LoadZooKeeperArbitrateEvent.class);

    /** ZooKeeper client obtained from {@link ZooKeeperClient#getInstance()}. */
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();

    /** Receiver of the termin signal built in {@link #single(EtlEventData)}; set via setter. */
    private TerminZooKeeperArbitrateEvent terminEvent;

    /**
     * Blocks until load-stage event data is available for the given pipeline.
     *
     * <p>Each attempt waits for the pipeline permit, waits for a process id from the
     * {@link LoadStageListener}, then checks the channel status:
     * <ul>
     *   <li>status is "start": the transform-stage node of that process is read and unmarshalled
     *       into an {@link EtlEventData}; if the node does not exist, the attempt is logged and
     *       retried;</li>
     *   <li>otherwise: the process node is deleted, a warning is logged, and the attempt is
     *       retried.</li>
     * </ul>
     * Retries are performed iteratively, so an arbitrarily long run of retries does not grow the
     * call stack.
     *
     * @param pipelineId id of the pipeline to wait on; must not be {@code null}
     * @return the event data read from the transform-stage node
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IllegalArgumentException if {@code pipelineId} is {@code null}
     * @throws ArbitrateException wrapping any other exception raised while checking the channel
     *         status or accessing ZooKeeper
     */
    @Override
    public EtlEventData await(Long pipelineId) throws InterruptedException {
        Assert.notNull(pipelineId, "pipelineId must not be null");
        while (true) {
            // Instances are looked up on every attempt, exactly as each retry did before.
            PermitMonitor permitMonitor = ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
            permitMonitor.waitForPermit();
            LoadStageListener loadStageListener = ArbitrateFactory.getInstance(pipelineId, LoadStageListener.class);
            Long processId = loadStageListener.waitForProcess();
            try {
                ChannelStatus status = permitMonitor.getChannelPermit();
                if (status.isStart()) {
                    String stagePath = StagePathUtils.getTransformStage(pipelineId, processId);
                    try {
                        byte[] data = (byte[])this.zookeeper.readData(stagePath);
                        return (EtlEventData)JsonUtils.unmarshalFromByte(data, EtlEventData.class);
                    }
                    catch (ZkNoNodeException e) {
                        // The stage node is gone; wait for the next process id instead.
                        logger.error("pipeline[{}] processId[{}] is invalid , retry again", pipelineId, processId);
                        continue;
                    }
                }
                // Channel is not started: drop this process and wait for the next one.
                logger.warn("pipelineId[{}] load ignore processId[{}] by status[{}]", new Object[]{pipelineId, processId, status});
                this.zookeeper.delete(StagePathUtils.getProcess(pipelineId, processId));
            }
            catch (Exception e) {
                // Interruption must reach the caller unwrapped; everything else is wrapped.
                if (e instanceof InterruptedException) {
                    throw (InterruptedException)e;
                }
                throw new ArbitrateException(e);
            }
        }
    }

    /**
     * Signals that the load stage for {@code data} has finished.
     *
     * <p>Stamps the current time as the end time on {@code data}, copies its identifying and
     * statistics fields into a new {@link TerminEventData} of type
     * {@link TerminEventData.TerminType#NORMAL} with code {@code "setl"}, an empty description and
     * the current node id, and passes it to the configured termin event.
     *
     * @param data event data of the finished load; must not be {@code null}
     * @throws IllegalArgumentException if {@code data} is {@code null}
     */
    @Override
    public void single(EtlEventData data) {
        Assert.notNull(data, "data must not be null");
        data.setEndTime(System.currentTimeMillis());

        TerminEventData termin = new TerminEventData();
        termin.setPipelineId(data.getPipelineId());
        termin.setProcessId(data.getProcessId());
        termin.setStartTime(data.getStartTime());
        termin.setEndTime(data.getEndTime());
        termin.setFirstTime(data.getFirstTime());
        termin.setNumber(data.getNumber());
        termin.setBatchId(data.getBatchId());
        termin.setSize(data.getSize());
        termin.setExts(data.getExts());
        termin.setType(TerminEventData.TerminType.NORMAL);
        termin.setCode("setl");
        termin.setDesc("");
        termin.setCurrNid(ArbitrateConfigUtils.getCurrentNid());
        this.terminEvent.single(termin);
    }

    /**
     * Intentionally does nothing.
     *
     * @param pipelineId id of the pipeline being released (unused)
     */
    public void release(Long pipelineId) {
    }

    /**
     * Sets the termin event that {@link #single(EtlEventData)} delegates to.
     *
     * @param terminEvent the termin event handler
     */
    public void setTerminEvent(TerminZooKeeperArbitrateEvent terminEvent) {
        this.terminEvent = terminEvent;
    }
}

