/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.spark.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.utils.HiveTableRefChecker;
import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.SparderTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NSparkCubingSourceInput
implements NSparkCubingEngine.NSparkCubingSource {
    private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);

    @Override
    public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
        KylinConfig kylinConfig = null;
        kylinConfig = null == KylinBuildEnv.get() ? KylinConfig.getInstanceFromEnv() : KylinBuildEnv.get().kylinConfig();
        logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}", new Object[]{table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled()});
        List<ColumnDesc> effectiveColumns = this.extractEffectiveColumns(table, ss);
        String sql = this.generateSelectSql(table, effectiveColumns, params, kylinConfig);
        StructType kylinSchema = this.generateKylinSchema(effectiveColumns);
        if (logger.isDebugEnabled()) {
            logger.debug("Source data sql is: {}", (Object)sql);
            logger.debug("Kylin schema: {}", (Object)kylinSchema.treeString());
        }
        Dataset df = ss.sql(sql);
        StructType sparkSchema = df.schema();
        return df.select(SparderTypeUtil.alignDataTypeAndName((StructType)sparkSchema, (StructType)kylinSchema));
    }

    protected List<ColumnDesc> extractEffectiveColumns(TableDesc table, SparkSession ss) {
        ArrayList<ColumnDesc> ret = new ArrayList<ColumnDesc>();
        Dataset sourceTableDS = ss.table(table.getBackTickIdentity());
        Set sourceTableColumns = Arrays.stream(sourceTableDS.columns()).map(StringUtils::upperCase).collect(Collectors.toSet());
        for (ColumnDesc col : table.getColumns()) {
            if (col.isComputedColumn()) continue;
            if (sourceTableColumns.contains(col.getName())) {
                ret.add(col);
                continue;
            }
            logger.warn("Table {} missing column {} in source schema", (Object)table.getTableAlias(), (Object)col.getName());
        }
        return ret;
    }

    protected String generateSelectSql(TableDesc table, List<ColumnDesc> effectiveColumns, Map<String, String> params, KylinConfig kylinConfig) {
        String colString = this.generateColString(effectiveColumns);
        String sql = HiveTableRefChecker.isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled()) ? HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString, KylinBuildEnv.get()) : String.format(Locale.ROOT, "select %s from %s", colString, table.getBackTickIdentity());
        return sql;
    }

    protected String generateColString(List<ColumnDesc> effectiveColumns) {
        return effectiveColumns.stream().map(col -> Quoting.BACK_TICK.string + col.getName() + Quoting.BACK_TICK.string).collect(Collectors.joining(","));
    }

    protected StructType generateKylinSchema(List<ColumnDesc> effectiveColumns) {
        StructType kylinSchema = new StructType();
        for (ColumnDesc columnDesc : effectiveColumns) {
            if (columnDesc.isComputedColumn()) continue;
            kylinSchema = kylinSchema.add(columnDesc.getName(), SparderTypeUtil.toSparkType((DataType)columnDesc.getType(), (boolean)false), true);
        }
        return kylinSchema;
    }
}

