/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.TaskPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadManifestsStage
extends AbstractJobOrTaskStage<Arguments, Result> {
    /** Logger for this stage. */
    private static final Logger LOG = LoggerFactory.getLogger(LoadManifestsStage.class);
    /** Running totals across all manifests; each loaded manifest is passed to {@code add()}. */
    private final SummaryInfo summaryInfo = new SummaryInfo();
    /** Destination directories gathered from the manifests, keyed by {@code DirEntry.getDir()}. */
    private final Map<String, DirEntry> directories = new ConcurrentHashMap<String, DirEntry>();
    /** Writer which receives each manifest's files to commit; assigned when the stage executes. */
    private EntryFileIO.EntryWriter entryWriter;

    /**
     * Create the stage.
     * <p>
     * Passes the string {@code job_stage_load_manifests} to the superclass
     * (presumably the stage/statistic name -- confirm against
     * {@code AbstractJobOrTaskStage}). The meaning of the two boolean flags is
     * defined by the superclass constructor and is not visible in this file.
     *
     * @param stageConfig stage configuration, handed on to the superclass
     */
    public LoadManifestsStage(StageConfig stageConfig) {
        super(false, stageConfig, "job_stage_load_manifests", true);
    }

    /**
     * Load all task manifests found in the task manifest directory, streaming
     * each manifest's files-to-commit to an entry file and collecting the
     * destination directories in memory.
     *
     * @param arguments supplies the entry file location and the writer queue capacity
     * @return the summary of what was loaded plus the loaded manifest data
     *         (directories, entry file path, entry count)
     * @throws IOException on any failure listing, loading or writing manifest data
     */
    @Override
    protected Result executeStage(Arguments arguments) throws IOException {
        final EntryFileIO fileIO = new EntryFileIO(getStageConfig().getConf());
        final Path taskManifestDir = getTaskManifestDir();
        LOG.info("{}: Executing Manifest Job Commit with manifests in {}", getName(), taskManifestDir);

        // Start the writer before any manifest is processed; the worker
        // threads reach it through the entryWriter field.
        final Path sequenceFile = arguments.getEntrySequenceData();
        final EntryFileIO.EntryWriter writer =
            fileIO.launchEntryWriter(fileIO.createWriter(sequenceFile), arguments.queueCapacity);
        this.entryWriter = writer;

        try {
            msync(taskManifestDir);

            // The listing stops being consumed as soon as the writer reports
            // that it is no longer active.
            final RemoteIterator<FileStatus> listing =
                RemoteIterators.haltableRemoteIterator(listManifests(), () -> writer.isActive());
            processAllManifests(listing);
            ManifestCommitterSupport.maybeAddIOStatistics(getIOStatistics(), listing);

            LOG.info("{}: Summary of {} manifests loaded in {}: {}",
                getName(), this.summaryInfo.manifestCount, taskManifestDir, this.summaryInfo);

            // Clean shutdown first, then ask the writer to raise any write failure.
            writer.close();
            writer.maybeRaiseWriteException();
        }
        catch (EntryWriteException failure) {
            // Give the writer the chance to raise its own exception first;
            // otherwise fall back to the one thrown while enqueuing.
            writer.maybeRaiseWriteException();
            throw failure;
        }
        finally {
            // Always close; on the success path this is a second close() call.
            writer.close();
        }

        final List<DirEntry> allDirectories = new ArrayList<DirEntry>(this.directories.values());
        final LoadedManifestData loaded =
            new LoadedManifestData(allDirectories, sequenceFile, writer.getCount());
        return new Result(this.summaryInfo, loaded);
    }

    /**
     * Process every manifest in the listing through the task pool, recording
     * the duration of the whole operation under {@code op_load_all_manifests}.
     * Processing is configured to stop on the first failure.
     *
     * @param manifestFiles iterator over the manifest file statuses
     * @throws IOException if the tracked invocation raises one
     */
    private void processAllManifests(RemoteIterator<FileStatus> manifestFiles) throws IOException {
        IOStatisticsBinding.trackDurationOfInvocation(
            this.getIOStatistics(),
            "op_load_all_manifests",
            () -> {
                TaskPool.foreach(manifestFiles)
                    .executeWith(this.getIOProcessors())
                    .stopOnFailure()
                    .run(this::processOneManifest);
            });
    }

    /**
     * Load a single manifest, merge its directories into the shared map,
     * fold it into the summary and queue its file entries for writing.
     *
     * @param status file status of the manifest to process
     * @throws EntryWriteException if the entry writer did not accept the file list
     * @throws IOException on any failure loading the manifest
     */
    private void processOneManifest(FileStatus status) throws IOException {
        updateAuditContext("op_load_all_manifests");

        final TaskManifest taskManifest = fetchTaskManifest(status);
        progress();

        final int dirsAdded = coalesceDirectories(taskManifest);
        final String taskAttempt = taskManifest.getTaskAttemptID();
        LOG.debug("{}: task attempt {} added {} directories", getName(), taskAttempt, dirsAdded);

        // The summary must aggregate the manifest's statistics before they
        // are dropped from the manifest below.
        this.summaryInfo.add(taskManifest);
        taskManifest.setIOStatistics(null);
        taskManifest.getExtraData().clear();

        if (this.entryWriter.enqueue(taskManifest.getFilesToCommit())) {
            return;
        }
        LOG.warn("{}: Failed to write manifest for task {}", getName(), taskAttempt);
        throw new EntryWriteException(taskAttempt);
    }

    /**
     * Merge the destination directories of a manifest into the shared
     * directory map, which is keyed by directory path
     * ({@code DirEntry.getDir()}).
     * <p>
     * A lock-free pre-filter first drops every directory whose path is
     * already in the map; only if something is left is the map's monitor
     * taken to insert the remainder. Another thread may add the same path
     * between the filter and the insert, so {@code putIfAbsent} is used and
     * only the entries actually inserted are counted.
     *
     * @param manifest manifest whose destination directories are merged in
     * @return number of directories from this manifest newly added to the map
     */
    @VisibleForTesting
    int coalesceDirectories(TaskManifest manifest) {
        // The map key is the directory path, so the lookup must use getDir();
        // looking up the DirEntry itself never matches a String key.
        List<DirEntry> toCreate = manifest.getDestDirectories().stream()
            .filter(e -> !this.directories.containsKey(e.getDir()))
            .collect(Collectors.toList());
        if (toCreate.isEmpty()) {
            return 0;
        }
        int added = 0;
        synchronized (this.directories) {
            for (DirEntry entry : toCreate) {
                // null means no previous mapping existed: this call inserted it.
                if (this.directories.putIfAbsent(entry.getDir(), entry) == null) {
                    added++;
                }
            }
        }
        return added;
    }

    /**
     * Validate a manifest file status, load the manifest and record
     * per-manifest samples (manifest file size, file count, directory count)
     * in the stage's IO statistics.
     *
     * @param status status of the manifest file
     * @return the loaded manifest
     * @throws PathIOException if the status is zero-length or is not a file
     * @throws IOException on a failure to load the manifest
     */
    private TaskManifest fetchTaskManifest(FileStatus status) throws IOException {
        final boolean emptyFile = status.getLen() == 0L;
        if (emptyFile || !status.isFile()) {
            final String path = status.getPath().toString();
            throw new PathIOException(path, "Not a valid manifest file; file status = " + status);
        }

        final TaskManifest loaded = loadManifest(status);
        final String attemptId = loaded.getTaskAttemptID();
        final int fileCount = loaded.getFilesToCommit().size();
        final long totalBytes = loaded.getTotalFileSize();
        LOG.info("{}: Task Attempt {} file {}: File count: {}; data size={}",
            getName(), attemptId, status.getPath(), fileCount, totalBytes);

        final IOStatisticsStore stats = getIOStatistics();
        stats.addSample("committer_task_manifest_file_size", status.getLen());
        stats.addSample("committer_task_file_count", fileCount);
        stats.addSample("committer_task_directory_count", loaded.getDestDirectories().size());
        return loaded;
    }

    /**
     * Aggregate information about the manifests loaded so far: counts, total
     * data size, the task and task attempt IDs seen, and the aggregated IO
     * statistics of the manifests.
     * <p>
     * {@link #add(TaskManifest)} is synchronized; the counters are atomic so
     * the numeric getters can be read without holding the lock. The ID list
     * getters return the internal lists directly, not copies.
     */
    public static final class SummaryInfo
    implements IOStatisticsSource {
        /** Aggregated IO statistics of every manifest added. */
        private final IOStatisticsSnapshot iostatistics = IOStatisticsSupport.snapshotIOStatistics();
        /** Task IDs, in the order manifests were added. */
        private final List<String> taskIDs = new ArrayList<String>();
        /** Task attempt IDs, in the order manifests were added. */
        private final List<String> taskAttemptIDs = new ArrayList<String>();
        /** Number of manifests added. */
        private final AtomicLong manifestCount = new AtomicLong();
        /** Total number of files to commit across all manifests. */
        private final AtomicLong fileCount = new AtomicLong();
        /** Sum of the per-manifest destination directory counts. */
        private final AtomicLong directoryCount = new AtomicLong();
        /** Sum of the per-manifest total file sizes. */
        private final AtomicLong totalFileSize = new AtomicLong();

        /**
         * @return the aggregated statistics snapshot (the live instance, not a copy)
         */
        @Override
        public IOStatisticsSnapshot getIOStatistics() {
            return this.iostatistics;
        }

        /** @return total number of files to commit. */
        public long getFileCount() {
            return this.fileCount.get();
        }

        /** @return sum of the destination directory counts of all manifests. */
        public long getDirectoryCount() {
            return this.directoryCount.get();
        }

        /** @return sum of the total file sizes of all manifests. */
        public long getTotalFileSize() {
            return this.totalFileSize.get();
        }

        /** @return number of manifests added. */
        public long getManifestCount() {
            return this.manifestCount.get();
        }

        /** @return the internal list of task IDs. */
        public List<String> getTaskIDs() {
            return this.taskIDs;
        }

        /** @return the internal list of task attempt IDs. */
        public List<String> getTaskAttemptIDs() {
            return this.taskAttemptIDs;
        }

        /**
         * Fold a manifest into the summary: bump the counters, aggregate its
         * IO statistics and record its task and task attempt IDs.
         *
         * @param manifest manifest to add
         */
        public synchronized void add(TaskManifest manifest) {
            this.manifestCount.incrementAndGet();
            this.iostatistics.aggregate(manifest.getIOStatistics());
            this.fileCount.addAndGet(manifest.getFilesToCommit().size());
            this.directoryCount.addAndGet(manifest.getDestDirectories().size());
            this.totalFileSize.addAndGet(manifest.getTotalFileSize());
            this.taskIDs.add(manifest.getTaskID());
            this.taskAttemptIDs.add(manifest.getTaskAttemptID());
        }

        /**
         * @return the counters in a single line, with the total size in
         *         human-readable form
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("SummaryInfo{");
            sb.append("manifestCount=").append(this.getManifestCount());
            sb.append(", fileCount=").append(this.getFileCount());
            sb.append(", directoryCount=").append(this.getDirectoryCount());
            sb.append(", totalFileSize=").append(FileUtils.byteCountToDisplaySize(this.getTotalFileSize()));
            sb.append('}');
            return sb.toString();
        }
    }

    /**
     * Raised when a manifest's file entries could not be queued for writing
     * to the local entry file.
     */
    public static final class EntryWriteException
    extends IOException {
        /**
         * @param taskId ID of the task (attempt) whose manifest data was not written
         */
        private EntryWriteException(String taskId) {
            // Leading space before "to" keeps the ID from running into the next word.
            super("Failed to write manifest data for task " + taskId + " to local file");
        }
    }

    /**
     * Outcome of the stage: the summary of the loaded manifests together
     * with the loaded manifest data.
     */
    public static final class Result {
        /** Summary of the manifests loaded. */
        private final SummaryInfo summary;
        /** Data loaded from the manifests. */
        private final LoadedManifestData loadedManifestData;

        /**
         * @param summary summary of the manifests loaded
         * @param loadedManifestData data loaded from the manifests
         */
        public Result(SummaryInfo summary, LoadedManifestData loadedManifestData) {
            this.loadedManifestData = loadedManifestData;
            this.summary = summary;
        }

        /** @return the summary passed to the constructor. */
        public SummaryInfo getSummary() {
            return summary;
        }

        /** @return the loaded manifest data passed to the constructor. */
        public LoadedManifestData getLoadedManifestData() {
            return loadedManifestData;
        }
    }

    /**
     * Stage arguments: where the entry file goes and how large the writer
     * queue is.
     */
    public static final class Arguments {
        /** Local file to which the file entries are written. */
        private final File entrySequenceFile;
        /** Capacity handed to the entry writer when it is launched. */
        private final int queueCapacity;

        /**
         * @param entrySequenceFile local file for the entry data
         * @param queueCapacity capacity of the entry writer queue
         */
        public Arguments(File entrySequenceFile, int queueCapacity) {
            this.queueCapacity = queueCapacity;
            this.entrySequenceFile = entrySequenceFile;
        }

        /**
         * @return the entry file as a {@code Path} built from the file's URI
         */
        private Path getEntrySequenceData() {
            return new Path(entrySequenceFile.toURI());
        }
    }
}

