/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.sort.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelinedSorter
extends ExternalSorter {
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedSorter.class);
    public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
    private static final int APPROX_HEADER_LENGTH = 150;
    private final int partitionBits;
    private static final int PARTITION = 0;
    private static final int KEYSTART = 1;
    private static final int VALSTART = 2;
    private static final int VALLEN = 3;
    private static final int NMETA = 4;
    private static final int METASIZE = 16;
    private final int minSpillsForCombine;
    private final ProxyComparator hasher;
    private SortSpan span;
    @VisibleForTesting
    protected final LinkedList<ByteBuffer> bufferList = new LinkedList();
    private ListIterator<ByteBuffer> listIterator;
    private final long capacity;
    private int bufferOverflowRecursion;
    private final int blockSize;
    private final SpanMerger merger;
    private final ExecutorService sortmaster;
    private final ArrayList<TezSpillRecord> indexCacheList = new ArrayList();
    private final boolean pipelinedShuffle;

    public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException {
        this(outputContext, conf, numOutputs, initialMemoryAvailable, 0);
    }

    PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable, int blkSize) throws IOException {
        super(outputContext, conf, numOutputs, initialMemoryAvailable);
        StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ").append(outputContext.getDestinationVertexName()).append(": ");
        this.partitionBits = this.bitcount(this.partitions) + 1;
        boolean confPipelinedShuffle = this.conf.getBoolean("tez.runtime.pipelined-shuffle.enabled", false);
        this.pipelinedShuffle = !this.isFinalMergeEnabled() && confPipelinedShuffle;
        long sortmb = this.availableMemoryMb;
        long maxMemUsage = sortmb << 20;
        this.blockSize = PipelinedSorter.computeBlockSize(blkSize, maxMemUsage);
        long usage = sortmb << 20;
        int numberOfBlocks = Math.max(1, (int)Math.ceil(1.0 * (double)usage / (double)this.blockSize));
        initialSetupLogLine.append("#blocks=").append(numberOfBlocks);
        initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage);
        initialSetupLogLine.append(", BLOCK_SIZE=").append(this.blockSize);
        initialSetupLogLine.append(", finalMergeEnabled=").append(this.isFinalMergeEnabled());
        initialSetupLogLine.append(", pipelinedShuffle=").append(this.pipelinedShuffle);
        initialSetupLogLine.append(", sendEmptyPartitions=").append(this.sendEmptyPartitionDetails);
        initialSetupLogLine.append(", ").append("tez.runtime.io.sort.mb").append("=").append(sortmb);
        initialSetupLogLine.append(", UsingHashComparator=");
        if (this.comparator instanceof ProxyComparator) {
            this.hasher = (ProxyComparator)this.comparator;
            initialSetupLogLine.append(true);
        } else {
            this.hasher = null;
            initialSetupLogLine.append(false);
        }
        LOG.info(initialSetupLogLine.toString());
        long totalCapacityWithoutMeta = 0L;
        for (int i = 0; i < numberOfBlocks; ++i) {
            Preconditions.checkArgument((usage > 0L ? 1 : 0) != 0, (Object)("usage can't be less than zero " + usage));
            long size = Math.min(usage, (long)this.blockSize);
            int sizeWithoutMeta = (int)(size - size % 16L);
            this.bufferList.add(ByteBuffer.allocate(sizeWithoutMeta));
            totalCapacityWithoutMeta += (long)sizeWithoutMeta;
            usage -= size;
        }
        this.capacity = totalCapacityWithoutMeta;
        this.listIterator = this.bufferList.listIterator();
        Preconditions.checkArgument((boolean)this.listIterator.hasNext(), (Object)("Buffer list seems to be empty " + this.bufferList.size()));
        this.span = new SortSpan(this.listIterator.next(), 0x100000, 16, this.comparator);
        this.merger = new SpanMerger();
        int sortThreads = this.conf.getInt("tez.runtime.pipelined.sorter.sort.threads", 2);
        this.sortmaster = Executors.newFixedThreadPool(sortThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName((String)outputContext.getDestinationVertexName()) + "} #%d").build());
        this.valSerializer.open((OutputStream)this.span.out);
        this.keySerializer.open((OutputStream)this.span.out);
        this.minSpillsForCombine = this.conf.getInt("tez.runtime.combine.min.spills", 3);
    }

    @VisibleForTesting
    static int computeBlockSize(int blkSize, long maxMemUsage) {
        if (blkSize == 0) {
            return (int)Math.min(maxMemUsage, Integer.MAX_VALUE);
        }
        Preconditions.checkArgument((blkSize > 0 ? 1 : 0) != 0, (Object)"blkSize should be between 1 and Integer.MAX_VALUE");
        if ((long)blkSize >= maxMemUsage) {
            return maxMemUsage > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)maxMemUsage;
        }
        return blkSize;
    }

    private int bitcount(int n) {
        int bit = 0;
        while (n != 0) {
            ++bit;
            n >>= 1;
        }
        return bit;
    }

    public void sort() throws IOException {
        SortSpan newSpan = this.span.next();
        if (newSpan == null) {
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.start();
            this.merger.add(this.span.sort(this.sorter));
            this.spill();
            stopWatch.stop();
            LOG.info("Time taken for spill " + stopWatch.elapsedMillis() + " ms");
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.outputContext.getDestinationVertexName() + ": Time taken for spill " + stopWatch.elapsedMillis() + " ms");
            }
            if (this.pipelinedShuffle) {
                this.sendPipelinedShuffleEvents();
            }
            this.listIterator = this.bufferList.listIterator();
            int items = 0x100000;
            int perItem = 16;
            if (this.span.length() != 0) {
                items = this.span.length();
                perItem = this.span.kvbuffer.limit() / items;
                items = this.span.capacity / (16 + perItem);
                if (items > 0x100000) {
                    items = 0x100000;
                }
            }
            Preconditions.checkArgument((boolean)this.listIterator.hasNext(), (Object)"block iterator should not be empty");
            this.span = new SortSpan((ByteBuffer)this.listIterator.next().clear(), 0x100000, perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf));
        } else {
            SortTask task = new SortTask(this.span, this.sorter);
            Future<SpanIterator> future = this.sortmaster.submit(task);
            this.merger.add(future);
            this.span = newSpan;
        }
        this.valSerializer.open((OutputStream)this.span.out);
        this.keySerializer.open((OutputStream)this.span.out);
    }

    private void sendPipelinedShuffleEvents() throws IOException {
        LinkedList events = Lists.newLinkedList();
        String pathComponent = this.outputContext.getUniqueIdentifier() + "_" + (this.numSpills - 1);
        ShuffleUtils.generateEventOnSpill(events, this.isFinalMergeEnabled(), false, this.outputContext, this.numSpills - 1, this.indexCacheList.get(this.numSpills - 1), this.partitions, this.sendEmptyPartitionDetails, pathComponent, this.partitionStats);
        this.outputContext.sendEvents((List)events);
        LOG.info(this.outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (this.numSpills - 1));
    }

    @Override
    public void write(Object key, Object value) throws IOException {
        this.collect(key, value, this.partitioner.getPartition(key, value, this.partitions));
    }

    synchronized void collect(Object key, Object value, int partition) throws IOException {
        if (key.getClass() != this.keyClass) {
            throw new IOException("Type mismatch in key from map: expected " + this.keyClass.getName() + ", received " + key.getClass().getName());
        }
        if (value.getClass() != this.valClass) {
            throw new IOException("Type mismatch in value from map: expected " + this.valClass.getName() + ", received " + value.getClass().getName());
        }
        if (partition < 0 || partition >= this.partitions) {
            throw new IOException("Illegal partition for " + key + " (" + partition + ")");
        }
        if (this.span.kvmeta.remaining() < 16) {
            this.sort();
            if (this.span.length() == 0) {
                this.spillSingleRecord(key, value, partition);
                return;
            }
        }
        int keystart = this.span.kvbuffer.position();
        int valstart = -1;
        int valend = -1;
        try {
            this.keySerializer.serialize(key);
            valstart = this.span.kvbuffer.position();
            this.valSerializer.serialize(value);
            valend = this.span.kvbuffer.position();
        }
        catch (BufferOverflowException overflow) {
            this.span.kvbuffer.position(keystart);
            this.sort();
            if (this.span.length() == 0 || this.bufferOverflowRecursion > this.bufferList.size()) {
                this.spillSingleRecord(key, value, partition);
                this.bufferOverflowRecursion = 0;
                return;
            }
            ++this.bufferOverflowRecursion;
            this.collect(key, value, partition);
            return;
        }
        if (this.bufferOverflowRecursion > 0) {
            --this.bufferOverflowRecursion;
        }
        int prefix = 0;
        if (this.hasher != null) {
            prefix = this.hasher.getProxy(key);
        }
        prefix = partition << 32 - this.partitionBits | prefix >>> this.partitionBits;
        this.span.kvmeta.put(prefix);
        this.span.kvmeta.put(keystart);
        this.span.kvmeta.put(valstart);
        this.span.kvmeta.put(valend - valstart);
        this.mapOutputRecordCounter.increment(1L);
        this.outputContext.notifyProgress();
        this.mapOutputByteCounter.increment((long)(valend - keystart));
    }

    private void adjustSpillCounters(long rawLength, long compLength) {
        if (!this.isFinalMergeEnabled()) {
            this.outputBytesWithOverheadCounter.increment(rawLength);
        } else if (this.numSpills > 0) {
            this.additionalSpillBytesWritten.increment(compLength);
            this.outputBytesWithOverheadCounter.setValue(0L);
        } else {
            this.outputBytesWithOverheadCounter.increment(rawLength);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void spillSingleRecord(Object key, Object value, int partition) throws IOException {
        TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
        Path filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, -1L);
        Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
        this.spillFilePaths.put(this.numSpills, filename);
        FSDataOutputStream out = this.rfs.create(filename, true, 4096);
        try {
            LOG.info(this.outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() + ", indexFilename=" + indexFilename);
            for (int i = 0; i < this.partitions; ++i) {
                IFile.Writer writer = null;
                try {
                    long segmentStart = out.getPos();
                    writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null, false);
                    if (i == partition) {
                        long recordStart = out.getPos();
                        writer.append(key, value);
                        this.mapOutputRecordCounter.increment(1L);
                        this.mapOutputByteCounter.increment(out.getPos() - recordStart);
                    }
                    writer.close();
                    this.adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
                    TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                    spillRec.putIndex(rec, i);
                    writer = null;
                    continue;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
            }
            this.spillFileIndexPaths.put(this.numSpills, indexFilename);
            spillRec.writeToFile(indexFilename, this.conf);
            this.indexCacheList.add(spillRec);
            ++this.numSpills;
            if (!this.isFinalMergeEnabled()) {
                this.fileOutputByteCounter.increment(this.rfs.getFileStatus(filename).getLen());
                this.numShuffleChunks.setValue((long)this.numSpills);
            }
            if (this.pipelinedShuffle) {
                this.sendPipelinedShuffleEvents();
            }
        }
        finally {
            out.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void spill() throws IOException {
        long size = this.capacity + (long)(this.partitions * 150);
        TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
        Path filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, size);
        this.spillFilePaths.put(this.numSpills, filename);
        FSDataOutputStream out = this.rfs.create(filename, true, 4096);
        try {
            this.merger.ready();
            LOG.info(this.outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
            for (int i = 0; i < this.partitions; ++i) {
                this.outputContext.notifyProgress();
                TezRawKeyValueIterator kvIter = this.merger.filter(i);
                long segmentStart = out.getPos();
                IFile.Writer writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null, this.merger.needsRLE());
                if (this.combiner == null) {
                    while (kvIter.next()) {
                        writer.append(kvIter.getKey(), kvIter.getValue());
                    }
                } else {
                    this.runCombineProcessor(kvIter, writer);
                }
                writer.close();
                this.adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
                TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                spillRec.putIndex(rec, i);
                if (this.isFinalMergeEnabled() || !this.reportPartitionStats()) continue;
                int n = i;
                this.partitionStats[n] = this.partitionStats[n] + writer.getCompressedLength();
            }
            Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
            this.spillFileIndexPaths.put(this.numSpills, indexFilename);
            spillRec.writeToFile(indexFilename, this.conf);
            this.indexCacheList.add(spillRec);
            ++this.numSpills;
            if (!this.isFinalMergeEnabled()) {
                this.fileOutputByteCounter.increment(this.rfs.getFileStatus(filename).getLen());
                this.numShuffleChunks.setValue((long)this.numSpills);
            }
        }
        catch (InterruptedException ie) {
        }
        finally {
            out.close();
        }
    }

    @Override
    public void flush() throws IOException {
        String uniqueIdentifier = this.outputContext.getUniqueIdentifier();
        this.outputContext.notifyProgress();
        LOG.info(this.outputContext.getDestinationVertexName() + ": Starting flush of map output");
        this.span.end();
        this.merger.add(this.span.sort(this.sorter));
        this.spill();
        this.sortmaster.shutdown();
        this.bufferList.clear();
        if (this.indexCacheList.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.outputContext.getDestinationVertexName() + ": Index list is empty... returning");
            }
            return;
        }
        if (!this.isFinalMergeEnabled()) {
            LinkedList events = Lists.newLinkedList();
            int startIndex = this.pipelinedShuffle ? this.numSpills - 1 : 0;
            int endIndex = this.numSpills;
            for (int i = startIndex; i < endIndex; ++i) {
                boolean isLastEvent = i == this.numSpills - 1;
                String pathComponent = this.outputContext.getUniqueIdentifier() + "_" + i;
                ShuffleUtils.generateEventOnSpill(events, this.isFinalMergeEnabled(), isLastEvent, this.outputContext, i, this.indexCacheList.get(i), this.partitions, this.sendEmptyPartitionDetails, pathComponent, this.partitionStats);
                LOG.info(this.outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
            }
            this.outputContext.sendEvents((List)events);
            return;
        }
        this.numAdditionalSpills.increment((long)(this.numSpills - 1));
        if (this.numSpills == 1) {
            Path filename = (Path)this.spillFilePaths.get(0);
            Path indexFilename = (Path)this.spillFileIndexPaths.get(0);
            this.finalOutputFile = this.mapOutputFile.getOutputFileForWriteInVolume(filename);
            this.finalIndexFile = this.mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
            this.sameVolRename(filename, this.finalOutputFile);
            this.sameVolRename(indexFilename, this.finalIndexFile);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.outputContext.getDestinationVertexName() + ": numSpills=" + this.numSpills + ", finalOutputFile=" + this.finalOutputFile + ", " + "finalIndexFile=" + this.finalIndexFile + ", filename=" + filename + ", indexFilename=" + indexFilename);
            }
            TezSpillRecord spillRecord = new TezSpillRecord(this.finalIndexFile, this.conf);
            if (this.reportPartitionStats()) {
                for (int i = 0; i < spillRecord.size(); ++i) {
                    int n = i;
                    this.partitionStats[n] = this.partitionStats[n] + spillRecord.getIndex(i).getPartLength();
                }
            }
            this.numShuffleChunks.setValue((long)this.numSpills);
            this.fileOutputByteCounter.increment(this.rfs.getFileStatus(this.finalOutputFile).getLen());
            return;
        }
        this.finalOutputFile = this.mapOutputFile.getOutputFileForWrite(0L);
        this.finalIndexFile = this.mapOutputFile.getOutputIndexFileForWrite(0L);
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.outputContext.getDestinationVertexName() + ": " + "numSpills: " + this.numSpills + ", finalOutputFile:" + this.finalOutputFile + ", finalIndexFile:" + this.finalIndexFile);
        }
        FSDataOutputStream finalOut = this.rfs.create(this.finalOutputFile, true, 4096);
        TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
        for (int parts = 0; parts < this.partitions; ++parts) {
            ArrayList<TezMerger.Segment> segmentList = new ArrayList<TezMerger.Segment>(this.numSpills);
            for (int i = 0; i < this.numSpills; ++i) {
                Path spillFilename = (Path)this.spillFilePaths.get(i);
                TezIndexRecord indexRecord = this.indexCacheList.get(i).getIndex(parts);
                TezMerger.DiskSegment s = new TezMerger.DiskSegment(this.rfs, spillFilename, indexRecord.getStartOffset(), indexRecord.getPartLength(), this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, true);
                segmentList.add(i, s);
            }
            int mergeFactor = this.conf.getInt("tez.runtime.io.sort.factor", 100);
            boolean sortSegments = segmentList.size() > mergeFactor;
            TezRawKeyValueIterator kvIter = TezMerger.merge(this.conf, this.rfs, this.keyClass, this.valClass, this.codec, segmentList, mergeFactor, new Path(uniqueIdentifier), ConfigUtils.getIntermediateOutputKeyComparator(this.conf), this.progressable, sortSegments, true, null, this.spilledRecordsCounter, this.additionalSpillBytesRead, null);
            long segmentStart = finalOut.getPos();
            IFile.Writer writer = new IFile.Writer(this.conf, finalOut, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null, this.merger.needsRLE());
            if (this.combiner == null || this.numSpills < this.minSpillsForCombine) {
                TezMerger.writeFile(kvIter, writer, this.progressable, 10000L);
            } else {
                this.runCombineProcessor(kvIter, writer);
            }
            writer.close();
            this.outputBytesWithOverheadCounter.increment(writer.getRawLength());
            TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
            spillRec.putIndex(rec, parts);
            if (!this.reportPartitionStats()) continue;
            int n = parts;
            this.partitionStats[n] = this.partitionStats[n] + writer.getCompressedLength();
        }
        this.numShuffleChunks.setValue(1L);
        this.fileOutputByteCounter.increment(this.rfs.getFileStatus(this.finalOutputFile).getLen());
        spillRec.writeToFile(this.finalIndexFile, this.conf);
        finalOut.close();
        for (int i = 0; i < this.numSpills; ++i) {
            Path indexFilename = (Path)this.spillFileIndexPaths.get(i);
            Path spillFilename = (Path)this.spillFilePaths.get(i);
            this.rfs.delete(indexFilename, true);
            this.rfs.delete(spillFilename, true);
        }
        this.spillFileIndexPaths.clear();
        this.spillFilePaths.clear();
    }

    private final class SpanMerger
    implements PartitionedRawKeyValueIterator {
        InputByteBuffer key = new InputByteBuffer();
        InputByteBuffer value = new InputByteBuffer();
        int partition;
        private ArrayList<Future<SpanIterator>> futures = new ArrayList();
        private SpanHeap heap = new SpanHeap();
        private PartitionFilter partIter;
        private int gallop = 0;
        private SpanIterator horse;
        private long total = 0L;
        private long eq = 0L;

        public SpanMerger() {
            this.partIter = new PartitionFilter(this);
        }

        public final void add(SpanIterator iter) {
            if (iter.next()) {
                this.heap.add(iter);
            }
        }

        public final void add(Future<SpanIterator> iter) {
            this.futures.add(iter);
        }

        public final boolean ready() throws IOException, InterruptedException {
            try {
                SpanIterator iter = null;
                while (this.futures.size() > 0) {
                    Future<SpanIterator> futureIter = this.futures.remove(0);
                    iter = futureIter.get();
                    this.add(iter);
                }
                StringBuilder sb = new StringBuilder();
                for (SpanIterator sp : this.heap) {
                    sb.append(sp.toString());
                    sb.append(",");
                    this.total += (long)sp.span.length();
                    this.eq += sp.span.getEq();
                }
                LOG.info(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
                return true;
            }
            catch (Exception e) {
                LOG.info(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + e.toString());
                return false;
            }
        }

        private SpanIterator pop() {
            if (this.gallop > 0) {
                --this.gallop;
                return this.horse;
            }
            SpanIterator current = this.heap.pop();
            SpanIterator next = (SpanIterator)this.heap.peek();
            if (next != null && current != null && this.horse == current) {
                this.gallop = current.bisect(next.getKey(), next.getPartition()) - 1;
            }
            this.horse = current;
            return current;
        }

        public boolean needsRLE() {
            return (double)this.eq > 0.1 * (double)this.total;
        }

        private SpanIterator peek() {
            if (this.gallop > 0) {
                return this.horse;
            }
            return (SpanIterator)this.heap.peek();
        }

        @Override
        public final boolean next() {
            SpanIterator current = this.pop();
            if (current != null) {
                this.partition = current.getPartition();
                this.key.reset(current.getKey());
                this.value.reset(current.getValue());
                if (this.gallop <= 0) {
                    this.add(current);
                } else {
                    current.next();
                }
                return true;
            }
            return false;
        }

        @Override
        public DataInputBuffer getKey() {
            return this.key;
        }

        @Override
        public DataInputBuffer getValue() {
            return this.value;
        }

        @Override
        public int getPartition() {
            return this.partition;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return new Progress();
        }

        @Override
        public boolean isSameKey() throws IOException {
            return false;
        }

        public TezRawKeyValueIterator filter(int partition) {
            this.partIter.reset(partition);
            return this.partIter;
        }
    }

    private static class SpanHeap
    extends PriorityQueue<SpanIterator> {
        private static final long serialVersionUID = 1L;

        public SpanHeap() {
            super(256);
        }

        public SpanIterator pop() {
            return (SpanIterator)this.poll();
        }
    }

    private class PartitionFilter
    implements TezRawKeyValueIterator {
        private final PartitionedRawKeyValueIterator iter;
        private int partition;
        private boolean dirty = false;

        public PartitionFilter(PartitionedRawKeyValueIterator iter) {
            this.iter = iter;
        }

        @Override
        public DataInputBuffer getKey() throws IOException {
            return this.iter.getKey();
        }

        @Override
        public DataInputBuffer getValue() throws IOException {
            return this.iter.getValue();
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return new Progress();
        }

        @Override
        public boolean isSameKey() throws IOException {
            return this.iter.isSameKey();
        }

        @Override
        public boolean next() throws IOException {
            if (this.dirty || this.iter.next()) {
                int prefix = this.iter.getPartition();
                if (prefix >>> 32 - PipelinedSorter.this.partitionBits == this.partition) {
                    this.dirty = false;
                    return true;
                }
                if (!this.dirty) {
                    this.dirty = true;
                }
            }
            return false;
        }

        public void reset(int partition) {
            this.partition = partition;
        }

        public int getPartition() {
            return this.partition;
        }
    }

    private static class SortTask
    extends CallableWithNdc<SpanIterator> {
        private final SortSpan sortable;
        private final IndexedSorter sorter;

        public SortTask(SortSpan sortable, IndexedSorter sorter) {
            this.sortable = sortable;
            this.sorter = sorter;
        }

        protected SpanIterator callInternal() {
            return this.sortable.sort(this.sorter);
        }
    }

    private static class SpanIterator
    implements PartitionedRawKeyValueIterator,
    Comparable<SpanIterator> {
        private int kvindex = -1;
        private final int maxindex;
        private final IntBuffer kvmeta;
        private final ByteBuffer kvbuffer;
        private final SortSpan span;
        private final InputByteBuffer key = new InputByteBuffer();
        private final InputByteBuffer value = new InputByteBuffer();
        private final Progress progress = new Progress();
        private static final int minrun = 16;

        public SpanIterator(SortSpan span) {
            this.kvmeta = span.kvmeta;
            this.kvbuffer = span.kvbuffer;
            this.span = span;
            this.maxindex = this.kvmeta.limit() / 4 - 1;
        }

        @Override
        public DataInputBuffer getKey() {
            int keystart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 1);
            int valstart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 2);
            byte[] buf = this.kvbuffer.array();
            int off = this.kvbuffer.arrayOffset();
            this.key.reset(buf, off + keystart, valstart - keystart);
            return this.key;
        }

        @Override
        public DataInputBuffer getValue() {
            int valstart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 2);
            int vallen = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 3);
            byte[] buf = this.kvbuffer.array();
            int off = this.kvbuffer.arrayOffset();
            this.value.reset(buf, off + valstart, vallen);
            return this.value;
        }

        @Override
        public boolean next() {
            if (this.kvindex == this.maxindex) {
                return false;
            }
            if (this.kvindex % 100 == 0) {
                this.progress.set((float)(this.kvindex - this.maxindex) / (float)this.maxindex);
            }
            ++this.kvindex;
            return true;
        }

        @Override
        public void close() {
        }

        @Override
        public Progress getProgress() {
            return this.progress;
        }

        @Override
        public boolean isSameKey() throws IOException {
            return false;
        }

        @Override
        public int getPartition() {
            int partition = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 0);
            return partition;
        }

        public int size() {
            return this.maxindex - this.kvindex;
        }

        @Override
        public int compareTo(SpanIterator other) {
            return this.span.compareInternal(other.getKey(), other.getPartition(), this.kvindex);
        }

        public String toString() {
            return String.format("SpanIterator<%d:%d> (span=%s)", this.kvindex, this.maxindex, this.span.toString());
        }

        int bisect(DataInputBuffer needle, int needlePart) {
            int start = this.kvindex;
            int end = this.maxindex - 1;
            int mid = start;
            int cmp = 0;
            if (end - start < 16) {
                return 0;
            }
            if (this.span.compareInternal(needle, needlePart, start) > 0) {
                return this.kvindex;
            }
            if (this.span.compareInternal(needle, needlePart, start + 16) > 0) {
                return 0;
            }
            if (this.span.compareInternal(needle, needlePart, end) < 0) {
                return end - this.kvindex;
            }
            boolean found = false;
            for (int i = 0; start < end && i < 16; ++i) {
                mid = start + (end - start) / 2;
                cmp = this.span.compareInternal(needle, needlePart, mid);
                if (cmp == 0) {
                    start = mid;
                    found = true;
                } else if (cmp < 0) {
                    start = mid;
                    found = true;
                }
                if (cmp <= 0) continue;
                end = mid;
            }
            if (found) {
                return start - this.kvindex;
            }
            return 0;
        }
    }

    private final class SortSpan
    implements IndexedSortable {
        final IntBuffer kvmeta;
        final ByteBuffer kvbuffer;
        final DataOutputStream out;
        final RawComparator comparator;
        final int[] imeta = new int[4];
        final int[] jmeta = new int[4];
        private int index = 0;
        private long eq = 0L;
        private boolean reinit = false;
        private int capacity;

        public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) {
            this.capacity = source.remaining();
            int metasize = 16 * maxItems;
            int dataSize = maxItems * perItem;
            if (this.capacity < metasize + dataSize) {
                metasize = 16 * (this.capacity / (perItem + 16));
            }
            ByteBuffer reserved = source.duplicate();
            reserved.mark();
            LOG.info(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" + reserved.remaining() + ", reserved.metasize=" + metasize);
            reserved.position(metasize);
            this.kvbuffer = reserved.slice();
            reserved.flip();
            reserved.limit(metasize);
            this.kvmeta = reserved.slice().order(ByteOrder.nativeOrder()).asIntBuffer();
            this.out = new DataOutputStream(new BufferStreamWrapper(this.kvbuffer));
            this.comparator = comparator;
        }

        public SpanIterator sort(IndexedSorter sorter) {
            long start = System.currentTimeMillis();
            if (this.length() > 1) {
                sorter.sort((IndexedSortable)this, 0, this.length(), PipelinedSorter.this.progressable);
            }
            LOG.info(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + "done sorting span=" + this.index + ", length=" + this.length() + ", " + "time=" + (System.currentTimeMillis() - start));
            return new SpanIterator(this);
        }

        int offsetFor(int i) {
            return i * 4;
        }

        public void swap(int mi, int mj) {
            int kvi = this.offsetFor(mi);
            int kvj = this.offsetFor(mj);
            this.kvmeta.position(kvi);
            this.kvmeta.get(this.imeta);
            this.kvmeta.position(kvj);
            this.kvmeta.get(this.jmeta);
            this.kvmeta.position(kvj);
            this.kvmeta.put(this.imeta);
            this.kvmeta.position(kvi);
            this.kvmeta.put(this.jmeta);
        }

        private int compareKeys(int kvi, int kvj) {
            int off;
            int istart = this.kvmeta.get(kvi + 1);
            int jstart = this.kvmeta.get(kvj + 1);
            int ilen = this.kvmeta.get(kvi + 2) - istart;
            int jlen = this.kvmeta.get(kvj + 2) - jstart;
            if (ilen == 0 || jlen == 0) {
                if (ilen == jlen) {
                    ++this.eq;
                }
                return ilen - jlen;
            }
            byte[] buf = this.kvbuffer.array();
            int cmp = this.comparator.compare(buf, (off = this.kvbuffer.arrayOffset()) + istart, ilen, buf, off + jstart, jlen);
            if (cmp == 0) {
                ++this.eq;
            }
            return cmp;
        }

        public int compare(int mi, int mj) {
            int kvjp;
            int kvi = this.offsetFor(mi);
            int kvj = this.offsetFor(mj);
            int kvip = this.kvmeta.get(kvi + 0);
            if (kvip != (kvjp = this.kvmeta.get(kvj + 0))) {
                return kvip - kvjp;
            }
            return this.compareKeys(kvi, kvj);
        }

        public SortSpan next() {
            ByteBuffer remaining = this.end();
            if (remaining != null) {
                SortSpan newSpan = null;
                int items = this.length();
                int perItem = this.kvbuffer.position() / items;
                if (this.reinit) {
                    items = 0x100000;
                    perItem = 16;
                }
                newSpan = new SortSpan(remaining, items, perItem, ConfigUtils.getIntermediateOutputKeyComparator(PipelinedSorter.this.conf));
                newSpan.index = this.index + 1;
                LOG.info(String.format(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan.length(), perItem) + ", counter:" + PipelinedSorter.this.mapOutputRecordCounter.getValue());
                return newSpan;
            }
            return null;
        }

        public int length() {
            return this.kvmeta.limit() / 4;
        }

        public ByteBuffer end() {
            ByteBuffer remaining = this.kvbuffer.duplicate();
            remaining.position(this.kvbuffer.position());
            remaining = remaining.slice();
            this.kvbuffer.limit(this.kvbuffer.position());
            this.kvmeta.limit(this.kvmeta.position());
            int items = this.length();
            if (items == 0) {
                return null;
            }
            int perItem = this.kvbuffer.position() / items;
            LOG.info(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", this.index, this.length(), perItem));
            if (remaining.remaining() < 16 + perItem) {
                if (PipelinedSorter.this.listIterator.hasNext()) {
                    LOG.info(PipelinedSorter.this.outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" + PipelinedSorter.this.mapOutputRecordCounter.getValue());
                    this.reinit = true;
                    return (ByteBuffer)PipelinedSorter.this.listIterator.next();
                }
                return null;
            }
            return remaining;
        }

        public int compareInternal(DataInputBuffer needle, int needlePart, int index) {
            int cmp = 0;
            int partition = this.kvmeta.get(this.offsetFor(index) + 0);
            if (partition != needlePart) {
                cmp = partition - needlePart;
            } else {
                int keystart = this.kvmeta.get(this.offsetFor(index) + 1);
                int valstart = this.kvmeta.get(this.offsetFor(index) + 2);
                byte[] buf = this.kvbuffer.array();
                int off = this.kvbuffer.arrayOffset();
                cmp = this.comparator.compare(buf, keystart + off, valstart - keystart, needle.getData(), needle.getPosition(), needle.getLength() - needle.getPosition());
            }
            return cmp;
        }

        public long getEq() {
            return this.eq;
        }

        public String toString() {
            return String.format("Span[%d,%d]", 4 * this.kvmeta.capacity(), this.kvbuffer.limit());
        }
    }

    private static final class InputByteBuffer
    extends DataInputBuffer {
        private byte[] buffer = new byte[256];
        private ByteBuffer wrapped = ByteBuffer.wrap(this.buffer);

        private InputByteBuffer() {
        }

        private void resize(int length) {
            if (length > this.buffer.length || this.buffer.length > 10 * (1 + length)) {
                this.buffer = new byte[length];
                this.wrapped = ByteBuffer.wrap(this.buffer);
            }
            this.wrapped.limit(length);
        }

        public void reset(DataInputBuffer clone) {
            byte[] data = clone.getData();
            int start = clone.getPosition();
            int length = clone.getLength() - start;
            super.reset(data, start, length);
        }

        public void copy(DataInputBuffer clone) {
            byte[] data = clone.getData();
            int start = clone.getPosition();
            int length = clone.getLength() - start;
            this.resize(length);
            System.arraycopy(data, start, this.buffer, 0, length);
            super.reset(this.buffer, 0, length);
        }
    }

    private static class BufferStreamWrapper
    extends OutputStream {
        private final ByteBuffer out;

        public BufferStreamWrapper(ByteBuffer out) {
            this.out = out;
        }

        @Override
        public void write(int b) throws IOException {
            this.out.put((byte)b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.out.put(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.out.put(b, off, len);
        }
    }

    private static interface PartitionedRawKeyValueIterator
    extends TezRawKeyValueIterator {
        public int getPartition();
    }
}

