/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.Fetcher;
import org.apache.hadoop.mapreduce.task.reduce.MapHost;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleClientMetrics;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl;

/**
 * A {@link Fetcher} that copies map outputs which are already present as
 * files on the local file system, handing each one to the merger.
 *
 * <p>The set of map outputs to copy is supplied up front as a map from map
 * task attempt to its {@link MapOutputFile}; the thread finishes once every
 * entry has been copied (or once it is interrupted).
 *
 * @param <K> key type of the shuffled records
 * @param <V> value type of the shuffled records
 */
class LocalFetcher<K, V>
extends Fetcher<K, V> {
    private static final Log LOG = LogFactory.getLog(LocalFetcher.class);
    /** Placeholder host passed to the shuffle and scheduler calls for local copies. */
    private static final MapHost LOCALHOST = new MapHost("local", "local");
    private final JobConf job;
    private final Map<TaskAttemptID, MapOutputFile> localMapFiles;

    /**
     * Creates the local fetcher thread; it is named {@code localfetcher#<id>}
     * and marked as a daemon.
     *
     * @param job               job configuration, also used to open the local file system
     * @param reduceId          forwarded to the {@link Fetcher} constructor
     * @param scheduler         scheduler notified when a copy succeeds
     * @param merger            merge manager that reserves space for each map output
     * @param reporter          progress reporter passed to the shuffle call
     * @param metrics           shuffle metrics (thread busy/free accounting)
     * @param exceptionReporter sink for unexpected failures raised in {@link #run()}
     * @param shuffleKey        forwarded to the {@link Fetcher} constructor
     * @param localMapFiles     map outputs to copy, keyed by map task attempt
     */
    public LocalFetcher(JobConf job, TaskAttemptID reduceId, ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey shuffleKey, Map<TaskAttemptID, MapOutputFile> localMapFiles) {
        super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, shuffleKey);
        this.job = job;
        this.localMapFiles = localMapFiles;
        this.setName("localfetcher#" + this.id);
        this.setDaemon(true);
    }

    /**
     * Copies every local map output, waiting on the merger for resources
     * before each pass. Failures other than interruption are handed to the
     * exception reporter and the pass is retried with the outputs that are
     * still pending. Interruption ends the thread with its interrupt flag set.
     */
    @Override
    public void run() {
        // Work list of map outputs that still have to be copied.
        Set<TaskAttemptID> maps = new HashSet<TaskAttemptID>(this.localMapFiles.keySet());
        while (!maps.isEmpty()) {
            try {
                this.merger.waitForResource();
                this.metrics.threadBusy();
                try {
                    this.doCopy(maps);
                }
                finally {
                    // Balance threadBusy() even when the copy fails; otherwise
                    // each failed pass would leave the busy count one too high.
                    this.metrics.threadFree();
                }
            }
            catch (InterruptedException ie) {
                // Interruption is a request to stop: preserve the flag for
                // whoever inspects this thread and give up the remaining work
                // instead of silently looping.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Throwable t) {
                // NOTE(review): the loop retries after reporting; a persistent
                // failure will be reported repeatedly. Confirm whether the
                // reporter is expected to tear the shuffle down.
                this.exceptionReporter.reportException(t);
            }
        }
    }

    /**
     * Copies as many of the pending map outputs as the merger will accept,
     * removing each successfully copied attempt from {@code maps}. Stops at
     * the first output for which {@link #copyMapOutput} returns {@code false}.
     *
     * @param maps pending map task attempts; mutated in place
     * @throws IOException if reading or shuffling a map output fails
     */
    private void doCopy(Set<TaskAttemptID> maps) throws IOException {
        Iterator<TaskAttemptID> iter = maps.iterator();
        while (iter.hasNext()) {
            TaskAttemptID map = iter.next();
            if (LOG.isDebugEnabled()) {
                LOG.debug("LocalFetcher " + this.id + " going to fetch: " + map);
            }
            if (!this.copyMapOutput(map)) {
                // The merger has no room right now; leave the rest for the next pass.
                break;
            }
            iter.remove();
        }
    }

    /**
     * Copies the output of one map task attempt from its local file into a
     * {@link MapOutput} reserved from the merger.
     *
     * <p>The segment to copy is located through the companion
     * {@code .index} file: the index record for this fetcher's reduce
     * supplies the start offset and the compressed and raw lengths.
     *
     * @param mapTaskId the map task attempt whose output should be copied
     * @return {@code true} if the output was copied and reported to the
     *         scheduler; {@code false} if the merger returned no reservation
     *         and the copy should be retried later
     * @throws IOException if the index or output file cannot be read, or the
     *         shuffle into the reserved output fails
     */
    private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
        Path mapOutputFileName = this.localMapFiles.get(mapTaskId).getOutputFile();
        Path indexFileName = mapOutputFileName.suffix(".index");
        SpillRecord sr = new SpillRecord(indexFileName, this.job);
        IndexRecord ir = sr.getIndex(this.reduce);
        long compressedLength = ir.partLength;
        long decompressedLength = ir.rawLength;
        MapOutput mapOutput = this.merger.reserve(mapTaskId, decompressedLength, this.id);
        if (mapOutput == null) {
            LOG.info("fetcher#" + this.id + " - MergeManager returned Status.WAIT ...");
            return false;
        }
        LOG.info("localfetcher#" + this.id + " about to shuffle output of map " + mapOutput.getMapId() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getDescription());
        // Read through the raw local file system and position the stream at
        // this reduce's segment before handing it to the reserved output.
        FileSystem localFs = FileSystem.getLocal(this.job).getRaw();
        FSDataInputStream inStream = localFs.open(mapOutputFileName);
        try {
            inStream.seek(ir.startOffset);
            mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, this.metrics, this.reporter);
        }
        finally {
            try {
                inStream.close();
            }
            catch (IOException ioe) {
                // A failed close is only logged so it cannot mask a shuffle
                // failure or turn a completed copy into an error.
                LOG.warn("IOException closing inputstream from map output: " + ioe.toString());
            }
        }
        // NOTE(review): the 0L argument is presumably the copy duration — confirm against copySucceeded.
        this.scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0L, mapOutput);
        return true;
    }
}

