/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.impl.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.impl.MergeThread;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MergeManager {
    private static final Log LOG = LogFactory.getLog(MergeManager.class);
    private final Configuration conf;
    private final FileSystem localFS;
    private final FileSystem rfs;
    private final LocalDirAllocator localDirAllocator;
    private final TezTaskOutputFiles mapOutputFile;
    private final Progressable nullProgressable = new NullProgressable();
    private final Combiner combiner;
    Set<MapOutput> inMemoryMergedMapOutputs = new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
    private IntermediateMemoryToMemoryMerger memToMemMerger;
    Set<MapOutput> inMemoryMapOutputs = new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
    private InMemoryMerger inMemoryMerger;
    Set<Path> onDiskMapOutputs = new TreeSet<Path>();
    private OnDiskMerger onDiskMerger;
    private long memoryLimit;
    private int postMergeMemLimit;
    private long usedMemory;
    private long commitMemory;
    private int ioSortFactor;
    private long maxSingleShuffleLimit;
    private int memToMemMergeOutputsThreshold;
    private long mergeThreshold;
    private long initialMemoryAvailable = -1L;
    private final ExceptionReporter exceptionReporter;
    private final TezInputContext inputContext;
    private final TezCounter spilledRecordsCounter;
    private final TezCounter reduceCombineInputCounter;
    private final TezCounter mergedMapOutputsCounter;
    private CompressionCodec codec;
    private volatile boolean finalMergeComplete = false;
    private boolean ifileReadAhead;
    private int ifileReadAheadLength;
    private int ifileBufferSize;
    private final MapOutput stallShuffle = new MapOutput(null);

    public MergeManager(Configuration conf, FileSystem localFS, LocalDirAllocator localDirAllocator, TezInputContext inputContext, Combiner combiner, TezCounter spilledRecordsCounter, TezCounter reduceCombineInputCounter, TezCounter mergedMapOutputsCounter, ExceptionReporter exceptionReporter) {
        this.inputContext = inputContext;
        this.conf = conf;
        this.localDirAllocator = localDirAllocator;
        this.exceptionReporter = exceptionReporter;
        this.combiner = combiner;
        this.reduceCombineInputCounter = reduceCombineInputCounter;
        this.spilledRecordsCounter = spilledRecordsCounter;
        this.mergedMapOutputsCounter = mergedMapOutputsCounter;
        this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
        this.localFS = localFS;
        this.rfs = ((LocalFileSystem)localFS).getRaw();
    }

    void setInitialMemoryAvailable(long available) {
        this.initialMemoryAvailable = available;
    }

    @InterfaceAudience.Private
    void configureAndStart() {
        Preconditions.checkState((this.initialMemoryAvailable != -1L ? 1 : 0) != 0, (Object)"Initial available memory must be configured before starting");
        if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
            Class<? extends CompressionCodec> codecClass = ConfigUtils.getIntermediateInputCompressorClass(this.conf, DefaultCodec.class);
            this.codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, (Configuration)this.conf);
        } else {
            this.codec = null;
        }
        this.ifileReadAhead = this.conf.getBoolean("tez.runtime.ifile.readahead", true);
        this.ifileReadAheadLength = this.ifileReadAhead ? this.conf.getInt("tez.runtime.ifile.readahead.bytes", 0x400000) : 0;
        this.ifileBufferSize = this.conf.getInt("io.file.buffer.size", -1);
        float maxInMemCopyUse = this.conf.getFloat("tez.runtime.shuffle.input.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.input.buffer.percent: " + maxInMemCopyUse);
        }
        long memLimit = (long)((float)this.conf.getLong("tez.runtime.task.memory", Math.min(this.inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE)) * maxInMemCopyUse);
        float maxRedPer = this.conf.getFloat("tez.runtime.task.input.buffer.percent", 0.0f);
        if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
            throw new TezUncheckedException("tez.runtime.task.input.buffer.percent" + maxRedPer);
        }
        int maxRedBuffer = (int)Math.min((float)this.inputContext.getTotalMemoryAvailableToTask() * maxRedPer, 2.1474836E9f);
        this.memoryLimit = this.initialMemoryAvailable < memLimit ? this.initialMemoryAvailable : memLimit;
        this.postMergeMemLimit = this.initialMemoryAvailable < (long)maxRedBuffer ? (int)this.initialMemoryAvailable : maxRedBuffer;
        LOG.info((Object)("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem=" + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit));
        this.ioSortFactor = this.conf.getInt("tez.runtime.io.sort.factor", 100);
        float singleShuffleMemoryLimitPercent = this.conf.getFloat("tez.runtime.shuffle.memory.limit.percent", 0.25f);
        if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.memory.limit.percent: " + singleShuffleMemoryLimitPercent);
        }
        this.maxSingleShuffleLimit = (long)((float)this.memoryLimit * singleShuffleMemoryLimitPercent);
        this.memToMemMergeOutputsThreshold = this.conf.getInt("tez.runtime.shuffle.memory-to-memory.segments", this.ioSortFactor);
        this.mergeThreshold = (long)((float)this.memoryLimit * this.conf.getFloat("tez.runtime.shuffle.merge.percent", 0.9f));
        LOG.info((Object)("MergerManager: memoryLimit=" + this.memoryLimit + ", " + "maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ", " + "mergeThreshold=" + this.mergeThreshold + ", " + "ioSortFactor=" + this.ioSortFactor + ", " + "memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold));
        if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
            throw new RuntimeException("Invlaid configuration: maxSingleShuffleLimit should be less than mergeThresholdmaxSingleShuffleLimit: " + this.maxSingleShuffleLimit + "mergeThreshold: " + this.mergeThreshold);
        }
        boolean allowMemToMemMerge = this.conf.getBoolean("tez.runtime.shuffle.memory-to-memory.enable", false);
        if (allowMemToMemMerge) {
            this.memToMemMerger = new IntermediateMemoryToMemoryMerger(this, this.memToMemMergeOutputsThreshold);
            this.memToMemMerger.start();
        } else {
            this.memToMemMerger = null;
        }
        this.inMemoryMerger = new InMemoryMerger(this);
        this.inMemoryMerger.start();
        this.onDiskMerger = new OnDiskMerger(this);
        this.onDiskMerger.start();
    }

    @InterfaceAudience.Private
    static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
        float maxInMemCopyUse = conf.getFloat("tez.runtime.shuffle.input.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.input.buffer.percent: " + maxInMemCopyUse);
        }
        long memLimit = (long)((float)conf.getLong("tez.runtime.task.memory", Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
        LOG.info((Object)("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse));
        float maxRedPer = conf.getFloat("tez.runtime.task.input.buffer.percent", 0.0f);
        if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
            throw new TezUncheckedException("tez.runtime.task.input.buffer.percent" + maxRedPer);
        }
        int maxRedBuffer = (int)Math.min((float)maxAvailableTaskMemory * maxRedPer, 2.1474836E9f);
        LOG.info((Object)("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer));
        long reqMem = Math.max((long)maxRedBuffer, memLimit);
        return reqMem;
    }

    public void waitForInMemoryMerge() throws InterruptedException {
        this.inMemoryMerger.waitForMerge();
    }

    private boolean canShuffleToMemory(long requestedSize) {
        return requestedSize < this.maxSingleShuffleLimit;
    }

    public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, int fetcher) throws IOException {
        if (!this.canShuffleToMemory(requestedSize)) {
            LOG.info((Object)(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + this.maxSingleShuffleLimit + ")"));
            return new MapOutput(srcAttemptIdentifier, this, requestedSize, this.conf, this.localDirAllocator, fetcher, true, this.mapOutputFile);
        }
        if (this.usedMemory > this.memoryLimit) {
            LOG.debug((Object)(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + this.usedMemory + ") is greater than memoryLimit (" + this.memoryLimit + ")." + " CommitMemory is (" + this.commitMemory + ")"));
            return this.stallShuffle;
        }
        LOG.debug((Object)(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory (" + this.usedMemory + ") is lesser than memoryLimit (" + this.memoryLimit + ")." + "CommitMemory is (" + this.commitMemory + ")"));
        return this.unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
    }

    private synchronized MapOutput unconditionalReserve(InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
        this.usedMemory += requestedSize;
        return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize, primaryMapOutput);
    }

    synchronized void unreserve(long size) {
        this.commitMemory -= size;
        this.usedMemory -= size;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void closeInMemoryFile(MapOutput mapOutput) {
        this.inMemoryMapOutputs.add(mapOutput);
        LOG.info((Object)("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + this.inMemoryMapOutputs.size() + ", commitMemory -> " + this.commitMemory + ", usedMemory ->" + this.usedMemory));
        this.commitMemory += mapOutput.getSize();
        MergeThread mergeThread = this.inMemoryMerger;
        synchronized (mergeThread) {
            if (!this.inMemoryMerger.isInProgress() && this.commitMemory >= this.mergeThreshold) {
                LOG.info((Object)("Starting inMemoryMerger's merge since commitMemory=" + this.commitMemory + " > mergeThreshold=" + this.mergeThreshold + ". Current usedMemory=" + this.usedMemory));
                this.inMemoryMapOutputs.addAll(this.inMemoryMergedMapOutputs);
                this.inMemoryMergedMapOutputs.clear();
                this.inMemoryMerger.startMerge(this.inMemoryMapOutputs);
            }
        }
        if (this.memToMemMerger != null) {
            mergeThread = this.memToMemMerger;
            synchronized (mergeThread) {
                if (!this.memToMemMerger.isInProgress() && this.inMemoryMapOutputs.size() >= this.memToMemMergeOutputsThreshold) {
                    this.memToMemMerger.startMerge(this.inMemoryMapOutputs);
                }
            }
        }
    }

    public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
        this.inMemoryMergedMapOutputs.add(mapOutput);
        LOG.info((Object)("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + this.inMemoryMergedMapOutputs.size()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void closeOnDiskFile(Path file) {
        this.onDiskMapOutputs.add(file);
        OnDiskMerger onDiskMerger = this.onDiskMerger;
        synchronized (onDiskMerger) {
            if (!this.onDiskMerger.isInProgress() && this.onDiskMapOutputs.size() >= 2 * this.ioSortFactor - 1) {
                this.onDiskMerger.startMerge(this.onDiskMapOutputs);
            }
        }
    }

    @InterfaceAudience.Private
    public boolean isMergeComplete() {
        return this.finalMergeComplete;
    }

    public TezRawKeyValueIterator close() throws Throwable {
        if (this.memToMemMerger != null) {
            this.memToMemMerger.close();
        }
        this.inMemoryMerger.close();
        this.onDiskMerger.close();
        ArrayList<MapOutput> memory = new ArrayList<MapOutput>(this.inMemoryMergedMapOutputs);
        memory.addAll(this.inMemoryMapOutputs);
        ArrayList<Path> disk = new ArrayList<Path>(this.onDiskMapOutputs);
        TezRawKeyValueIterator kvIter = this.finalMerge(this.conf, this.rfs, memory, disk);
        this.finalMergeComplete = true;
        return kvIter;
    }

    void runCombineProcessor(TezRawKeyValueIterator kvIter, IFile.Writer writer) throws IOException, InterruptedException {
        this.combiner.combine(kvIter, writer);
    }

    private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs, List<TezMerger.Segment> inMemorySegments, long leaveBytes) throws IOException {
        long totalSize = 0L;
        long fullSize = 0L;
        for (MapOutput mo : inMemoryMapOutputs) {
            fullSize += (long)mo.getMemory().length;
        }
        while (fullSize > leaveBytes) {
            MapOutput mo = inMemoryMapOutputs.remove(0);
            byte[] data = mo.getMemory();
            long size = data.length;
            totalSize += size;
            fullSize -= size;
            InMemoryReader reader = new InMemoryReader(this, mo.getAttemptIdentifier(), data, 0, (int)size);
            inMemorySegments.add(new TezMerger.Segment(reader, true, mo.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null));
        }
        return totalSize;
    }

    private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs, List<MapOutput> inMemoryMapOutputs, List<Path> onDiskMapOutputs) throws IOException {
        Path[] onDisk;
        LOG.info((Object)("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs"));
        Class keyClass = ConfigUtils.getIntermediateInputKeyClass(job);
        Class valueClass = ConfigUtils.getIntermediateInputValueClass(job);
        Path tmpDir = new Path(this.inputContext.getUniqueIdentifier());
        RawComparator comparator = ConfigUtils.getIntermediateInputKeyComparator(job);
        ArrayList<TezMerger.Segment> memDiskSegments = new ArrayList<TezMerger.Segment>();
        long inMemToDiskBytes = 0L;
        boolean mergePhaseFinished = false;
        if (inMemoryMapOutputs.size() > 0) {
            int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
            inMemToDiskBytes = this.createInMemorySegments(inMemoryMapOutputs, memDiskSegments, this.postMergeMemLimit);
            int numMemDiskSegments = memDiskSegments.size();
            if (numMemDiskSegments > 0 && this.ioSortFactor > onDiskMapOutputs.size()) {
                mergePhaseFinished = true;
                Path outputPath = this.mapOutputFile.getInputFileForWrite(srcTaskId, inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
                TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, this.nullProgressable, this.spilledRecordsCounter, null, null);
                IFile.Writer writer = new IFile.Writer(job, fs, outputPath, keyClass, valueClass, this.codec, null);
                try {
                    TezMerger.writeFile(rIter, writer, this.nullProgressable, 10000L);
                    onDiskMapOutputs.add(outputPath);
                }
                catch (IOException e) {
                    if (null != outputPath) {
                        try {
                            fs.delete(outputPath, true);
                        }
                        catch (IOException ie) {
                            // empty catch block
                        }
                    }
                    throw e;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
                LOG.info((Object)("Merged " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes to disk to satisfy " + "reduce memory limit"));
                inMemToDiskBytes = 0L;
                memDiskSegments.clear();
            } else if (inMemToDiskBytes != 0L) {
                LOG.info((Object)("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge"));
            }
        }
        ArrayList<TezMerger.Segment> diskSegments = new ArrayList<TezMerger.Segment>();
        long onDiskBytes = inMemToDiskBytes;
        for (Path file : onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()])) {
            onDiskBytes += fs.getFileStatus(file).getLen();
            LOG.debug((Object)("Disk file: " + file + " Length is " + fs.getFileStatus(file).getLen()));
            diskSegments.add(new TezMerger.Segment(job, fs, file, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, false, file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : this.mergedMapOutputsCounter));
        }
        LOG.info((Object)("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk"));
        Collections.sort(diskSegments, new Comparator<TezMerger.Segment>(){

            @Override
            public int compare(TezMerger.Segment o1, TezMerger.Segment o2) {
                if (o1.getLength() == o2.getLength()) {
                    return 0;
                }
                return o1.getLength() < o2.getLength() ? -1 : 1;
            }
        });
        ArrayList<TezMerger.Segment> finalSegments = new ArrayList<TezMerger.Segment>();
        long inMemBytes = this.createInMemorySegments(inMemoryMapOutputs, finalSegments, 0L);
        LOG.info((Object)("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce"));
        if (0L != onDiskBytes) {
            int numInMemSegments = memDiskSegments.size();
            diskSegments.addAll(0, memDiskSegments);
            memDiskSegments.clear();
            TezRawKeyValueIterator diskMerge = TezMerger.merge(job, fs, keyClass, valueClass, diskSegments, this.ioSortFactor, numInMemSegments, tmpDir, comparator, this.nullProgressable, false, this.spilledRecordsCounter, null, null);
            diskSegments.clear();
            if (0 == finalSegments.size()) {
                return diskMerge;
            }
            finalSegments.add(new TezMerger.Segment(new RawKVIteratorReader(diskMerge, onDiskBytes), true));
        }
        return TezMerger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, this.nullProgressable, this.spilledRecordsCounter, null, null);
    }

    class RawKVIteratorReader
    extends IFile.Reader {
        private final TezRawKeyValueIterator kvIter;

        public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size) throws IOException {
            super(null, size, null, MergeManager.this.spilledRecordsCounter, MergeManager.this.ifileReadAhead, MergeManager.this.ifileReadAheadLength, MergeManager.this.ifileBufferSize);
            this.kvIter = kvIter;
        }

        @Override
        public boolean nextRawKey(DataInputBuffer key) throws IOException {
            if (this.kvIter.next()) {
                DataInputBuffer kb = this.kvIter.getKey();
                int kp = kb.getPosition();
                int klen = kb.getLength() - kp;
                key.reset(kb.getData(), kp, klen);
                this.bytesRead += (long)klen;
                return true;
            }
            return false;
        }

        @Override
        public void nextRawValue(DataInputBuffer value) throws IOException {
            DataInputBuffer vb = this.kvIter.getValue();
            int vp = vb.getPosition();
            int vlen = vb.getLength() - vp;
            value.reset(vb.getData(), vp, vlen);
            this.bytesRead += (long)vlen;
        }

        @Override
        public long getPosition() throws IOException {
            return this.bytesRead;
        }

        @Override
        public void close() throws IOException {
            this.kvIter.close();
        }
    }

    private class OnDiskMerger
    extends MergeThread<Path> {
        public OnDiskMerger(MergeManager manager) {
            super(manager, Integer.MAX_VALUE, MergeManager.this.exceptionReporter);
            this.setName("OnDiskMerger - Thread to merge on-disk map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<Path> inputs) throws IOException {
            if (inputs == null || inputs.isEmpty()) {
                LOG.info((Object)"No ondisk files to merge...");
                return;
            }
            long approxOutputSize = 0L;
            int bytesPerSum = MergeManager.this.conf.getInt("io.bytes.per.checksum", 512);
            LOG.info((Object)("OnDiskMerger: We have  " + inputs.size() + " map outputs on disk. Triggering merge..."));
            for (Path file : inputs) {
                approxOutputSize += MergeManager.this.localFS.getFileStatus(file).getLen();
            }
            approxOutputSize += ChecksumFileSystem.getChecksumLength((long)approxOutputSize, (int)bytesPerSum);
            Path outputPath = MergeManager.this.localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, MergeManager.this.conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
            IFile.Writer writer = new IFile.Writer(MergeManager.this.conf, MergeManager.this.rfs, outputPath, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, null);
            TezRawKeyValueIterator iter = null;
            Path tmpDir = new Path(MergeManager.this.inputContext.getUniqueIdentifier());
            try {
                iter = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, MergeManager.this.ifileReadAhead, MergeManager.this.ifileReadAheadLength, MergeManager.this.ifileBufferSize, inputs.toArray(new Path[inputs.size()]), true, MergeManager.this.ioSortFactor, tmpDir, ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.nullProgressable, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.mergedMapOutputsCounter, null);
                TezMerger.writeFile(iter, writer, MergeManager.this.nullProgressable, 10000L);
                writer.close();
            }
            catch (IOException e) {
                MergeManager.this.localFS.delete(outputPath, true);
                throw e;
            }
            MergeManager.this.closeOnDiskFile(outputPath);
            LOG.info((Object)(MergeManager.this.inputContext.getUniqueIdentifier() + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + MergeManager.this.localFS.getFileStatus(outputPath).getLen()));
        }
    }

    private class InMemoryMerger
    extends MergeThread<MapOutput> {
        public InMemoryMerger(MergeManager manager) {
            super(manager, Integer.MAX_VALUE, MergeManager.this.exceptionReporter);
            this.setName("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
            if (inputs == null || inputs.size() == 0) {
                return;
            }
            InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
            ArrayList<TezMerger.Segment> inMemorySegments = new ArrayList<TezMerger.Segment>();
            long mergeOutputSize = MergeManager.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            Path outputPath = MergeManager.this.mapOutputFile.getInputFileForWrite(srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
            IFile.Writer writer = null;
            try {
                writer = new IFile.Writer(MergeManager.this.conf, MergeManager.this.rfs, outputPath, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, null);
                TezRawKeyValueIterator rIter = null;
                LOG.info((Object)("Initiating in-memory merge with " + noInMemorySegments + " segments..."));
                rIter = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), inMemorySegments, inMemorySegments.size(), new Path(MergeManager.this.inputContext.getUniqueIdentifier()), ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.nullProgressable, MergeManager.this.spilledRecordsCounter, null, null);
                if (null == MergeManager.this.combiner) {
                    TezMerger.writeFile(rIter, writer, MergeManager.this.nullProgressable, 10000L);
                } else {
                    MergeManager.this.runCombineProcessor(rIter, writer);
                }
                writer.close();
                writer = null;
                LOG.info((Object)(MergeManager.this.inputContext.getUniqueIdentifier() + " Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + outputPath + " of size " + MergeManager.this.localFS.getFileStatus(outputPath).getLen()));
            }
            catch (IOException e) {
                MergeManager.this.localFS.delete(outputPath, true);
                throw e;
            }
            finally {
                if (writer != null) {
                    writer.close();
                }
            }
            MergeManager.this.closeOnDiskFile(outputPath);
        }
    }

    private class IntermediateMemoryToMemoryMerger
    extends MergeThread<MapOutput> {
        public IntermediateMemoryToMemoryMerger(MergeManager manager, int mergeFactor) {
            super(manager, mergeFactor, MergeManager.this.exceptionReporter);
            this.setName("InMemoryMerger - Thread to do in-memory merge of in-memory shuffled map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<MapOutput> inputs) throws IOException {
            if (inputs == null || inputs.size() == 0) {
                return;
            }
            InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
            ArrayList<TezMerger.Segment> inMemorySegments = new ArrayList<TezMerger.Segment>();
            long mergeOutputSize = MergeManager.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            MapOutput mergedMapOutputs = MergeManager.this.unconditionalReserve(dummyMapId, mergeOutputSize, false);
            InMemoryWriter writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
            LOG.info((Object)("Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize));
            TezRawKeyValueIterator rIter = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), inMemorySegments, inMemorySegments.size(), new Path(MergeManager.this.inputContext.getUniqueIdentifier()), ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.nullProgressable, null, null, null);
            TezMerger.writeFile(rIter, writer, MergeManager.this.nullProgressable, 10000L);
            ((IFile.Writer)writer).close();
            LOG.info((Object)(MergeManager.this.inputContext.getUniqueIdentifier() + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete."));
            MergeManager.this.closeInMemoryMergedFile(mergedMapOutputs);
        }
    }
}

