/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.steps;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.steps.MapContextGTRecordWriter;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mapper that feeds flat-table rows to an in-memory cube builder running on a
 * background thread.
 *
 * <p>{@link #setup} loads the cube metadata and dictionaries, then submits the
 * {@link DoggedCubeBuilder} runnable to a single-thread executor. {@link #doMap}
 * parses each input record and hands it to the builder through a bounded queue.
 * {@link #doCleanup} enqueues an empty row and waits for the builder task to
 * finish, surfacing any failure as an {@link IOException}.
 *
 * @param <KEYIN> type of the mapper input key (not inspected by this class)
 */
public class InMemCuboidMapper<KEYIN>
extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapper.class);
    private CubeInstance cube;
    private CubeDesc cubeDesc;
    private CubeSegment cubeSegment;
    private IMRInput.IMRTableInputFormat flatTableInputFormat;
    // Bounded hand-off buffer between the mapper thread (producer) and the builder task (consumer).
    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
    // Handle on the background build task; used to detect early termination and to collect failures.
    private Future<?> future;

    /**
     * Loads cube metadata and dictionaries and starts the background cube builder.
     *
     * @param context the Hadoop mapper context; its configuration must carry
     *                {@code cube.name} and {@code cube.segment.id}
     * @throws IOException declared for the metadata loading calls made here
     */
    protected void setup(Mapper.Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        super.bindCurrentConfiguration(conf);
        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        String cubeName = conf.get("cube.name");
        this.cube = CubeManager.getInstance((KylinConfig)config).getCube(cubeName);
        this.cubeDesc = this.cube.getDescriptor();
        String segmentID = conf.get("cube.segment.id");
        this.cubeSegment = this.cube.getSegmentById(segmentID);
        this.flatTableInputFormat = MRUtil.getBatchCubingInputSide(this.cubeSegment).getFlatTableInputFormat();
        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc((CubeSegment)this.cubeSegment);
        HashMap dictionaryMap = Maps.newHashMap();
        for (TblColRef col : this.cubeDesc.getAllColumnsHaveDictionary()) {
            Dictionary dict = this.cubeSegment.getDictionary(col);
            if (dict == null) {
                logger.warn("Dictionary for {} was not found.", col);
            }
            // Reuse the dictionary already fetched above instead of looking it up a second time.
            // A missing dictionary is still recorded as a null entry, as before.
            dictionaryMap.put(col, dict);
        }
        int taskCount = config.getCubeAlgorithmInMemConcurrentThreads();
        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(this.cube.getDescriptor(), flatDesc, (Map)dictionaryMap);
        cubeBuilder.setReserveMemoryMB(this.calculateReserveMB(conf));
        cubeBuilder.setConcurrentThreads(taskCount);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            this.future = executorService.submit(cubeBuilder.buildAsRunnable(this.queue, (ICuboidWriter)new MapContextGTRecordWriter((MapContext<?, ?, ByteArrayWritable, ByteArrayWritable>)context, this.cubeDesc, this.cubeSegment)));
        } finally {
            // shutdown() does not cancel the task submitted above; it only stops the executor
            // from accepting new work so its worker thread can exit once the build finishes.
            // Without it the non-daemon worker thread is never released.
            executorService.shutdown();
        }
    }

    /**
     * Computes how much memory (in MB) the cube builder is told to reserve.
     *
     * <p>The result is the sum of the configured {@code mapreduce.task.io.sort.mb}
     * (default 100) and a system reserve of one tenth of the available memory
     * reported by {@link MemoryBudgetController#getSystemAvailMB()}, with a floor
     * of 100 MB.
     *
     * @param configuration the Hadoop job configuration
     * @return the number of megabytes to reserve
     */
    private int calculateReserveMB(Configuration configuration) {
        int sysAvailMB = MemoryBudgetController.getSystemAvailMB();
        int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100);
        int sysReserve = Math.max(sysAvailMB / 10, 100);
        int reserveMB = mrReserve + sysReserve;
        logger.info("Reserve {} MB = {} (MR reserve) + {} (SYS reserve)", reserveMB, mrReserve, sysReserve);
        return reserveMB;
    }

    /**
     * Parses one input record into a row and hands it to the background builder.
     *
     * <p>The offer is retried with a one-second timeout until the row is accepted
     * or the build task has completed. If the task has already completed, the row
     * is not enqueued; any failure of the task is reported later by
     * {@link #doCleanup}.
     *
     * @param key     the input key (unused)
     * @param record  the raw input record, parsed by the flat table input format
     * @param context the Hadoop mapper context (unused here)
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    @Override
    public void doMap(KEYIN key, Object record, Mapper.Context context) throws IOException, InterruptedException {
        String[] row = this.flatTableInputFormat.parseMapperInput(record);
        List<String> rowAsList = Arrays.asList(row);
        // Re-check the task on every timeout so a finished builder cannot block this thread forever.
        while (!this.future.isDone() && !this.queue.offer(rowAsList, 1L, TimeUnit.SECONDS)) {
        }
    }

    /**
     * Enqueues a final empty row, waits for the build task to finish and
     * propagates its failure, if any.
     *
     * @param context the Hadoop mapper context, used to identify the task in the
     *                error message
     * @throws IOException          if waiting on the build task raised any exception
     * @throws InterruptedException if interrupted while offering the final empty row
     */
    @Override
    protected void doCleanup(Mapper.Context context) throws IOException, InterruptedException {
        logger.info("Totally handled {} records!", this.mapCounter);
        // NOTE(review): the empty row presumably marks end-of-input for the builder; confirm against DoggedCubeBuilder.
        while (!this.future.isDone() && !this.queue.offer(Collections.emptyList(), 1L, TimeUnit.SECONDS)) {
        }
        try {
            this.future.get();
        }
        catch (Exception e) {
            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
        }
        this.queue.clear();
    }
}

