/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.shared.arbitrate.impl.setl.memory;

import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.ReplyProcessQueue;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StageProgress;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker;
import com.google.common.collect.OtterMigrateMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.util.CollectionUtils;

/**
 * In-memory scheduler state for the S/E/T/L stages of a single pipeline.
 *
 * <p>The controller keeps three pieces of state:
 * <ul>
 *   <li>{@code replys}: one {@link ReplyProcessQueue} of process ids per {@link StageType};
 *       a stage worker takes the next process id it may work on from its queue.</li>
 *   <li>{@code progress}: the latest {@link StageProgress} recorded for every in-flight
 *       process id.</li>
 *   <li>{@code termins}: a bounded queue (capacity 20) of pending {@link TerminEventData}.</li>
 * </ul>
 *
 * <p>All state-changing methods except {@link #waitForProcess(StageType)} and the termin queue
 * accessors are {@code synchronized} on this instance.
 */
public class MemoryStageController extends ArbitrateLifeCycle {
    /** Source of new process ids; incremented each time a SELECT slot is handed out. */
    private final AtomicLong atomicMaxProcessId = new AtomicLong(0L);
    /** Per-stage queues of process ids that are ready for that stage. */
    private final Map<StageType, ReplyProcessQueue> replys;
    /** Latest recorded progress for every in-flight process id. */
    private final Map<Long, StageProgress> progress;
    /** Pending termin events, bounded to 20 entries. */
    private final BlockingQueue<TerminEventData> termins;
    /** Placeholder stored for a process id that has been handed to SELECT but has no data yet. */
    private final StageProgress nullProgress = new StageProgress();

    /**
     * Creates the controller for the given pipeline.
     *
     * @param pipelineId id of the pipeline this controller schedules
     */
    public MemoryStageController(Long pipelineId) {
        super(pipelineId);
        // The queue for a stage is built by this function (presumably on first lookup of the
        // stage key -- confirm against OtterMigrateMap.makeComputingMap).
        this.replys = OtterMigrateMap.makeComputingMap((Function)new Function<StageType, ReplyProcessQueue>(){

            public ReplyProcessQueue apply(StageType input) {
                // Capacity is ten times the pipeline parallelism, but never below 100.
                int size = ArbitrateConfigUtils.getParallelism(MemoryStageController.this.getPipelineId()) * 10;
                if (size < 100) {
                    size = 100;
                }
                return new ReplyProcessQueue(size);
            }
        });
        this.progress = new MapMaker().makeMap();
        this.termins = new LinkedBlockingQueue<TerminEventData>(20);
    }

    /**
     * Takes the next process id that is ready for the given stage.
     *
     * <p>For the SELECT stage the queue is seeded via {@code initSelect()} when no entry for the
     * stage exists yet, and the returned process id is registered in {@code progress} with a
     * placeholder so that later {@link #single(StageType, EtlEventData)} calls accept it.
     *
     * @param stage the stage asking for work
     * @return the process id taken from the stage's queue
     * @throws InterruptedException if interrupted while taking from the queue
     */
    public Long waitForProcess(StageType stage) throws InterruptedException {
        if (stage.isSelect() && !this.replys.containsKey(stage)) {
            this.initSelect();
        }
        Long processId = this.replys.get(stage).take();
        if (stage.isSelect()) {
            this.progress.put(processId, this.nullProgress);
        }
        return processId;
    }

    /**
     * Returns the data recorded by the most recent stage for the given process.
     *
     * <p>NOTE(review): this dereferences the progress entry without a null check, so a process
     * id that is not (or no longer) tracked results in a {@link NullPointerException}.
     *
     * @param processId the process to look up
     * @return the data of the last recorded {@link StageProgress}
     */
    public EtlEventData getLastData(Long processId) {
        return this.progress.get(processId).getData();
    }

    /** Drops all reply queues and all recorded progress. */
    @Override
    public synchronized void destory() {
        this.replys.clear();
        this.progress.clear();
    }

    /**
     * Forgets the progress recorded for one process.
     *
     * @param processId the process whose progress entry is removed
     */
    public synchronized void clearProgress(Long processId) {
        this.progress.remove(processId);
    }

    /**
     * Terminates every in-flight process: for each tracked process id, in ascending order, a
     * {@link TerminEventData} of the given type is queued and the progress entry is removed.
     * Afterwards the SELECT queue is re-seeded via {@code initSelect()}.
     *
     * @param type the termin type to stamp on every generated event
     */
    public synchronized void termin(TerminEventData.TerminType type) {
        // Snapshot the keys first: entries are removed from the map inside the loop.
        ArrayList<Long> processIds = new ArrayList<Long>(this.progress.keySet());
        Collections.sort(processIds);
        for (Long processId : processIds) {
            EtlEventData eventData = this.progress.get(processId).getData();
            TerminEventData data = new TerminEventData();
            data.setPipelineId(this.getPipelineId());
            data.setType(type);
            data.setCode("channel");
            data.setDesc(type.toString());
            data.setProcessId(processId);
            // Data is absent when the recorded progress carries none (e.g. the SELECT placeholder).
            if (eventData != null) {
                data.setBatchId(eventData.getBatchId());
                data.setCurrNid(eventData.getCurrNid());
                data.setStartTime(eventData.getStartTime());
                data.setEndTime(eventData.getEndTime());
                data.setFirstTime(eventData.getFirstTime());
                data.setNumber(eventData.getNumber());
                data.setSize(eventData.getSize());
                data.setExts(eventData.getExts());
            }
            this.offerTermin(data);
            this.progress.remove(processId);
        }
        this.initSelect();
    }

    /**
     * Signals that a stage has finished for a process and schedules the follow-up stage.
     *
     * <ul>
     *   <li>SELECT / EXTRACT: if the process is tracked, record the progress and offer the
     *       process id to the next stage (EXTRACT / TRANSFORM).</li>
     *   <li>TRANSFORM: if the process is tracked, record the progress; in either case try to
     *       schedule the next LOAD.</li>
     *   <li>LOAD: remove the progress entry and try to schedule the next LOAD; if an entry was
     *       actually removed, hand out a fresh process id to SELECT.</li>
     * </ul>
     *
     * @param stage the stage that just completed
     * @param etlEventData the data produced by that stage; its process id identifies the process
     * @return {@code true} if the process was tracked and the signal was accepted
     */
    public synchronized boolean single(StageType stage, EtlEventData etlEventData) {
        boolean result = false;
        switch (stage) {
            case SELECT: {
                if (!this.progress.containsKey(etlEventData.getProcessId())) break;
                this.progress.put(etlEventData.getProcessId(), new StageProgress(stage, etlEventData));
                this.replys.get(StageType.EXTRACT).offer(etlEventData.getProcessId());
                result = true;
                break;
            }
            case EXTRACT: {
                if (!this.progress.containsKey(etlEventData.getProcessId())) break;
                this.progress.put(etlEventData.getProcessId(), new StageProgress(stage, etlEventData));
                this.replys.get(StageType.TRANSFORM).offer(etlEventData.getProcessId());
                result = true;
                break;
            }
            case TRANSFORM: {
                if (this.progress.containsKey(etlEventData.getProcessId())) {
                    this.progress.put(etlEventData.getProcessId(), new StageProgress(stage, etlEventData));
                    result = true;
                }
                // Always re-evaluate: the smallest process id may now be ready for LOAD.
                this.computeNextLoad();
                break;
            }
            case LOAD: {
                StageProgress removed = this.progress.remove(etlEventData.getProcessId());
                this.computeNextLoad();
                if (removed == null) break;
                // A slot was freed, so a new process id becomes available to SELECT.
                this.replys.get(StageType.SELECT).offer(this.atomicMaxProcessId.incrementAndGet());
                result = true;
                break;
            }
        }
        return result;
    }

    /**
     * Queues a termin event, waiting for space if the bounded queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the event is not queued and the
     * thread's interrupt status is restored so that callers can still observe the interruption.
     * As a consequence, further calls on the same thread fail fast until the flag is cleared.
     *
     * @param data the termin event to queue
     */
    public void offerTermin(TerminEventData data) {
        try {
            this.termins.put(data);
        }
        catch (InterruptedException interrupted) {
            // Do not swallow the interruption: this method cannot propagate the checked
            // exception, so re-assert the flag for code further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Acknowledges a termin event. The in-memory implementation has nothing to do here.
     *
     * @param data the acknowledged event (unused)
     */
    public void ackTermin(TerminEventData data) {
    }

    /**
     * @return the number of termin events currently queued
     */
    public int sizeTermin() {
        return this.termins.size();
    }

    /**
     * Takes the next termin event, waiting until one is available.
     *
     * @return the next queued termin event
     * @throws InterruptedException if interrupted while waiting
     */
    public TerminEventData waitTermin() throws InterruptedException {
        return this.termins.take();
    }

    /**
     * Seeds the SELECT queue with fresh process ids, bounded by the pipeline parallelism.
     * The loop stops as soon as the counter is exhausted or the queue already holds more
     * entries than the remaining counter value.
     */
    private synchronized void initSelect() {
        ReplyProcessQueue queue = this.replys.get(StageType.SELECT);
        int parallelism = ArbitrateConfigUtils.getParallelism(this.getPipelineId());
        // The counter is decremented before the size comparison, exactly as in the original.
        while (parallelism-- > 0 && queue.size() <= parallelism) {
            queue.offer(this.atomicMaxProcessId.incrementAndGet());
        }
    }

    /** Offers the smallest tracked process id to LOAD if it has completed TRANSFORM. */
    private void computeNextLoad() {
        Long processId = this.getMinTransformedProcessId();
        if (processId != null) {
            this.replys.get(StageType.LOAD).offer(processId);
        }
    }

    /**
     * Finds the smallest tracked process id and returns it only if its recorded stage is
     * TRANSFORM.
     *
     * @return the smallest process id when it has been transformed, otherwise {@code null}
     */
    private synchronized Long getMinTransformedProcessId() {
        if (CollectionUtils.isEmpty(this.progress)) {
            return null;
        }
        Long processId = Collections.min(this.progress.keySet());
        StageProgress stage = this.progress.get(processId);
        // The placeholder has no real stage yet, so it is excluded before getStage() is called.
        if (stage == null || stage == this.nullProgress) {
            return null;
        }
        return stage.getStage().isTransform() ? processId : null;
    }
}

