/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.rpc;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.ReplyProcessQueue;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StageProgress;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.monitor.ProcessListener;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.monitor.ProcessMonitor;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker;
import com.google.common.collect.OtterMigrateMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * Per-pipeline controller that hands process ids from one S.E.T.L stage to the next.
 *
 * <p>Each stage has a {@link ReplyProcessQueue} of process ids that are ready for it, and
 * {@link #progress} records the last stage reported for every process id known to this
 * instance. The controller registers itself as a {@link ProcessListener} on the pipeline's
 * {@link ProcessMonitor}; on every notification it drops entries older than the smallest
 * process id in the reported list.
 */
public class RpcStageController
extends ArbitrateLifeCycle
implements ProcessListener {
    private static final Logger logger = LoggerFactory.getLogger(RpcStageController.class);

    /**
     * Reply queue per stage. Presumably the queue is created on first access for a stage
     * (computing map) -- confirm against OtterMigrateMap. The size handed to the queue is
     * ten times the pipeline parallelism, with a floor of 100.
     */
    private final Map<StageType, ReplyProcessQueue> replys = OtterMigrateMap.makeComputingMap((Function)new Function<StageType, ReplyProcessQueue>(){

        public ReplyProcessQueue apply(StageType input) {
            int size = ArbitrateConfigUtils.getParallelism(RpcStageController.this.getPipelineId()) * 10;
            if (size < 100) {
                size = 100;
            }
            return new ReplyProcessQueue(size);
        }
    });

    /** Last reported stage (and its event data) for each process id, keyed by process id. */
    private final Map<Long, StageProgress> progress = new MapMaker().makeMap();

    /** Monitor this controller listens to; obtained from {@link ArbitrateFactory} for the pipeline. */
    private final ProcessMonitor processMonitor;

    /** Process id of the most recent LOAD accepted by {@link #single}; starts at -1. */
    private volatile Long lastestLoadedProcessId = -1L;

    /**
     * Creates the controller, registers it as a listener on the pipeline's process monitor
     * and asks the monitor to reload.
     *
     * @param pipelineId id of the pipeline this controller serves
     */
    public RpcStageController(Long pipelineId) {
        super(pipelineId);
        this.processMonitor = ArbitrateFactory.getInstance(pipelineId, ProcessMonitor.class);
        this.processMonitor.addListener(this);
        this.processMonitor.reload();
    }

    /**
     * Takes the next process id from the reply queue of the given stage.
     *
     * @param stage stage to wait for; SELECT is rejected
     * @return the process id taken from that stage's reply queue
     * @throws ArbitrateException if {@code stage} is the SELECT stage
     * @throws InterruptedException propagated from the queue's {@code take()}
     */
    public Long waitForProcess(StageType stage) throws InterruptedException {
        if (stage.isSelect()) {
            throw new ArbitrateException("not support");
        }
        return this.replys.get(stage).take();
    }

    /**
     * Returns the event data stored with the last recorded stage of a process.
     *
     * @param processId process id to look up
     * @return the event data held by that process's {@link StageProgress}
     * @throws NullPointerException if no progress is recorded for {@code processId}
     */
    public EtlEventData getLastData(Long processId) {
        return this.progress.get(processId).getData();
    }

    /** Unregisters from the process monitor and clears the reply queues and the progress map. */
    @Override
    public void destory() {
        this.processMonitor.removeListener(this);
        this.replys.clear();
        this.progress.clear();
    }

    /**
     * Records that a stage finished for a process and signals the following stage.
     *
     * <ul>
     * <li>SELECT / EXTRACT: store the progress and offer the process id to the next stage's queue.</li>
     * <li>TRANSFORM: store the progress, then try to schedule the next LOAD.</li>
     * <li>LOAD: remove the progress entry; if one existed, remember the id as the latest
     * loaded process and try to schedule the next LOAD.</li>
     * </ul>
     *
     * @param stage stage that has just completed
     * @param etlEventData event data of the completed stage; its process id is the key
     * @return {@code false} only for a LOAD whose process id had no progress entry, otherwise {@code true}
     */
    public synchronized boolean single(StageType stage, EtlEventData etlEventData) {
        boolean result = true;
        switch (stage) {
            case SELECT: {
                this.progress.put(etlEventData.getProcessId(), new StageProgress(StageType.SELECT, etlEventData));
                this.replys.get(StageType.EXTRACT).offer(etlEventData.getProcessId());
                break;
            }
            case EXTRACT: {
                this.progress.put(etlEventData.getProcessId(), new StageProgress(StageType.EXTRACT, etlEventData));
                this.replys.get(StageType.TRANSFORM).offer(etlEventData.getProcessId());
                break;
            }
            case TRANSFORM: {
                this.progress.put(etlEventData.getProcessId(), new StageProgress(StageType.TRANSFORM, etlEventData));
                this.computeNextLoad();
                break;
            }
            case LOAD: {
                StageProgress removed = this.progress.remove(etlEventData.getProcessId());
                if (removed == null) {
                    // Nothing was tracked for this process id, so report failure and
                    // leave lastestLoadedProcessId untouched.
                    result = false;
                    break;
                }
                this.lastestLoadedProcessId = etlEventData.getProcessId();
                this.computeNextLoad();
                break;
            }
        }
        return result;
    }

    /** Offers the next loadable process id, if there is one, to the LOAD reply queue. */
    private void computeNextLoad() {
        Long processId = this.getMinTransformedProcessId(this.lastestLoadedProcessId);
        if (processId != null) {
            this.replys.get(StageType.LOAD).offer(processId);
        }
    }

    /**
     * Picks the process id that should be loaded next.
     *
     * <p>The candidate is the first id in the monitor's current list that is greater than
     * {@code loadedProcessId} (or simply the first id when {@code loadedProcessId} is
     * {@code null}); if no such id exists, {@code loadedProcessId + 1} is used. The candidate
     * is returned only when its recorded stage is TRANSFORM.
     *
     * @param loadedProcessId id of the last loaded process
     * @return the candidate id, or {@code null} if the monitor list or the progress map is
     *         empty, or the candidate's recorded stage is not TRANSFORM
     */
    private Long getMinTransformedProcessId(Long loadedProcessId) {
        ProcessMonitor processMonitor = ArbitrateFactory.getInstance(this.getPipelineId(), ProcessMonitor.class);
        List<Long> processIds = processMonitor.getCurrentProcessIds();
        if (CollectionUtils.isEmpty(processIds) || CollectionUtils.isEmpty(this.progress)) {
            return null;
        }

        Long candidate = null;
        for (Long processId : processIds) {
            if (loadedProcessId == null || processId > loadedProcessId) {
                candidate = processId;
                break;
            }
        }
        if (candidate == null) {
            // No listed id is newer than the last loaded one; fall back to the next id.
            candidate = loadedProcessId + 1L;
        }

        StageProgress stage = this.progress.get(candidate);
        if (stage != null && stage.getStage().isTransform()) {
            return candidate;
        }
        logger.info("rpc compute [{}] but stage [{}]", candidate, stage == null ? null : stage.getStage());
        return null;
    }

    /**
     * Listener callback: prunes stale progress entries and stale reply ids, then tries to
     * schedule the next LOAD.
     *
     * @param processIds current process ids as reported by the monitor; the first element
     *                   is used as the minimum id (presumably the list is ascending -- confirm
     *                   against ProcessMonitor)
     */
    @Override
    public void processChanged(List<Long> processIds) {
        this.compareProgress(processIds);
        for (ReplyProcessQueue replyProcessIds : this.replys.values()) {
            this.compareReply(processIds, replyProcessIds);
        }
        // NOTE(review): this call runs without the instance lock, whereas the calls made
        // from single() hold it -- confirm that this is intended.
        this.computeNextLoad();
    }

    /**
     * Removes every progress entry whose process id is smaller than the first reported id.
     * Does nothing when {@code processIds} is null or empty.
     */
    private synchronized void compareProgress(List<Long> processIds) {
        if (CollectionUtils.isEmpty(processIds)) {
            return;
        }
        Long minProcessId = processIds.get(0);
        for (Long processId : this.progress.keySet()) {
            if (processId < minProcessId) {
                this.progress.remove(processId);
            }
        }
    }

    /**
     * Removes from a reply queue (and from the progress map) every id that is not in the
     * reported list and is smaller than the first reported id. Ids that are missing from the
     * list but not smaller than its first element are kept.
     *
     * <p>Does nothing when {@code processIds} is null or empty, matching
     * {@link #compareProgress(List)}.
     */
    private synchronized void compareReply(List<Long> processIds, ReplyProcessQueue replyProcessIds) {
        // Guard first: with an empty list nothing can be stale, and a null list must not be
        // dereferenced by contains() below.
        if (CollectionUtils.isEmpty(processIds)) {
            return;
        }
        Long minProcessId = processIds.get(0);
        // Iterate over a snapshot so the queue can be modified inside the loop.
        for (Object queued : replyProcessIds.toArray()) {
            Long replyId = (Long)queued;
            if (processIds.contains(replyId) || minProcessId <= replyId) {
                continue;
            }
            logger.info("## {} remove reply id [{}]", ClassUtils.getShortClassName(this.getClass()), replyId);
            replyProcessIds.remove(replyId);
            this.progress.remove(replyId);
        }
    }
}

