/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr;

import java.util.Set;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.InMemCuboidFromBaseCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.streaming.ColumnToRowJob;
import org.apache.kylin.engine.mr.streaming.MergeDictJob;
import org.apache.kylin.engine.mr.streaming.SaveDictStep;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingCubingJobBuilder
extends JobBuilderSupport {
    private static final Logger logger = LoggerFactory.getLogger(StreamingCubingJobBuilder.class);
    private final IMROutput2.IMRBatchCubingOutputSide2 outputSide;

    public StreamingCubingJobBuilder(CubeSegment newSegment, String submitter) {
        super(newSegment, submitter);
        this.outputSide = MRUtil.getBatchCubingOutputSide2(this.seg);
    }

    public CubingJob build() {
        logger.info("MR_V2 new job to BUILD streaming segment " + this.seg);
        CubingJob result = CubingJob.createStreamJob(this.seg, this.submitter, this.config);
        String jobId = result.getId();
        String streamingStoragePath = this.getStreamingIndexPath(jobId);
        String cuboidRootPath = this.getCuboidRootPath(jobId);
        result.addTask(this.createMergeDictStep(streamingStoragePath, jobId, result));
        result.addTask(this.createSaveDictStep(jobId, result));
        String tmpBaseCuboidPath = this.getBaseCuboidPathForStreaming(jobId);
        this.addBuildBaseCuboidStep(result, tmpBaseCuboidPath, streamingStoragePath);
        result.addTask(this.createCalculateStatsFromBaseCuboid(tmpBaseCuboidPath, this.getStatisticsPath(jobId)));
        result.addTask(this.createSaveStatisticsStep(jobId));
        this.outputSide.addStepPhase2_BuildDictionary(result);
        result.addTask(this.createInMemCubingStep(jobId, CuboidModeEnum.CURRENT, cuboidRootPath, tmpBaseCuboidPath));
        this.outputSide.addStepPhase3_BuildCube(result);
        this.outputSide.addStepPhase4_Cleanup(result);
        return result;
    }

    private void addBuildBaseCuboidStep(CubingJob result, String outputBaseCuboidPath, String streamingStoragePath) {
        result.addTask(this.createBaseCuboidStep(streamingStoragePath, outputBaseCuboidPath));
    }

    private void addLayerCubingStepsOnBaseCuboid(CubingJob result, String jobId, String cuboidRootPath) {
        int maxLevel = CuboidUtil.getLongestDepth((Set)this.seg.getCuboidScheduler().getAllCuboidIds());
        result.addTask(this.createBaseCuboidStep(StreamingCubingJobBuilder.getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
        for (int i = 1; i <= maxLevel; ++i) {
            result.addTask(this.createNDimensionCuboidStep(StreamingCubingJobBuilder.getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), StreamingCubingJobBuilder.getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
        }
    }

    private MapReduceExecutable createMergeDictStep(String streamingStoragePath, String jobId, DefaultChainedExecutable jobFlow) {
        MapReduceExecutable mergeDict = new MapReduceExecutable();
        mergeDict.setName("Build Dimension Dictionaries For Steaming Job");
        StringBuilder cmd = new StringBuilder();
        this.appendMapReduceParameters(cmd, "cube_merge");
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "jobname", "Build Dimension Dictionaries For Steaming Job");
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "input", streamingStoragePath);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cubename", this.seg.getRealization().getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "segmentname", this.seg.getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "output", this.getDictPath(jobId));
        String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
        mergeDict.setMapReduceParams(cmd.toString());
        mergeDict.setMapReduceJobClass(MergeDictJob.class);
        mergeDict.setLockPathName(cubeName);
        mergeDict.setIsNeedLock(true);
        mergeDict.setIsNeedReleaseLock(false);
        mergeDict.setJobFlowJobId(jobFlow.getId());
        return mergeDict;
    }

    private MapReduceExecutable createInMemCubingStep(String jobId, CuboidModeEnum cuboidMode, String cuboidRootPath, String tmpBaseCuboidPath) {
        MapReduceExecutable cubeStep = new MapReduceExecutable();
        StringBuilder cmd = new StringBuilder();
        this.appendMapReduceParameters(cmd, "inmem");
        cubeStep.setName("Build Cube In-Mem");
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cubename", this.seg.getRealization().getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "segmentid", this.seg.getUuid());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "input", tmpBaseCuboidPath);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "output", StreamingCubingJobBuilder.getInMemCuboidPath(cuboidRootPath));
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + this.seg.getRealization().getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cubingJobId", jobId);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cuboidMode", cuboidMode.toString());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "updateShard", "true");
        cubeStep.setMapReduceParams(cmd.toString());
        cubeStep.setMapReduceJobClass(InMemCuboidFromBaseCuboidJob.class);
        cubeStep.setCounterSaveAs(",,byteSizeBytes");
        return cubeStep;
    }

    private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level, String jobId) {
        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
        ndCuboidStep.setName("Build N-Dimension Cuboid : level " + level);
        StringBuilder cmd = new StringBuilder();
        this.appendMapReduceParameters(cmd);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cubename", this.seg.getRealization().getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "segmentid", this.seg.getUuid());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "input", parentPath);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "output", outputPath);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + this.seg.getRealization().getName() + "_Step");
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "level", "" + level);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cubingJobId", jobId);
        ndCuboidStep.setMapReduceParams(cmd.toString());
        ndCuboidStep.setMapReduceJobClass(this.getNDCuboidJob());
        return ndCuboidStep;
    }

    protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
        return NDCuboidJob.class;
    }

    private MapReduceExecutable createBaseCuboidStep(String streamingStoragePath, String basicCuboidOutputPath) {
        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
        StringBuilder cmd = new StringBuilder();
        this.appendMapReduceParameters(cmd, "inmem");
        baseCuboidStep.setName("Build Base Cuboid Data For Streaming Job");
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "cubename", this.seg.getRealization().getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "segmentname", this.seg.getName());
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "input", streamingStoragePath);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "output", basicCuboidOutputPath);
        StreamingCubingJobBuilder.appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + this.seg.getRealization().getName());
        baseCuboidStep.setMapReduceParams(cmd.toString());
        baseCuboidStep.setMapReduceJobClass(ColumnToRowJob.class);
        baseCuboidStep.setCounterSaveAs("sourceRecordCount,sourceSizeBytes");
        return baseCuboidStep;
    }

    private SaveDictStep createSaveDictStep(String jobId, DefaultChainedExecutable jobFlow) {
        SaveDictStep result = new SaveDictStep();
        String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
        result.setName("Save Cube Dictionaries");
        CubingExecutableUtil.setCubeName(this.seg.getRealization().getName(), result.getParams());
        CubingExecutableUtil.setSegmentId(this.seg.getUuid(), result.getParams());
        CubingExecutableUtil.setDictsPath(this.getDictPath(jobId), result.getParams());
        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
        result.setIsNeedReleaseLock(true);
        result.setJobFlowJobId(jobFlow.getId());
        result.setLockPathName(cubeName);
        return result;
    }

    public String getStreamingIndexPath(String jobId) {
        return HDFSUtil.getStreamingSegmentFilePath((String)this.seg.getRealization().getName(), (String)this.seg.getName());
    }

    private String getBaseCuboidPathForStreaming(String jobId) {
        return this.getRealizationRootPath(jobId) + "/stream_temp/" + "base_cuboid";
    }

    private String getDictPath(String jobId) {
        return this.getRealizationRootPath(jobId) + "/dict";
    }
}

