/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.hive;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.compaction.hive.CompactionRunner;
import org.apache.gobblin.compaction.hive.HdfsIO;
import org.apache.gobblin.compaction.hive.HdfsReader;
import org.apache.gobblin.compaction.hive.HdfsWriter;
import org.apache.gobblin.compaction.hive.HiveAttribute;
import org.apache.gobblin.compaction.hive.HiveManagedTable;
import org.apache.gobblin.compaction.hive.HiveTable;
import org.apache.gobblin.util.HiveJdbcConnector;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroExternalTable
extends HiveTable {
    private static final Logger LOG = LoggerFactory.getLogger(AvroExternalTable.class);
    private static final String HIVE_TMPSCHEMA_DIR = "hive.tmpschema.dir";
    private static final String HIVE_TMPDATA_DIR = "hive.tmpdata.dir";
    private static final String HIVE_TMPDATA_DIR_DEFAULT = "/";
    private static final String CREATE_TABLE_STMT = "CREATE EXTERNAL TABLE %1$s  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION '%2$s' TBLPROPERTIES ('avro.schema.url'='%3$s')";
    private final String dataLocationInHdfs;
    private final String schemaLocationInHdfs;
    private final boolean deleteSchemaAfterDone;
    private final boolean deleteDataAfterDone;

    private AvroExternalTable(Builder builder) throws IOException {
        super(builder);
        if (builder.moveDataToTmpHdfsDir) {
            this.dataLocationInHdfs = this.moveDataFileToSeparateHdfsDir(builder.dataLocationInHdfs, builder.extensionToBeMoved);
            this.deleteDataAfterDone = true;
        } else {
            this.dataLocationInHdfs = builder.dataLocationInHdfs;
            this.deleteDataAfterDone = false;
        }
        if (StringUtils.isNotBlank((String)builder.schemaLocationInHdfs)) {
            this.schemaLocationInHdfs = builder.schemaLocationInHdfs;
            this.attributes = this.getAttributesFromAvroSchemaFile();
            this.deleteSchemaAfterDone = false;
        } else {
            Schema schema = this.getSchemaFromAvroDataFile();
            this.attributes = AvroExternalTable.parseSchema(schema);
            this.schemaLocationInHdfs = this.writeSchemaToHdfs(schema);
            this.deleteSchemaAfterDone = true;
        }
    }

    private List<HiveAttribute> getAttributesFromAvroSchemaFile() throws IOException {
        try (InputStream schemaInputStream = new HdfsReader(this.schemaLocationInHdfs).getInputStream();){
            Schema schema = new Schema.Parser().parse(schemaInputStream);
            List<HiveAttribute> list = AvroExternalTable.parseSchema(schema);
            return list;
        }
    }

    private Schema getSchemaFromAvroDataFile() throws IOException {
        String firstDataFilePath = HdfsReader.getFirstDataFilePathInDir(this.dataLocationInHdfs);
        LOG.info("Extracting schema for table " + this.name + " from avro data file " + firstDataFilePath);
        FsInput sin = new HdfsReader(firstDataFilePath).getFsInput();
        try (DataFileReader dfr = new DataFileReader((SeekableInput)sin, (DatumReader)new GenericDatumReader());){
            Schema schema;
            Schema schema2 = schema = dfr.getSchema();
            return schema2;
        }
    }

    private String writeSchemaToHdfs(Schema schema) throws IOException {
        String defaultTmpSchemaDir = AvroExternalTable.getParentDir(this.dataLocationInHdfs);
        String tmpSchemaDir = CompactionRunner.jobProperties.getProperty(HIVE_TMPSCHEMA_DIR, defaultTmpSchemaDir);
        tmpSchemaDir = AvroExternalTable.addSlash(tmpSchemaDir);
        String tmpSchemaPath = tmpSchemaDir + UUID.randomUUID().toString() + ".avsc";
        HdfsWriter writer = new HdfsWriter(tmpSchemaPath);
        LOG.info("writing schema to HDFS location " + tmpSchemaPath);
        writer.write(schema.toString(true));
        return tmpSchemaPath;
    }

    private static String getParentDir(String filePathInHdfs) {
        return new Path(filePathInHdfs).getParent().toString();
    }

    private static List<HiveAttribute> parseSchema(Schema schema) {
        ArrayList<HiveAttribute> attributes = new ArrayList<HiveAttribute>();
        List fields = schema.getFields();
        for (Schema.Field field : fields) {
            attributes.add(AvroExternalTable.convertAvroSchemaFieldToHiveAttribute(field));
        }
        return attributes;
    }

    private static HiveAttribute convertAvroSchemaFieldToHiveAttribute(Schema.Field field) {
        String avroFieldType = field.schema().getType().toString();
        if (avroFieldType.equalsIgnoreCase("UNION")) {
            avroFieldType = AvroExternalTable.extractAvroTypeFromUnion(field);
        }
        if (HiveAttribute.fromAvroType(avroFieldType) == null) {
            throw new RuntimeException("Hive does not support attribute type '" + avroFieldType + "'");
        }
        return new HiveAttribute(field.name(), HiveAttribute.fromAvroType(avroFieldType));
    }

    private static String extractAvroTypeFromUnion(Schema.Field field) {
        if (field.schema().getTypes().size() >= 3) {
            LOG.warn("Avro schema field " + field.name() + " has 3 or more types: using the first non-null type");
        }
        for (Schema schema : field.schema().getTypes()) {
            if (schema.getType().toString().equalsIgnoreCase("NULL")) continue;
            return schema.getType().toString();
        }
        String message = "Avro schema field " + field.name() + " is a union, but it does not contain a non-null field type.";
        LOG.error(message);
        throw new RuntimeException(message);
    }

    public String getDataLocationInHdfs() {
        return this.dataLocationInHdfs;
    }

    public String getSchemaLocationInHdfs() {
        return this.schemaLocationInHdfs;
    }

    @Override
    public void createTable(HiveJdbcConnector conn, String jobID) throws SQLException {
        String tableName = this.getNameWithJobId(jobID);
        String dropTableStmt = String.format("DROP TABLE IF EXISTS %1$s", tableName);
        String hdfsUri = HdfsIO.getHdfsUri();
        String createTableStmt = String.format(CREATE_TABLE_STMT, tableName, hdfsUri + this.dataLocationInHdfs, hdfsUri + this.schemaLocationInHdfs);
        conn.executeStatements(new String[]{dropTableStmt, createTableStmt});
    }

    @Override
    public HiveTable addNewColumnsInSchema(HiveJdbcConnector conn, HiveTable table, String jobId) throws SQLException {
        if (this.hasNoNewColumn(table)) {
            return this;
        }
        HiveManagedTable managedTable = ((HiveManagedTable.Builder)((HiveManagedTable.Builder)((HiveManagedTable.Builder)new HiveManagedTable.Builder().withName(this.name)).withPrimaryKeys(this.primaryKeys)).withAttributes(this.attributes)).build();
        return managedTable.addNewColumnsInSchema(null, table, jobId);
    }

    protected void deleteTmpFilesIfNeeded() throws IllegalArgumentException, IOException {
        if (this.deleteSchemaAfterDone) {
            new HdfsWriter(this.schemaLocationInHdfs).delete();
        }
        if (this.deleteDataAfterDone) {
            new HdfsWriter(this.dataLocationInHdfs).delete();
        }
    }

    private String moveDataFileToSeparateHdfsDir(String sourceDir, String extension) throws IOException {
        String parentDir = CompactionRunner.jobProperties.getProperty(HIVE_TMPDATA_DIR, HIVE_TMPDATA_DIR_DEFAULT);
        parentDir = AvroExternalTable.addSlash(parentDir);
        String destination = parentDir + UUID.randomUUID().toString();
        LOG.info("Moving data file of table " + this.getName() + " to " + destination);
        HdfsWriter.moveSelectFiles(extension, sourceDir, destination);
        LOG.info("Moved data file of table " + this.getName() + " to " + destination);
        return destination;
    }

    private static String addSlash(String dir) {
        if (!dir.endsWith(HIVE_TMPDATA_DIR_DEFAULT) && !dir.endsWith("\\")) {
            return dir + HIVE_TMPDATA_DIR_DEFAULT;
        }
        return dir;
    }

    public boolean hasSamePrimaryKey(AvroExternalTable other) {
        return this.primaryKeys.containsAll(other.primaryKeys) && other.primaryKeys.containsAll(this.primaryKeys);
    }

    public static class Builder
    extends HiveTable.Builder<Builder> {
        private String dataLocationInHdfs = "";
        private String schemaLocationInHdfs = "";
        private boolean moveDataToTmpHdfsDir = false;
        private String extensionToBeMoved;

        public Builder withDataLocation(String dataLocationInHdfs) {
            this.dataLocationInHdfs = dataLocationInHdfs;
            return this;
        }

        public Builder withSchemaLocation(String schemaLocationInHdfs) {
            this.schemaLocationInHdfs = schemaLocationInHdfs;
            return this;
        }

        public Builder withMoveDataToTmpHdfsDir(String extensionToBeMoved) {
            this.moveDataToTmpHdfsDir = true;
            this.extensionToBeMoved = extensionToBeMoved;
            return this;
        }

        public AvroExternalTable build() throws IOException {
            return new AvroExternalTable(this);
        }
    }
}

