/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.steps;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapperBase.class);
    private int reserveMemoryMB;
    private int nSplit = 1;
    private int countOfLastSplit = 0;
    private int counter = 0;
    private int splitRowThreshold = Integer.MAX_VALUE;
    private int unitRows = 1000;
    protected CubeInstance cube;
    protected CubeDesc cubeDesc;
    protected CubeSegment cubeSegment;
    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
    protected IJoinedFlatTableDesc flatDesc;
    protected int taskThreadCount;
    protected BlockingQueue<T> queue = new LinkedBlockingQueue<T>(2000);
    protected InputConverterUnit<T> inputConverterUnit;
    private Future<?> future;

    protected abstract InputConverterUnit<T> getInputConverterUnit(Mapper.Context var1);

    protected abstract T getRecordFromKeyValue(KEYIN var1, VALUEIN var2);

    protected abstract ICuboidWriter getCuboidWriter(Mapper.Context var1);

    @Override
    protected void doSetup(Mapper.Context context) throws IOException {
        DoggedCubeBuilder cubeBuilder;
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration conf = context.getConfiguration();
        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        String cubeName = conf.get("cube.name");
        this.cube = CubeManager.getInstance((KylinConfig)config).getCube(cubeName);
        this.cubeDesc = this.cube.getDescriptor();
        String segmentID = context.getConfiguration().get("cube.segment.id");
        this.cubeSegment = this.cube.getSegmentById(segmentID);
        this.flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc((CubeSegment)this.cubeSegment), this.cubeDesc);
        this.dictionaryMap = DictionaryGetterUtil.getDictionaryMap(this.cubeSegment, context.getInputSplit(), conf);
        if (this.cubeDesc.hasMemoryHungryMeasures()) {
            this.unitRows /= 10;
        }
        String cuboidModeName = conf.get("cuboid.mode");
        CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(this.cubeSegment, cuboidModeName);
        this.taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
        this.reserveMemoryMB = this.calculateReserveMB(conf);
        this.inputConverterUnit = this.getInputConverterUnit(context);
        try {
            cubeBuilder = (AbstractInMemCubeBuilder)Class.forName(this.cubeSegment.getConfig().getCubeInMemBuilderClass()).getConstructor(CuboidScheduler.class, IJoinedFlatTableDesc.class, Map.class).newInstance(cuboidScheduler, this.flatDesc, this.dictionaryMap);
        }
        catch (Exception e) {
            logger.warn("Fail to initialize cube builder by class name " + this.cubeSegment.getConfig().getCubeInMemBuilderClass() + " due to " + e);
            cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, this.flatDesc, this.dictionaryMap);
        }
        cubeBuilder.setReserveMemoryMB(this.reserveMemoryMB);
        cubeBuilder.setConcurrentThreads(this.taskThreadCount);
        ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
        this.future = executorService.submit(cubeBuilder.buildAsRunnable(this.queue, this.inputConverterUnit, this.getCuboidWriter(context)));
    }

    private int calculateReserveMB(Configuration configuration) {
        int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
        int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
        int sysReserve = Math.max(sysAvailMB / 10, 100);
        int reserveMB = mrReserve + sysReserve;
        logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)");
        return reserveMB;
    }

    @Override
    public void doMap(KEYIN key, VALUEIN value, Mapper.Context context) throws IOException, InterruptedException {
        T row = this.getRecordFromKeyValue(key, value);
        if (this.offer(context, row, 1L, TimeUnit.MINUTES, 60)) {
            ++this.counter;
            ++this.countOfLastSplit;
            if (this.counter % 100000 == 0) {
                logger.info("Handled " + this.counter + " records, internal queue size = " + this.queue.size());
            }
        } else {
            throw new IOException("Failed to offer row to internal queue due to queue full!");
        }
        if (this.counter % this.unitRows == 0 && this.shouldCutSplit(this.nSplit, this.countOfLastSplit)) {
            if (!this.offer(context, this.inputConverterUnit.getCutRow(), 1L, TimeUnit.MINUTES, 60)) {
                throw new IOException("Failed to offer row to internal queue due to queue full!");
            }
            this.countOfLastSplit = 0;
            ++this.nSplit;
        }
    }

    @Override
    protected void doCleanup(Mapper.Context context) throws IOException, InterruptedException {
        logger.info("Totally handled " + this.mapCounter + " records!");
        while (!this.future.isDone() && !this.queue.offer(this.inputConverterUnit.getEndRow(), 1L, TimeUnit.SECONDS)) {
        }
        this.futureGet(context);
        this.queue.clear();
    }

    private boolean shouldCutSplit(int nSplit, long splitRowCount) {
        int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
        logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + this.reserveMemoryMB + " MB threshold");
        if (splitRowCount >= (long)this.splitRowThreshold) {
            logger.info("Split cut due to hitting splitRowThreshold " + this.splitRowThreshold);
            return true;
        }
        if (systemAvailMB <= this.reserveMemoryMB) {
            logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + this.reserveMemoryMB + " MB");
            return true;
        }
        return false;
    }

    private boolean offer(Mapper.Context context, T row, long timeout, TimeUnit unit, int nRound) throws IOException, InterruptedException {
        while (nRound > 0) {
            if (this.queue.offer(row, timeout, unit)) {
                return true;
            }
            if (this.future.isDone()) {
                this.futureGet(context);
                throw new IOException("Failed to build cube in mapper due to cubing thread exit unexpectedly");
            }
            --nRound;
        }
        return false;
    }

    private void futureGet(Mapper.Context context) throws IOException {
        try {
            this.future.get();
        }
        catch (Exception e) {
            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
        }
    }
}

