package com.alibaba.otter.shared.arbitrate.impl.setl.rpc;

import com.alibaba.otter.shared.arbitrate.exception.ArbitrateException;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateFactory;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.ReplyProcessQueue;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StageProgress;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.monitor.ProcessListener;
import com.alibaba.otter.shared.arbitrate.impl.setl.rpc.monitor.ProcessMonitor;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker;
import com.google.common.collect.OtterMigrateMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/shared/arbitrate/impl/setl/rpc/RpcStageController.class */
public class RpcStageController extends ArbitrateLifeCycle implements ProcessListener {
    private static final Logger logger = LoggerFactory.getLogger(RpcStageController.class);
    private Map<StageType, ReplyProcessQueue> replys;
    private Map<Long, StageProgress> progress;
    private ProcessMonitor processMonitor;
    private volatile Long lastestLoadedProcessId;

    /* renamed from: com.alibaba.otter.shared.arbitrate.impl.setl.rpc.RpcStageController$2, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/otter/shared/arbitrate/impl/setl/rpc/RpcStageController$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$otter$shared$common$model$config$enums$StageType = new int[StageType.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$otter$shared$common$model$config$enums$StageType[StageType.SELECT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$otter$shared$common$model$config$enums$StageType[StageType.EXTRACT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$otter$shared$common$model$config$enums$StageType[StageType.TRANSFORM.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$otter$shared$common$model$config$enums$StageType[StageType.LOAD.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public RpcStageController(Long l) {
        super(l);
        this.lastestLoadedProcessId = -1L;
        this.replys = OtterMigrateMap.makeComputingMap(new Function<StageType, ReplyProcessQueue>() { // from class: com.alibaba.otter.shared.arbitrate.impl.setl.rpc.RpcStageController.1
            public ReplyProcessQueue apply(StageType stageType) {
                int parallelism = ArbitrateConfigUtils.getParallelism(RpcStageController.this.getPipelineId()) * 10;
                if (parallelism < 100) {
                    parallelism = 100;
                }
                return new ReplyProcessQueue(parallelism);
            }
        });
        this.progress = new MapMaker().makeMap();
        this.processMonitor = (ProcessMonitor) ArbitrateFactory.getInstance(l, ProcessMonitor.class);
        this.processMonitor.addListener(this);
        this.processMonitor.reload();
    }

    public Long waitForProcess(StageType stageType) throws InterruptedException {
        if (stageType.isSelect()) {
            throw new ArbitrateException("not support");
        }
        return this.replys.get(stageType).take();
    }

    public EtlEventData getLastData(Long l) {
        return this.progress.get(l).getData();
    }

    @Override // com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle
    public void destory() {
        this.processMonitor.removeListener(this);
        this.replys.clear();
        this.progress.clear();
    }

    public synchronized boolean single(StageType stageType, EtlEventData etlEventData) {
        boolean z = true;
        switch (AnonymousClass2.$SwitchMap$com$alibaba$otter$shared$common$model$config$enums$StageType[stageType.ordinal()]) {
            case 1:
                this.progress.put(etlEventData.getProcessId(), new StageProgress(StageType.SELECT, etlEventData));
                this.replys.get(StageType.EXTRACT).offer(etlEventData.getProcessId());
                break;
            case 2:
                this.progress.put(etlEventData.getProcessId(), new StageProgress(StageType.EXTRACT, etlEventData));
                this.replys.get(StageType.TRANSFORM).offer(etlEventData.getProcessId());
                break;
            case 3:
                this.progress.put(etlEventData.getProcessId(), new StageProgress(StageType.TRANSFORM, etlEventData));
                computeNextLoad();
                break;
            case 4:
                if (this.progress.remove(etlEventData.getProcessId()) != null) {
                    this.lastestLoadedProcessId = etlEventData.getProcessId();
                    computeNextLoad();
                    break;
                } else {
                    z = false;
                    break;
                }
        }
        return z;
    }

    private void computeNextLoad() {
        Long minTransformedProcessId = getMinTransformedProcessId(this.lastestLoadedProcessId);
        if (minTransformedProcessId != null) {
            this.replys.get(StageType.LOAD).offer(minTransformedProcessId);
        }
    }

    private Long getMinTransformedProcessId(Long l) {
        List<Long> currentProcessIds = ((ProcessMonitor) ArbitrateFactory.getInstance(getPipelineId(), ProcessMonitor.class)).getCurrentProcessIds();
        if (CollectionUtils.isEmpty(currentProcessIds) || CollectionUtils.isEmpty(this.progress)) {
            return null;
        }
        Long l2 = null;
        for (Long l3 : currentProcessIds) {
            if (l == null || l3.longValue() > l.longValue()) {
                l2 = l3;
                break;
            }
        }
        if (l2 == null) {
            l2 = Long.valueOf(l.longValue() + 1);
        }
        if (l2 == null) {
            return null;
        }
        StageProgress stageProgress = this.progress.get(l2);
        if (stageProgress != null && stageProgress.getStage().isTransform()) {
            return l2;
        }
        logger.info("rpc compute [{}] but stage [{}]", l2, stageProgress == null ? null : stageProgress.getStage());
        return null;
    }

    @Override // com.alibaba.otter.shared.arbitrate.impl.setl.rpc.monitor.ProcessListener
    public void processChanged(List<Long> list) {
        compareProgress(list);
        Iterator<ReplyProcessQueue> it = this.replys.values().iterator();
        while (it.hasNext()) {
            compareReply(list, it.next());
        }
        computeNextLoad();
    }

    private synchronized void compareProgress(List<Long> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        Long l = list.get(0);
        for (Long l2 : this.progress.keySet()) {
            if (l2.longValue() < l.longValue()) {
                this.progress.remove(l2);
            }
        }
    }

    private synchronized void compareReply(List<Long> list, ReplyProcessQueue replyProcessQueue) {
        for (Object obj : replyProcessQueue.toArray()) {
            if (!list.contains((Long) obj) && !CollectionUtils.isEmpty(list) && list.get(0).longValue() > ((Long) obj).longValue()) {
                logger.info("## {} remove reply id [{}]", ClassUtils.getShortClassName(getClass()), (Long) obj);
                replyProcessQueue.remove((Long) obj);
                this.progress.remove((Long) obj);
            }
        }
    }
}
