/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.communication.ArbitrateCommmunicationClient;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.manage.ChannelArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.TerminArbitrateEvent;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.TerminMonitor;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.termin.NormalTerminProcess;
import com.alibaba.otter.shared.arbitrate.impl.setl.zookeeper.termin.WarningTerminProcess;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import com.alibaba.otter.shared.communication.core.model.Event;
import com.alibaba.otter.shared.communication.model.arbitrate.StopChannelEvent;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public class TerminZooKeeperArbitrateEvent
implements TerminArbitrateEvent {
    private static final Logger logger = LoggerFactory.getLogger(TerminZooKeeperArbitrateEvent.class);
    private ZkClientx zookeeper = ZooKeeperClient.getInstance();
    private ArbitrateCommmunicationClient arbitrateCommmunicationClient;
    private NormalTerminProcess normalTerminProcess;
    private WarningTerminProcess warningTerminProcess;
    private ChannelArbitrateEvent channelEvent;

    @Override
    public TerminEventData await(Long pipelineId) throws InterruptedException {
        Assert.notNull((Object)pipelineId);
        TerminMonitor terminMonitor = ArbitrateFactory.getInstance(pipelineId, TerminMonitor.class);
        Long processId = terminMonitor.waitForProcess();
        if (logger.isDebugEnabled()) {
            logger.debug("## await pipeline[{}] processId[{}] is termin", (Object)pipelineId, (Object)processId);
        }
        String path = StagePathUtils.getTermin(pipelineId, processId);
        try {
            byte[] data = (byte[])this.zookeeper.readData(path);
            return (TerminEventData)JsonUtils.unmarshalFromByte((byte[])data, TerminEventData.class);
        }
        catch (ZkNoNodeException e) {
            logger.error("pipeline[{}] processId[{}] is process", (Object)pipelineId, (Object)processId);
            terminMonitor.ack(processId);
            return this.await(pipelineId);
        }
        catch (ZkException e) {
            throw new ArbitrateException("Termin_await", e);
        }
    }

    @Override
    public void exhaust(Long pipelineId) {
        Assert.notNull((Object)pipelineId);
        TerminMonitor terminMonitor = ArbitrateFactory.getInstance(pipelineId, TerminMonitor.class);
        int size = terminMonitor.size();
        try {
            for (int i = 0; i < size; ++i) {
                Long processId = terminMonitor.waitForProcess();
                TerminEventData data = new TerminEventData();
                data.setPipelineId(pipelineId);
                data.setProcessId(processId);
                this.ack(data);
            }
        }
        catch (InterruptedException e) {
            throw new ArbitrateException(e);
        }
    }

    @Override
    public void ack(TerminEventData data) {
        Assert.notNull((Object)data);
        String path = StagePathUtils.getTermin(data.getPipelineId(), data.getProcessId());
        try {
            this.zookeeper.delete(path);
        }
        catch (ZkNoNodeException zkNoNodeException) {
        }
        catch (ZkException e) {
            throw new ArbitrateException("Termin_ack", e);
        }
        TerminMonitor terminMonitor = ArbitrateFactory.getInstance(data.getPipelineId(), TerminMonitor.class);
        terminMonitor.ack(data.getProcessId());
    }

    @Override
    public int size(Long pipelineId) {
        Assert.notNull((Object)pipelineId);
        TerminMonitor terminMonitor = ArbitrateFactory.getInstance(pipelineId, TerminMonitor.class);
        return terminMonitor.size();
    }

    @Override
    public void single(TerminEventData data) {
        TerminEventData.TerminType type = data.getType();
        if (type.isNormal()) {
            Assert.notNull((Object)data.getProcessId());
            this.normalTerminProcess.process(data);
        } else if (type.isWarning()) {
            this.warningTerminProcess.process(data);
        } else {
            boolean restarted;
            Channel channel = ArbitrateConfigUtils.getChannel(data.getPipelineId());
            if (data.getType().isRollback()) {
                boolean paused = this.channelEvent.pause(channel.getId());
                if (paused) {
                    this.warningTerminProcess.process(data);
                }
            } else if (data.getType().isShutdown()) {
                boolean shutdowned = this.channelEvent.stop(channel.getId());
                if (shutdowned) {
                    this.warningTerminProcess.process(data);
                }
                StopChannelEvent event = new StopChannelEvent();
                event.setChannelId(channel.getId());
                this.arbitrateCommmunicationClient.callManager((Event)event);
            } else if (data.getType().isRestart() && (restarted = this.channelEvent.restart(channel.getId()))) {
                this.warningTerminProcess.process(data);
            }
        }
    }

    public void setArbitrateCommmunicationClient(ArbitrateCommmunicationClient arbitrateCommmunicationClient) {
        this.arbitrateCommmunicationClient = arbitrateCommmunicationClient;
    }

    public void setNormalTerminProcess(NormalTerminProcess normalTerminProcess) {
        this.normalTerminProcess = normalTerminProcess;
    }

    public void setWarningTerminProcess(WarningTerminProcess warningTerminProcess) {
        this.warningTerminProcess = warningTerminProcess;
    }

    public void setChannelEvent(ChannelArbitrateEvent channelEvent) {
        this.channelEvent = channelEvent;
    }
}

