/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.writer;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Properties;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.writer.CloseBeforeFlushException;
import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsDataWriterBuilder;
import org.apache.gobblin.writer.OrcValueWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link FsDataWriter} that writes records to the staging file in ORC format using the ORC-native
 * {@link Writer} API.
 *
 * <p>Each record is converted into a row of a {@link VectorizedRowBatch} by a subclass-provided
 * {@link OrcValueWriter}. Once the batch holds {@code batchSize} rows it is handed to the ORC file writer
 * and reset. Subclasses supply the ORC schema, the value writer, and the schema properties that are
 * published in the final state.
 *
 * @param <S> type of the input schema obtained from the writer builder
 * @param <D> type of the records being written
 */
public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
    private static final Logger log = LoggerFactory.getLogger(GobblinBaseOrcWriter.class);

    static final String ORC_WRITER_PREFIX = "orcWriter.";
    /** Number of rows buffered in the row batch before it is handed to the ORC file writer. */
    public static final String ORC_WRITER_BATCH_SIZE = "orcWriter.batchSize";
    public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;

    // Inputs to the auto-tuning heap estimate (see availableHeapSize).
    private static final String CONTAINER_MEMORY_MBS = "orcWriter.jvm.memory.mbs";
    private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
    private static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = "orcWriter.container.jvmMemoryXmxRatio";
    private static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = 1.0;
    static final String CONTAINER_JVM_MEMORY_OVERHEAD_MBS = "orcWriter.container.jvmMemoryOverheadMbs";
    private static final int DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS = 0;

    /** When true, the constructor rewrites the batch size and ORC memory-check interval from heap estimates. */
    @VisibleForTesting
    static final String ORC_WRITER_AUTO_TUNE_ENABLED = "orcWriter.autoTuneEnabled";
    private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
    /** Record size assumed when {@code avg.record.size} is not set. */
    private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024L;
    /** Number of writers assumed to share the ORC memory pool concurrently. */
    private static final int ESTIMATED_PARALLELISM_WRITERS = 3;
    /** Multiplier applied to the (average or exemplified) record size to get the estimated record size. */
    @VisibleForTesting
    static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
    static final int DEFAULT_RECORD_SIZE_SCALE_FACTOR = 6;
    /** When true, the leaf vectors of the row batch are dropped after every batch is written. */
    private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = "orcWriter.deepCleanBatch";

    private final OrcValueWriter<D> valueWriter;
    @VisibleForTesting
    final VectorizedRowBatch rowBatch;
    private final Writer orcFileWriter;
    private volatile boolean closed = false;
    private final boolean deepCleanBatch;
    private final int batchSize;
    protected final S inputSchema;

    /**
     * Derives {@code ROWS_BETWEEN_CHECKS} and {@link #ORC_WRITER_BATCH_SIZE} from the estimated heap available
     * to this writer and the estimated record size, and writes both back into {@code properties}.
     *
     * <p>Both tuned values are clamped to at least 1. ORC requires a positive memory-check interval, and a
     * non-positive batch size cannot hold a row.
     *
     * @param properties writer properties; read for the inputs and mutated with the tuned values
     */
    protected void autoTunedOrcWriterParams(State properties) {
        double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(),
                ((Double) OrcConf.MEMORY_POOL.getDefaultValue()).doubleValue());
        long availableHeapPerWriter =
                Math.round((double) this.availableHeapSize(properties) * writerRatio / ESTIMATED_PARALLELISM_WRITERS);

        // Guard against a zero/negative estimate (e.g. avg.record.size=0 or a zero scale factor), which would
        // otherwise divide by zero or produce negative row counts.
        long estimatedRecordSize = Math.max(1L, this.getEstimatedRecordSize(properties));
        // NOTE(review): availableHeapPerWriter appears to be in MB (CONTAINER_MEMORY_MBS) while the record size
        // is in bytes, so the *1024 factor yields KB rather than bytes. The result is a very conservative row
        // count. Kept as-is to preserve existing tuning; confirm whether this is intentional.
        long rowsBetweenCheck = availableHeapPerWriter * 1024L / estimatedRecordSize;

        long defaultRowsBetweenChecks = ((Integer) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()).longValue();
        long tunedRowsBetweenChecks = Math.max(1L, Math.min(rowsBetweenCheck, defaultRowsBetweenChecks));
        properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(), tunedRowsBetweenChecks);

        // The batch size is kept well below the memory-check interval; 4 is a heuristic divisor.
        long tunedBatchSize = Math.max(1L, Math.min(rowsBetweenCheck / 4L, DEFAULT_ORC_WRITER_BATCH_SIZE));
        properties.setProp(ORC_WRITER_BATCH_SIZE, tunedBatchSize);

        // Log the values actually applied, not the unclamped intermediate.
        log.info("Tuned the parameter {} to be:{},{} to be:{}",
                OrcConf.ROWS_BETWEEN_CHECKS.name(), tunedRowsBetweenChecks, ORC_WRITER_BATCH_SIZE, tunedBatchSize);
    }

    /**
     * Estimates the heap available to the JVM.
     *
     * @param properties writer properties
     * @return {@code round(containerMemoryMbs * xmxRatio) - overheadMbs}, in the same unit as the inputs (MB)
     */
    protected long availableHeapSize(State properties) {
        long physicalMem = Math.round((double) properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
                * properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
        long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
        return physicalMem - nonHeap;
    }

    /**
     * Estimates the in-memory size of one record.
     *
     * @param properties writer properties
     * @return {@code avg.record.size} (or {@link #EXEMPLIFIED_RECORD_SIZE_IN_BYTES} when unset) multiplied by
     *         {@link #RECORD_SIZE_SCALE_FACTOR}
     */
    protected long getEstimatedRecordSize(State properties) {
        long estimatedRecordSizeScale = properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR);
        long baseRecordSize = properties.contains("avg.record.size")
                ? properties.getPropAsLong("avg.record.size")
                : EXEMPLIFIED_RECORD_SIZE_IN_BYTES;
        return baseRecordSize * estimatedRecordSizeScale;
    }

    /**
     * Creates the ORC writer for the builder's staging file.
     *
     * <p>Calls the overridable {@link #getOrcSchema()} and {@link #getOrcValueWriter} during construction.
     * Subclass fields initialized in the subclass constructor are not yet set at that point.
     *
     * @param builder    supplies the input schema (and whatever {@link FsDataWriter} needs)
     * @param properties writer properties; mutated when auto-tuning is enabled. All entries are also copied
     *                   into the Hadoop {@link Configuration} used for the ORC writer.
     * @throws IOException              if the ORC file writer cannot be created
     * @throws IllegalArgumentException if the configured batch size is not positive
     */
    public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties) throws IOException {
        super(builder, properties);
        if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED, ORC_WRITER_AUTO_TUNE_DEFAULT)) {
            this.autoTunedOrcWriterParams(properties);
        }
        this.inputSchema = builder.getSchema();
        TypeDescription typeDescription = this.getOrcSchema();
        this.valueWriter = this.getOrcValueWriter(typeDescription, this.inputSchema, properties);
        this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
        if (this.batchSize < 1) {
            // A non-positive batch cannot hold a single row; fail fast instead of on the first write.
            throw new IllegalArgumentException(ORC_WRITER_BATCH_SIZE + " must be positive, got: " + this.batchSize);
        }
        this.rowBatch = typeDescription.createRowBatch(this.batchSize);
        this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
        log.info("Start to construct a ORC-Native Writer, with batchSize:{}, enable batchDeepClean:{}\n, schema in input format:{}",
                this.batchSize, this.deepCleanBatch, this.inputSchema);

        // Expose every writer property to the ORC writer via the Hadoop configuration.
        Configuration conf = new Configuration();
        for (Object key : properties.getProperties().keySet()) {
            conf.set((String) key, properties.getProp((String) key));
        }
        OrcFile.WriterOptions options = OrcFile.writerOptions(properties.getProperties(), conf);
        options.setSchema(typeDescription);
        this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
    }

    /** @return the ORC schema that the row batch and the output file are created with */
    protected abstract TypeDescription getOrcSchema();

    /**
     * @return a value writer that appends records of type {@code D} into the row batch
     */
    protected abstract OrcValueWriter<D> getOrcValueWriter(TypeDescription var1, S var2, State var3);

    /**
     * @return properties carrying the ORC schema, merged into {@link #getFinalState()}
     * @throws SerDeException if the schema properties cannot be produced
     */
    protected abstract Properties getPropsWithOrcSchema() throws SerDeException;

    /** @return the number of rows the ORC file writer reports as written (excludes rows still buffered) */
    public long recordsWritten() {
        return this.orcFileWriter.getNumberOfRows();
    }

    /**
     * Returns the parent's final state overlaid with the ORC schema properties.
     *
     * @throws RuntimeException wrapping a {@link SerDeException} from {@link #getPropsWithOrcSchema()}
     */
    public State getFinalState() {
        ConstructState state = new ConstructState(super.getFinalState());
        try {
            state.addOverwriteProperties(new State(this.getPropsWithOrcSchema()));
        } catch (SerDeException e) {
            throw new RuntimeException("Failure to set schema metadata in finalState properly which could possible lead to incorrect data registration", e);
        }
        return state;
    }

    /** Hands any buffered rows to the ORC file writer; a no-op when the batch is empty. */
    public void flush() throws IOException {
        if (this.rowBatch.size > 0) {
            this.writeAndResetRowBatch(false);
        }
    }

    /**
     * Flushes buffered rows and closes the ORC file writer the first time it is called.
     *
     * <p>On later calls, rows found in the batch must have been written after the close. That is reported as a
     * {@link CloseBeforeFlushException}.
     */
    private synchronized void closeInternal() throws IOException {
        if (!this.closed) {
            this.flush();
            this.orcFileWriter.close();
            this.closed = true;
        } else if (this.rowBatch.size > 0) {
            // String.valueOf avoids an NPE while reporting the error when no input schema was supplied.
            throw new CloseBeforeFlushException(String.valueOf(this.inputSchema));
        }
    }

    /** Closes the ORC file writer (flushing buffered rows), then the parent writer. */
    public void close() throws IOException {
        this.closeInternal();
        super.close();
    }

    /** Closes the ORC file writer (flushing buffered rows), then commits via the parent writer. */
    public void commit() throws IOException {
        this.closeInternal();
        super.commit();
    }

    /**
     * Appends a record to the row batch, and writes the batch out once it holds {@code batchSize} rows.
     */
    public void write(D record) throws IOException {
        this.valueWriter.write(record, this.rowBatch);
        if (this.rowBatch.size == this.batchSize) {
            this.writeAndResetRowBatch(true);
        }
    }

    /**
     * Hands the current row batch to the ORC file writer, resets it, and deep-cleans it when enabled.
     *
     * @param logDeepClean whether to log an info message when a deep clean is performed
     */
    private void writeAndResetRowBatch(boolean logDeepClean) throws IOException {
        this.orcFileWriter.addRowBatch(this.rowBatch);
        this.rowBatch.reset();
        if (this.deepCleanBatch) {
            if (logDeepClean) {
                log.info("A reset of rowBatch is triggered - releasing holding memory for large object");
            }
            this.deepCleanRowBatch(this.rowBatch);
        }
    }

    /**
     * Drops the backing arrays of every (possibly nested) leaf column vector in {@code rowBatch}, so that the
     * memory they hold can be released.
     *
     * <p>NOTE(review): the leaf arrays are set to {@code null}. Subsequent writes into this batch presumably rely
     * on the {@link OrcValueWriter} re-allocating them. TODO confirm.
     */
    @VisibleForTesting
    void deepCleanRowBatch(VectorizedRowBatch rowBatch) {
        for (ColumnVector cv : rowBatch.cols) {
            if (cv != null) {
                this.removeRefOfColumnVectorChild(cv);
            }
        }
    }

    /**
     * Recurses through struct/list/map/union vectors and nulls the data arrays of long, double, bytes and
     * decimal leaf vectors. Other vector types are left untouched.
     */
    private void removeRefOfColumnVectorChild(ColumnVector cv) {
        if (cv instanceof StructColumnVector) {
            for (ColumnVector childCv : ((StructColumnVector) cv).fields) {
                this.removeRefOfColumnVectorChild(childCv);
            }
        } else if (cv instanceof ListColumnVector) {
            this.removeRefOfColumnVectorChild(((ListColumnVector) cv).child);
        } else if (cv instanceof MapColumnVector) {
            MapColumnVector mapCv = (MapColumnVector) cv;
            this.removeRefOfColumnVectorChild(mapCv.keys);
            this.removeRefOfColumnVectorChild(mapCv.values);
        } else if (cv instanceof UnionColumnVector) {
            for (ColumnVector unionChildCv : ((UnionColumnVector) cv).fields) {
                this.removeRefOfColumnVectorChild(unionChildCv);
            }
        } else if (cv instanceof LongColumnVector) {
            ((LongColumnVector) cv).vector = null;
        } else if (cv instanceof DoubleColumnVector) {
            ((DoubleColumnVector) cv).vector = null;
        } else if (cv instanceof BytesColumnVector) {
            BytesColumnVector bytesCv = (BytesColumnVector) cv;
            bytesCv.vector = null;
            bytesCv.start = null;
            bytesCv.length = null;
        } else if (cv instanceof DecimalColumnVector) {
            ((DecimalColumnVector) cv).vector = null;
        }
    }
}

