/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.writer;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.writer.AvroOrcSchemaConverter;
import org.apache.gobblin.writer.CloseBeforeFlushException;
import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsDataWriterBuilder;
import org.apache.gobblin.writer.GenericRecordToOrcValueWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes Avro {@link GenericRecord}s into an ORC file through a buffered {@link VectorizedRowBatch}.
 *
 * <p>Each record is converted into {@link #rowBatch} by a {@link GenericRecordToOrcValueWriter}. The batch is
 * handed to the underlying ORC {@link Writer} when it holds {@code batchSize} rows, when {@link #flush()} is
 * called with a non-empty batch, and on the first {@link #close()} or {@link #commit()}.
 *
 * <p>Only {@code closeInternal()} is synchronized; {@link #write(GenericRecord)} and {@link #flush()} are not.
 */
public class GobblinOrcWriter
extends FsDataWriter<GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger(GobblinOrcWriter.class);
    /** Common prefix of the configuration keys read by this writer. */
    static final String ORC_WRITER_PREFIX = "orcWriter.";
    /** Key for the number of rows buffered before a batch is handed to the ORC writer. */
    private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
    /** Batch size used when {@link #ORC_WRITER_BATCH_SIZE} is not configured. */
    private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
    /** Key enabling {@link #deepCleanRowBatch(VectorizedRowBatch)} after each batch; defaults to false. */
    private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = ORC_WRITER_PREFIX + "deepCleanBatch";
    private final GenericRecordToOrcValueWriter valueWriter;
    @VisibleForTesting
    final VectorizedRowBatch rowBatch;
    private final Writer orcFileWriter;
    private volatile boolean closed = false;
    private final boolean deepCleanBatch;
    private final int batchSize;
    private final Schema avroSchema;

    /**
     * Builds the ORC schema, the row batch and the ORC file writer for the staging file.
     *
     * @param builder    writer builder supplying the Avro schema
     * @param properties job/task state; every entry is also copied into the Hadoop {@link Configuration}
     *                   used to create the ORC writer options
     * @throws IOException if the ORC writer cannot be created
     */
    public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
        super(builder, properties);
        this.avroSchema = (Schema)builder.getSchema();
        TypeDescription typeDescription = AvroOrcSchemaConverter.getOrcSchema(this.avroSchema);
        this.valueWriter = new GenericRecordToOrcValueWriter(typeDescription, this.avroSchema, properties);
        this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
        this.rowBatch = typeDescription.createRowBatch(this.batchSize);
        this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
        log.info("Start to construct a ORC-Native Writer, with batchSize:{}, enable batchDeepClean:{}\n, schema in avro format:{}",
                this.batchSize, this.deepCleanBatch, this.avroSchema);

        // Mirror every state property into the Hadoop configuration handed to the ORC writer options.
        Configuration conf = new Configuration();
        for (Object key : properties.getProperties().keySet()) {
            String name = (String)key;
            conf.set(name, properties.getProp(name));
        }
        OrcFile.WriterOptions options = OrcFile.writerOptions((Properties)properties.getProperties(), conf);
        options.setSchema(typeDescription);
        this.orcFileWriter = OrcFile.createWriter((Path)this.stagingFile, options);
    }

    /**
     * @return the row count reported by the underlying ORC writer
     */
    public long recordsWritten() {
        return this.orcFileWriter.getNumberOfRows();
    }

    /**
     * Returns the parent's final state overlaid with the schema properties produced by
     * {@link #getPropsWithOrcSchema(Schema)}.
     *
     * @throws RuntimeException wrapping the {@link SerDeException} if the schema properties cannot be derived
     */
    public State getFinalState() {
        ConstructState state = new ConstructState(super.getFinalState());
        try {
            state.addOverwriteProperties(new State(GobblinOrcWriter.getPropsWithOrcSchema(this.avroSchema)));
        }
        catch (SerDeException e) {
            throw new RuntimeException("Failure to set schema metadata in finalState properly which could possible lead to incorrect data registration", e);
        }
        return state;
    }

    /**
     * Hands any buffered rows to the ORC writer; does nothing when the batch is empty.
     *
     * @throws IOException if the ORC writer fails to accept the batch
     */
    public void flush() throws IOException {
        if (this.rowBatch.size > 0) {
            this.submitRowBatch(false);
        }
    }

    /**
     * Flushes and closes the ORC writer on the first call. On later calls, fails if rows have been buffered
     * since the writer was closed, because those rows can no longer be written.
     */
    private synchronized void closeInternal() throws IOException {
        if (this.closed) {
            if (this.rowBatch.size > 0) {
                throw new CloseBeforeFlushException(this.avroSchema.getName());
            }
            return;
        }
        this.flush();
        this.orcFileWriter.close();
        this.closed = true;
    }

    /**
     * Closes the ORC writer (if still open) and then delegates to the parent's {@code close()}.
     */
    public void close() throws IOException {
        this.closeInternal();
        super.close();
    }

    /**
     * Closes the ORC writer (if still open) and then delegates to the parent's {@code commit()}.
     */
    public void commit() throws IOException {
        this.closeInternal();
        super.commit();
    }

    /**
     * Converts {@code record} into the row batch and submits the batch once it holds {@code batchSize} rows.
     *
     * @param record the Avro record to buffer
     * @throws IOException if conversion or the ORC writer fails
     */
    public void write(GenericRecord record) throws IOException {
        this.valueWriter.write(record, this.rowBatch);
        if (this.rowBatch.size == this.batchSize) {
            this.submitRowBatch(true);
        }
    }

    /**
     * Adds the current row batch to the ORC file, resets it, and deep-cleans it when that option is enabled.
     *
     * @param logDeepClean whether to log an info line before deep-cleaning (only the write path does)
     */
    private void submitRowBatch(boolean logDeepClean) throws IOException {
        this.orcFileWriter.addRowBatch(this.rowBatch);
        this.rowBatch.reset();
        if (this.deepCleanBatch) {
            if (logDeepClean) {
                log.info("A reset of rowBatch is triggered - releasing holding memory for large object");
            }
            this.deepCleanRowBatch(this.rowBatch);
        }
    }

    /**
     * Drops the references held by every non-null top-level column vector of {@code rowBatch}; see
     * {@link #removeRefOfColumnVectorChild(ColumnVector)} for what is cleared.
     *
     * <p>NOTE(review): the cleared arrays are not re-allocated here; presumably the value writer re-creates
     * them before the next write. Confirm against {@link GenericRecordToOrcValueWriter}.
     */
    @VisibleForTesting
    void deepCleanRowBatch(VectorizedRowBatch rowBatch) {
        for (ColumnVector column : rowBatch.cols) {
            if (column != null) {
                this.removeRefOfColumnVectorChild(column);
            }
        }
    }

    /**
     * Recurses through struct, list, map and union vectors and sets the backing arrays of long, double, bytes
     * and decimal leaf vectors to {@code null}. Any other vector type (or a {@code null} vector) is left alone.
     */
    private void removeRefOfColumnVectorChild(ColumnVector cv) {
        if (cv instanceof StructColumnVector) {
            for (ColumnVector field : ((StructColumnVector)cv).fields) {
                this.removeRefOfColumnVectorChild(field);
            }
        } else if (cv instanceof ListColumnVector) {
            this.removeRefOfColumnVectorChild(((ListColumnVector)cv).child);
        } else if (cv instanceof MapColumnVector) {
            MapColumnVector mapVector = (MapColumnVector)cv;
            this.removeRefOfColumnVectorChild(mapVector.keys);
            this.removeRefOfColumnVectorChild(mapVector.values);
        } else if (cv instanceof UnionColumnVector) {
            for (ColumnVector member : ((UnionColumnVector)cv).fields) {
                this.removeRefOfColumnVectorChild(member);
            }
        } else if (cv instanceof LongColumnVector) {
            ((LongColumnVector)cv).vector = null;
        } else if (cv instanceof DoubleColumnVector) {
            ((DoubleColumnVector)cv).vector = null;
        } else if (cv instanceof BytesColumnVector) {
            BytesColumnVector bytesVector = (BytesColumnVector)cv;
            bytesVector.vector = null;
            bytesVector.start = null;
            bytesVector.length = null;
        } else if (cv instanceof DecimalColumnVector) {
            ((DecimalColumnVector)cv).vector = null;
        }
    }

    /**
     * @return true only when a writer attempt id is present and this instance is exactly a
     *         {@code GobblinOrcWriter} (not a subclass)
     */
    public boolean isSpeculativeAttemptSafe() {
        return this.writerAttemptIdOptional.isPresent() && this.getClass() == GobblinOrcWriter.class;
    }

    /**
     * Builds the schema-describing properties for {@code avroSchema}: the Avro schema literal plus the
     * comma-joined {@code columns} and {@code columns.types} derived by {@link AvroObjectInspectorGenerator}.
     *
     * @param avroSchema the Avro schema to describe
     * @return a new {@link Properties} holding the three entries
     * @throws SerDeException if the object-inspector generator rejects the schema
     */
    public static Properties getPropsWithOrcSchema(Schema avroSchema) throws SerDeException {
        Properties properties = new Properties();
        properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), avroSchema.toString());
        AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(avroSchema);
        properties.setProperty("columns", StringUtils.join((Iterable<?>)aoig.getColumnNames(), ","));
        properties.setProperty("columns.types", StringUtils.join((Iterable<?>)aoig.getColumnTypes(), ","));
        return properties;
    }
}

