/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.hive.HiveIcebergRecordWriter;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class HiveIcebergOutputCommitter
extends OutputCommitter {
    private static final String FOR_COMMIT_EXTENSION = ".forCommit";
    private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);

    public void setupJob(JobContext jobContext) {
    }

    public void setupTask(TaskAttemptContext taskAttemptContext) {
    }

    public boolean needsTaskCommit(TaskAttemptContext context) {
        return TaskType.REDUCE.equals((Object)context.getTaskAttemptID().getTaskID().getTaskType()) || context.getJobConf().getNumReduceTasks() == 0;
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        TaskAttemptID attemptID = context.getTaskAttemptID();
        String fileForCommitLocation = HiveIcebergOutputCommitter.generateFileForCommitLocation((Configuration)context.getJobConf(), (JobID)attemptID.getJobID(), attemptID.getTaskID().getId());
        HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
        DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[]{};
        HiveIcebergOutputCommitter.createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.io((Configuration)context.getJobConf()));
    }

    public void abortTask(TaskAttemptContext context) throws IOException {
        HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
        writer.close(true);
    }

    public void commitJob(JobContext jobContext) throws IOException {
        JobConf conf = jobContext.getJobConf();
        Table table = Catalogs.loadTable((Configuration)conf);
        long startTime = System.currentTimeMillis();
        LOG.info("Committing job has started for table: {}, using location: {}", (Object)table, (Object)HiveIcebergOutputCommitter.generateJobLocation((Configuration)conf, jobContext.getJobID()));
        FileIO io = HiveIcebergStorageHandler.io((Configuration)jobContext.getJobConf());
        List<DataFile> dataFiles = HiveIcebergOutputCommitter.dataFiles(jobContext, io, true);
        if (dataFiles.size() > 0) {
            AppendFiles append = table.newAppend();
            dataFiles.forEach(append::appendFile);
            append.commit();
            LOG.info("Commit took {} ms for table: {} with {} file(s)", new Object[]{System.currentTimeMillis() - startTime, table, dataFiles.size()});
            LOG.debug("Added files {}", dataFiles);
        } else {
            LOG.info("Commit took {} ms for table: {} with no new files", (Object)(System.currentTimeMillis() - startTime), (Object)table);
        }
        this.cleanup(jobContext);
    }

    public void abortJob(JobContext jobContext, int status) throws IOException {
        String location = HiveIcebergOutputCommitter.generateJobLocation((Configuration)jobContext.getJobConf(), jobContext.getJobID());
        LOG.info("Job {} is aborted. Cleaning job location {}", (Object)jobContext.getJobID(), (Object)location);
        FileIO io = HiveIcebergStorageHandler.io((Configuration)jobContext.getJobConf());
        List<DataFile> dataFiles = HiveIcebergOutputCommitter.dataFiles(jobContext, io, false);
        if (dataFiles.size() > 0) {
            Tasks.foreach(dataFiles).retry(3).suppressFailureWhenFinished().onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", (Object)file.path(), (Object)exc)).run(file -> io.deleteFile(file.path().toString()));
        }
        this.cleanup(jobContext);
    }

    private void cleanup(JobContext jobContext) throws IOException {
        String location = HiveIcebergOutputCommitter.generateJobLocation((Configuration)jobContext.getJobConf(), jobContext.getJobID());
        LOG.info("Cleaning for job: {} on location: {}", (Object)jobContext.getJobID(), (Object)location);
        Tasks.foreach(location).retry(3).suppressFailureWhenFinished().onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, (Object)exc)).run(file -> {
            Path toDelete = new Path(file);
            FileSystem fs = Util.getFs(toDelete, (Configuration)jobContext.getJobConf());
            fs.delete(toDelete, true);
        }, IOException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static List<DataFile> dataFiles(JobContext jobContext, FileIO io, boolean throwOnFailure) {
        JobConf conf = jobContext.getJobConf();
        int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
        ExecutorService executor = null;
        try {
            executor = Executors.newFixedThreadPool(conf.getInt("iceberg.mr.commit.thread.pool.size", 10), new ThreadFactoryBuilder().setDaemon(true).setPriority(5).setNameFormat("iceberg-commit-pool-%d").build());
            List<DataFile> dataFiles = Collections.synchronizedList(new ArrayList());
            Tasks.range(expectedFiles).throwFailureWhenFinished(throwOnFailure).executeWith(executor).retry(3).run(taskId -> {
                String taskFileName = HiveIcebergOutputCommitter.generateFileForCommitLocation((Configuration)conf, jobContext.getJobID(), taskId);
                dataFiles.addAll(Arrays.asList(HiveIcebergOutputCommitter.readFileForCommit(taskFileName, io)));
            });
            List<DataFile> list = dataFiles;
            return list;
        }
        finally {
            if (executor != null) {
                executor.shutdown();
            }
        }
    }

    @VisibleForTesting
    static String generateJobLocation(Configuration conf, JobID jobId) {
        String tableLocation = conf.get("iceberg.mr.table.location");
        String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
        return tableLocation + "/temp/" + queryId + "-" + jobId;
    }

    private static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) {
        return HiveIcebergOutputCommitter.generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
    }

    private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io) throws IOException {
        OutputFile fileForCommit = io.newOutputFile(location);
        try (ObjectOutputStream oos = new ObjectOutputStream(fileForCommit.createOrOverwrite());){
            oos.writeObject(closedFiles);
        }
        LOG.debug("Iceberg committed file is created {}", (Object)fileForCommit);
    }

    private static DataFile[] readFileForCommit(String fileForCommitLocation, FileIO io) {
        DataFile[] dataFileArray;
        ObjectInputStream ois = new ObjectInputStream(io.newInputFile(fileForCommitLocation).newStream());
        Throwable throwable = null;
        try {
            dataFileArray = (DataFile[])ois.readObject();
        }
        catch (Throwable throwable2) {
            try {
                try {
                    throwable = throwable2;
                    throw throwable2;
                }
                catch (Throwable throwable3) {
                    HiveIcebergOutputCommitter.$closeResource(throwable, ois);
                    throw throwable3;
                }
            }
            catch (IOException | ClassNotFoundException e) {
                throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
            }
        }
        HiveIcebergOutputCommitter.$closeResource(throwable, ois);
        return dataFileArray;
    }
}

