/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.rpc;

import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.LoadArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.RpcStageController;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.RpcStageEventDispatcher;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.TerminExecutor;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.TerminRpcArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public class LoadRpcArbitrateEvent
implements LoadArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(LoadRpcArbitrateEvent.class);
    private TerminRpcArbitrateEvent terminEvent;
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();
    private RpcStageEventDispatcher rpcStageEventDispatcher;

    @Override
    public EtlEventData await(Long pipelineId) throws InterruptedException {
        String path;
        Assert.notNull((Object)pipelineId);
        PermitMonitor permitMonitor = ArbitrateFactory.getInstance(pipelineId, PermitMonitor.class);
        permitMonitor.waitForPermit();
        RpcStageController stageController = ArbitrateFactory.getInstance(pipelineId, RpcStageController.class);
        Long processId = stageController.waitForProcess(StageType.LOAD);
        ChannelStatus status = permitMonitor.getChannelPermit();
        if (status.isStart()) {
            return stageController.getLastData(processId);
        }
        status = permitMonitor.getChannelPermit(true);
        if (status.isStart()) {
            return stageController.getLastData(processId);
        }
        if (status.isPause() && this.zookeeper.exists(path = StagePathUtils.getProcess(pipelineId, processId))) {
            return stageController.getLastData(processId);
        }
        logger.warn("pipelineId[{}] load ignore processId[{}] by status[{}]", new Object[]{pipelineId, processId, status});
        return this.await(pipelineId);
    }

    @Override
    public void single(final EtlEventData data) {
        Assert.notNull((Object)data);
        data.setEndTime(new Date().getTime());
        boolean result = this.rpcStageEventDispatcher.single(StageType.LOAD, data);
        if (result) {
            TerminExecutor executor = ArbitrateFactory.getInstance(data.getPipelineId(), TerminExecutor.class);
            executor.submit(new Runnable(){

                @Override
                public void run() {
                    TerminEventData termin = new TerminEventData();
                    termin.setPipelineId(data.getPipelineId());
                    termin.setProcessId(data.getProcessId());
                    termin.setStartTime(data.getStartTime());
                    termin.setEndTime(data.getEndTime());
                    termin.setFirstTime(data.getFirstTime());
                    termin.setNumber(data.getNumber());
                    termin.setBatchId(data.getBatchId());
                    termin.setSize(data.getSize());
                    termin.setExts(data.getExts());
                    termin.setType(TerminEventData.TerminType.NORMAL);
                    termin.setCode("setl");
                    termin.setDesc("");
                    termin.setCurrNid(ArbitrateConfigUtils.getCurrentNid());
                    LoadRpcArbitrateEvent.this.terminEvent.single(termin);
                }
            });
        }
    }

    public void setRpcStageEventDispatcher(RpcStageEventDispatcher rpcStageEventDispatcher) {
        this.rpcStageEventDispatcher = rpcStageEventDispatcher;
    }

    public void setTerminEvent(TerminRpcArbitrateEvent terminEvent) {
        this.terminEvent = terminEvent;
    }
}

