/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.steps;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayPrimitiveWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IDictionaryBuilder;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer of the fact-distinct-columns MR step.
 *
 * <p>Each reducer task plays one of two roles, chosen in {@link #doSetup} from
 * {@link FactDistinctColumnsReducerMapping} by task id:
 * <ul>
 *   <li><b>statistics reducer</b> - merges the per-cuboid {@link HLLCounter}s received from the
 *       mappers and writes them, preceded by a few summary records, to the {@code "statistics"}
 *       named output;</li>
 *   <li><b>column reducer</b> - handles the keys of a single column: tracks min/max for comparable
 *       dimension columns (written to {@code "partition"}) and, when the column needs a
 *       dictionary, either builds it locally (written to {@code "dict"}) or forwards each value to
 *       the {@code "column"} named output.</li>
 * </ul>
 */
public class FactDistinctColumnsReducer
extends KylinReducer<SelfDefineSortableKey, Text, NullWritable, Text> {
    public static final String DICT_FILE_POSTFIX = ".rldict";
    public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";

    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);

    /**
     * Number of leading key bytes skipped before the payload (cuboid id or column value).
     * NOTE(review): presumably a marker byte written by the mapper - confirm against the mapper.
     */
    private static final int KEY_PAYLOAD_OFFSET = 1;

    // ---- statistics role ----
    private List<Long> baseCuboidRowCountInMappers;
    protected Map<Long, HLLCounter> cuboidHLLMap = null;
    protected long baseCuboidId;
    private long totalRowsBeforeMerge = 0L;
    private int samplingPercentage;
    private boolean isStatistics = false;

    // ---- column role ----
    private TblColRef col = null;
    /** Whether {@link #col} is a non-derived dimension column; fixed for the reducer's lifetime. */
    private boolean isDimensionCol;
    /** Whether {@link #col} needs a dictionary; fixed for the reducer's lifetime. */
    private boolean needDictBuilt;
    private boolean buildDictInReducer;
    private IDictionaryBuilder builder;
    private String maxValue = null;
    private String minValue = null;

    // ---- shared ----
    protected CubeDesc cubeDesc;
    private KylinConfig cubeConfig;
    private int taskId;
    private int rowCount = 0;
    private FactDistinctColumnsReducerMapping reducerMapping;
    // Raw type: this single instance writes several key/value type combinations.
    private MultipleOutputs mos;

    /**
     * Loads cube metadata and configures this task for its role (statistics or single column).
     *
     * @throws NullPointerException if the cube named by {@code cube.name} does not exist, or no
     *         column is mapped to this (non-statistics) reducer
     */
    @Override
    protected void doSetup(Reducer.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration conf = context.getConfiguration();
        mos = new MultipleOutputs((TaskInputOutputContext) context);

        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        String cubeName = conf.get("cube.name");
        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
        Preconditions.checkNotNull(cube, "Cube '%s' not found in metadata", cubeName);
        cubeConfig = cube.getConfig();
        cubeDesc = cube.getDescriptor();
        taskId = context.getTaskAttemptID().getTaskID().getId();
        reducerMapping = new FactDistinctColumnsReducerMapping(cube);

        logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));

        if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
            setupStatisticsRole(cube, conf);
        } else {
            setupColumnRole(config);
        }
    }

    /** Initializes the HLL-merging state used when this task is the statistics reducer. */
    private void setupStatisticsRole(CubeInstance cube, Configuration conf) {
        isStatistics = true;
        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
        baseCuboidRowCountInMappers = Lists.newArrayList();
        cuboidHLLMap = Maps.newHashMap();
        samplingPercentage = Integer.parseInt(conf.get("statistics.sampling.percent"));
        logger.info("Reducer " + taskId + " handling stats");
    }

    /** Resolves this task's column and decides whether its dictionary is built in-reducer. */
    private void setupColumnRole(KylinConfig config) throws IOException {
        col = reducerMapping.getColForReducer(taskId);
        Preconditions.checkNotNull(col, "No column mapped to reducer %s", taskId);

        // Both membership tests depend only on cubeDesc and col, so evaluate them once here
        // instead of once per key in doReduce.
        isDimensionCol = cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col);
        needDictBuilt = cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col);

        // Build locally only when enabled, no custom builder class is configured for the column,
        // and this reducer is the only one receiving the column's values.
        buildDictInReducer = config.isBuildDictInReducerEnabled();
        if (cubeDesc.getDictionaryBuilderClass(col) != null) {
            buildDictInReducer = false;
        }
        if (reducerMapping.getReducerNumForDimCol(col) > 1) {
            buildDictInReducer = false;
        }
        if (buildDictInReducer) {
            builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
            builder.init(null, 0, null);
        }
        logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
    }

    /**
     * Dispatches one key to the role-specific handler. In the column role only the key is used;
     * {@code values} is ignored.
     */
    @Override
    public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
        Text key = skey.getText();
        if (isStatistics) {
            mergeCuboidCounters(key, values);
        } else {
            processColumnValue(key);
        }
        ++rowCount;
    }

    /**
     * Merges every serialized HLL counter in {@code values} into the counter for the cuboid whose
     * id is encoded (8 bytes) right after the key's leading byte.
     */
    private void mergeCuboidCounters(Text key, Iterable<Text> values) throws IOException {
        long cuboidId = Bytes.toLong(key.getBytes(), KEY_PAYLOAD_OFFSET, 8);
        for (Text value : values) {
            HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
            hll.readRegisters(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()));

            long estimate = hll.getCountEstimate();
            totalRowsBeforeMerge += estimate;
            if (cuboidId == baseCuboidId) {
                // One entry per incoming counter for the base cuboid; logged per mapper later.
                baseCuboidRowCountInMappers.add(estimate);
            }

            HLLCounter merged = cuboidHLLMap.get(cuboidId);
            if (merged != null) {
                merged.merge(hll);
            } else {
                cuboidHLLMap.put(cuboidId, hll);
            }
        }
    }

    /**
     * Handles one value of {@link #col}: updates min/max for comparable dimension columns, then
     * feeds the in-reducer dictionary builder or forwards the raw bytes to the "column" output.
     */
    private void processColumnValue(Text key) throws IOException, InterruptedException {
        int payloadLength = key.getLength() - KEY_PAYLOAD_OFFSET;
        String value = Bytes.toString(key.getBytes(), KEY_PAYLOAD_OFFSET, payloadLength);
        logAFewRows(value);

        if (isDimensionCol && col.getType().needCompare()) {
            if (minValue == null || col.getType().compare(minValue, value) > 0) {
                minValue = value;
            }
            if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
                maxValue = value;
            }
        }

        if (needDictBuilt) {
            if (buildDictInReducer) {
                builder.addValue(value);
            } else {
                byte[] keyBytes = Bytes.copy(key.getBytes(), KEY_PAYLOAD_OFFSET, payloadLength);
                String fileName = col.getIdentity() + "/";
                mos.write("column", NullWritable.get(), new Text(keyBytes), fileName);
            }
        }
    }

    /** Logs the value only for the first 10 keys this task receives. */
    private void logAFewRows(String value) {
        if (rowCount < 10) {
            logger.info("Received value: " + value);
        }
    }

    /**
     * Flushes the role-specific results, then closes the named outputs. The outputs are closed
     * even when flushing fails, so the underlying record writers are not leaked.
     */
    @Override
    protected void doCleanup(Reducer.Context context) throws IOException, InterruptedException {
        try {
            if (isStatistics) {
                List<Long> allCuboids = new ArrayList<>(cuboidHLLMap.keySet());
                Collections.sort(allCuboids);
                logMapperAndCuboidStatistics(allCuboids);
                outputStatistics(allCuboids);
            } else {
                if (isDimensionCol) {
                    outputDimRangeInfo();
                }
                if (buildDictInReducer) {
                    Dictionary<String> dict = builder.build();
                    outputDict(col, dict);
                }
            }
        } finally {
            if (mos != null) {
                mos.close();
            }
        }
    }

    /** Writes the min and max values (in that order) of {@link #col}, if any were tracked. */
    private void outputDimRangeInfo() throws IOException, InterruptedException {
        // minValue and maxValue are always assigned together, so one null check covers both.
        if (col != null && minValue != null) {
            String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
            mos.write("partition", NullWritable.get(), new Text(minValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
            mos.write("partition", NullWritable.get(), new Text(maxValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
            logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue + " maxValue:" + maxValue);
        }
    }

    /**
     * Serializes {@code dict} as its class name (UTF) followed by its own binary form, and writes
     * the bytes to the "dict" named output.
     */
    private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
        String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream outputStream = new DataOutputStream(baos)) {
            outputStream.writeUTF(dict.getClass().getName());
            dict.write(outputStream);
            mos.write("dict", NullWritable.get(), new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
        }
    }

    /**
     * Writes the statistics records. Keys -1, -2 and 0 carry, respectively, the mapper overlap
     * ratio (double), the number of base-cuboid counters received (int) and the sampling
     * percentage (int). They are followed by one record per cuboid id holding its merged HLL
     * registers.
     */
    private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
        String statisticsFileName = "statistics/statistics";
        // 1 MiB scratch buffer reused for each cuboid's serialized registers.
        ByteBuffer valueBuf = ByteBuffer.allocate(0x100000);

        long grandTotal = 0L;
        for (HLLCounter hll : cuboidHLLMap.values()) {
            grandTotal += hll.getCountEstimate();
        }
        double mapperOverlapRatio = grandTotal == 0L ? 0.0 : (double) totalRowsBeforeMerge / (double) grandTotal;

        mos.write("statistics", new LongWritable(-1L), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
        mos.write("statistics", new LongWritable(-2L), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
        mos.write("statistics", new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);

        for (long cuboidId : allCuboids) {
            valueBuf.clear();
            cuboidHLLMap.get(cuboidId).writeRegisters(valueBuf);
            valueBuf.flip();
            mos.write("statistics", new LongWritable(cuboidId), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
        }
    }

    /** Logs per-mapper base-cuboid counts and per-cuboid estimates before/after merging. */
    private void logMapperAndCuboidStatistics(List<Long> allCuboids) {
        logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
        logger.info("Samping percentage: \t" + samplingPercentage);
        logger.info("The following statistics are collected based on sampling data.");
        logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
        for (int i = 0; i < baseCuboidRowCountInMappers.size(); ++i) {
            if (baseCuboidRowCountInMappers.get(i) <= 0L) {
                continue;
            }
            logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
        }

        long grandTotal = 0L;
        for (long cuboidId : allCuboids) {
            long estimate = cuboidHLLMap.get(cuboidId).getCountEstimate();
            grandTotal += estimate;
            logger.info("Cuboid " + cuboidId + " row count is: \t " + estimate);
        }
        logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
        logger.info("After merge, the row count: \t " + grandTotal);
    }
}

