/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples.terasort;

import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.examples.terasort.TeraScheduler;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Input format for TeraSort data. Every record is a fixed-size block of
 * {@link #RECORD_LENGTH} bytes: the first {@link #KEY_LENGTH} bytes are the key and the
 * remaining {@link #VALUE_LENGTH} bytes are the value.
 *
 * <p>Also provides {@link #writePartitionFile}, which samples the input to compute
 * reducer split points.
 */
public class TeraInputFormat
extends FileInputFormat<Text, Text> {
    /** Name for the partition (split point) list file; presumably used by callers of writePartitionFile. */
    static final String PARTITION_FILENAME = "_partition.lst";
    /** Upper bound on the number of input splits sampled by writePartitionFile (default 10). */
    private static final String NUM_PARTITIONS = "mapreduce.terasort.num.partitions";
    /** Total number of records to sample, spread over the sampled splits (default 100000). */
    private static final String SAMPLE_SIZE = "mapreduce.terasort.partitions.sample";
    static final int KEY_LENGTH = 10;
    static final int VALUE_LENGTH = 90;
    static final int RECORD_LENGTH = 100;
    /*
     * Single-entry cache of the most recent getSplits result, keyed by JobContext identity.
     * NOTE(review): static and unsynchronized, so this assumes getSplits is not called
     * concurrently for different jobs; confirm.
     */
    private static MRJobConfig lastContext = null;
    private static List<InputSplit> lastResult = null;

    /**
     * Samples the job's input and writes reducer split points to {@code partFile}.
     *
     * <p>Up to {@code mapreduce.terasort.num.partitions} splits, evenly spaced over the split
     * list, are read by parallel daemon threads. Each thread contributes about
     * {@code sampleSize / samples} keys. The sampled keys are sorted, and
     * {@code numReduceTasks - 1} evenly spaced keys are written with {@link Text#write}.
     *
     * @param job the job whose input is sampled; its reducer count sets the number of partitions
     * @param partFile destination file, overwritten if it already exists
     * @throws IllegalArgumentException if there is nothing to sample, or if more partitions are
     *     requested than keys were sampled
     * @throws InterruptedException if interrupted while waiting for the sampler threads
     * @throws Throwable a failure captured from a sampler thread is rethrown as-is (I/O errors
     *     arrive wrapped in a RuntimeException)
     */
    public static void writePartitionFile(final JobContext job, Path partFile) throws Throwable {
        long t1 = System.currentTimeMillis();
        Configuration conf = job.getConfiguration();
        final TeraInputFormat inFormat = new TeraInputFormat();
        final TextSampler sampler = new TextSampler();
        int partitions = job.getNumReduceTasks();
        long sampleSize = conf.getLong(SAMPLE_SIZE, 100000L);
        final List<InputSplit> splits = inFormat.getSplits(job);
        long t2 = System.currentTimeMillis();
        System.out.println("Computing input splits took " + (t2 - t1) + "ms");
        int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
        System.out.println("Sampling " + samples + " splits of " + splits.size());
        if (samples <= 0) {
            // Guard the divisions below, which would otherwise fail with an opaque "/ by zero".
            throw new IllegalArgumentException("Nothing to sample: " + splits.size()
                    + " input splits and " + NUM_PARTITIONS + "=" + conf.getInt(NUM_PARTITIONS, 10));
        }
        final long recordsPerSample = sampleSize / samples;
        final int sampleStep = splits.size() / samples;
        Thread[] samplerReader = new Thread[samples];
        SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
        for (int i = 0; i < samples; ++i) {
            final InputSplit split = splits.get(sampleStep * i);
            samplerReader[i] = new Thread(threadGroup, "Sampler Reader " + i) {
                {
                    // Daemon threads do not keep the JVM alive if sampling is abandoned early.
                    setDaemon(true);
                }

                @Override
                public void run() {
                    try {
                        sampleSplit(job, inFormat, split, sampler, recordsPerSample);
                    } catch (IOException ie) {
                        System.err.println("Got an exception while reading splits "
                                + StringUtils.stringifyException(ie));
                        // Reaches SamplerThreadGroup.uncaughtException; rethrown by the joining thread.
                        throw new RuntimeException(ie);
                    } catch (InterruptedException ie) {
                        // Stop sampling this split, but keep the interrupt status visible.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            samplerReader[i].start();
        }
        for (Thread samplerThread : samplerReader) {
            // Let InterruptedException propagate. Carrying on would sort the samples while
            // other threads may still be adding keys.
            samplerThread.join();
            Throwable failure = threadGroup.getThrowable();
            if (failure != null) {
                throw failure;
            }
        }
        // Compute the split points before opening the output, so a failure here does not
        // leave behind an empty or truncated partition file.
        Text[] splitPoints = sampler.createPartitions(partitions);
        FileSystem outFs = partFile.getFileSystem(conf);
        // Replication factor 10, presumably because many tasks read this file (TODO confirm).
        FSDataOutputStream writer = outFs.create(partFile, true, 65536, (short)10, outFs.getDefaultBlockSize(partFile));
        try {
            for (Text splitPoint : splitPoints) {
                splitPoint.write(writer);
            }
        } finally {
            writer.close();
        }
        long t3 = System.currentTimeMillis();
        System.out.println("Computing partitions took " + (t3 - t2) + "ms");
    }

    /**
     * Reads keys from one split into {@code sampler}.
     *
     * <p>Stops after {@code recordsPerSample} records, but always reads at least one record
     * when the split is non-empty. The record reader is always closed.
     */
    private static void sampleSplit(JobContext job, TeraInputFormat inFormat, InputSplit split,
            TextSampler sampler, long recordsPerSample) throws IOException, InterruptedException {
        TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
        RecordReader<Text, Text> reader = inFormat.createRecordReader(split, context);
        try {
            reader.initialize(split, context);
            long records = 0L;
            while (reader.nextKeyValue()) {
                // addKey stores its own copy, so the reader is free to reuse its key object.
                sampler.addKey(reader.getCurrentKey());
                if (++records >= recordsPerSample) {
                    break;
                }
            }
        } finally {
            reader.close();
        }
    }

    /** Returns a new {@link TeraRecordReader}; the split is bound later in initialize(). */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new TeraRecordReader();
    }

    /** Creates splits as {@link TeraFileSplit} so their host locations can be reassigned. */
    @Override
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
        return new TeraFileSplit(file, start, length, hosts);
    }

    /**
     * Computes the input splits for {@code job}, optionally rebalancing them with
     * {@link TeraScheduler} (enabled unless {@code TeraScheduler.USE} is set to false).
     *
     * <p>The result is cached for the most recent {@code job} instance (identity
     * comparison). A repeated call with the same instance returns the cached list.
     *
     * @param job the job whose input files are split
     * @return the input splits
     * @throws IOException if listing the input fails
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        if (job == lastContext) {
            return lastResult;
        }
        long t1 = System.currentTimeMillis();
        List<InputSplit> result = super.getSplits(job);
        long t2 = System.currentTimeMillis();
        System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
        if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
            TeraScheduler scheduler = new TeraScheduler(result.toArray(new TeraFileSplit[0]), job.getConfiguration());
            result = scheduler.getNewFileSplits();
            long t3 = System.currentTimeMillis();
            System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
        }
        // Publish the cache entry only after success. Otherwise a failed attempt could be
        // mistaken for a cached result when retried with the same job.
        lastContext = job;
        lastResult = result;
        return result;
    }

    /**
     * Reads fixed-length {@link TeraInputFormat#RECORD_LENGTH}-byte records from a file split.
     *
     * <p>Records are aligned to multiples of RECORD_LENGTH within the file. The reader skips
     * forward from the split start to the next record boundary. It then returns every
     * record that starts before {@code start + length}, so the last record it reads may
     * extend past the split end.
     */
    static class TeraRecordReader
    extends RecordReader<Text, Text> {
        private FSDataInputStream in;
        /** Bytes consumed relative to the split start, including the initial alignment skip. */
        private long offset;
        private long length;
        private final byte[] buffer = new byte[RECORD_LENGTH];
        private Text key;
        private Text value;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit)split;
            Path p = fileSplit.getPath();
            FileSystem fs = p.getFileSystem(context.getConfiguration());
            this.in = fs.open(p);
            long start = fileSplit.getStart();
            // Distance from start to the next multiple of RECORD_LENGTH (0 if already aligned).
            this.offset = (RECORD_LENGTH - start % RECORD_LENGTH) % RECORD_LENGTH;
            this.in.seek(start + this.offset);
            this.length = fileSplit.getLength();
        }

        @Override
        public void close() throws IOException {
            // initialize() may never have run, or may have failed before opening the stream.
            if (this.in != null) {
                this.in.close();
            }
        }

        @Override
        public Text getCurrentKey() {
            return this.key;
        }

        @Override
        public Text getCurrentValue() {
            return this.value;
        }

        /** Fraction of the split consumed, in [0, 1]; 0 for an empty split. */
        @Override
        public float getProgress() throws IOException {
            if (this.length == 0L) {
                return 0.0f;
            }
            // offset can run past length (alignment skip, last record), so clamp.
            return Math.min(1.0f, (float)this.offset / (float)this.length);
        }

        /**
         * Reads the next record into the reused key/value objects.
         *
         * @return false at the end of the split, or at a clean end of file
         * @throws EOFException if the file ends in the middle of a record
         */
        @Override
        public boolean nextKeyValue() throws IOException {
            if (this.offset >= this.length) {
                return false;
            }
            int read = 0;
            while (read < RECORD_LENGTH) {
                int newRead = this.in.read(this.buffer, read, RECORD_LENGTH - read);
                if (newRead == -1) {
                    if (read == 0) {
                        return false;
                    }
                    throw new EOFException("read past eof");
                }
                read += newRead;
            }
            if (this.key == null) {
                this.key = new Text();
            }
            if (this.value == null) {
                this.value = new Text();
            }
            this.key.set(this.buffer, 0, KEY_LENGTH);
            this.value.set(this.buffer, KEY_LENGTH, VALUE_LENGTH);
            this.offset += RECORD_LENGTH;
            return true;
        }
    }

    /**
     * Thread group that records an uncaught throwable from its threads, instead of the
     * default printing, so that the coordinating thread can rethrow it.
     */
    static class SamplerThreadGroup
    extends ThreadGroup {
        /** Most recent uncaught failure from any thread in the group (a later one overwrites an earlier one). */
        private volatile Throwable throwable;

        public SamplerThreadGroup(String s) {
            super(s);
        }

        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            this.throwable = throwable;
        }

        /** Returns the recorded failure, or null if no thread has failed. */
        public Throwable getThrowable() {
            return this.throwable;
        }
    }

    /**
     * Collects sampled keys and turns them into evenly spaced split points.
     *
     * <p>addKey is synchronized, so several sampler threads may call it concurrently. The
     * other methods assume sampling has finished.
     */
    static class TextSampler
    implements IndexedSortable {
        private final ArrayList<Text> records = new ArrayList<Text>();

        TextSampler() {
        }

        @Override
        public int compare(int i, int j) {
            return this.records.get(i).compareTo(this.records.get(j));
        }

        @Override
        public void swap(int i, int j) {
            Text left = this.records.get(i);
            this.records.set(i, this.records.get(j));
            this.records.set(j, left);
        }

        /** Adds a private copy of {@code key}; the caller may reuse {@code key} afterwards. */
        public synchronized void addKey(Text key) {
            this.records.add(new Text(key));
        }

        /**
         * Sorts the sampled keys and picks {@code numPartitions - 1} evenly spaced split points.
         *
         * @param numPartitions number of partitions; must be between 1 and the number of samples
         * @return the split points in ascending order (empty when numPartitions is 1)
         * @throws IllegalArgumentException if numPartitions is less than 1 or exceeds the sample count
         */
        Text[] createPartitions(int numPartitions) {
            int numRecords = this.records.size();
            System.out.println("Making " + numPartitions + " from " + numRecords + " sampled records");
            if (numPartitions < 1) {
                throw new IllegalArgumentException("Number of partitions must be positive (" + numPartitions + ")");
            }
            if (numPartitions > numRecords) {
                throw new IllegalArgumentException("Requested more partitions than input keys (" + numPartitions + " > " + numRecords + ")");
            }
            new QuickSort().sort(this, 0, numRecords);
            float stepSize = (float)numRecords / (float)numPartitions;
            Text[] result = new Text[numPartitions - 1];
            for (int i = 1; i < numPartitions; ++i) {
                result[i - 1] = this.records.get(Math.round(stepSize * (float)i));
            }
            return result;
        }
    }

    /**
     * FileSplit whose host locations are kept in a local field and can be replaced through
     * {@link #setLocations}.
     *
     * <p>NOTE(review): the locations are not part of FileSplit's serialized form. A
     * deserialized split presumably starts with no locations; confirm.
     */
    static class TeraFileSplit
    extends FileSplit {
        private static final String[] ZERO_LOCATIONS = new String[0];
        private String[] locations;

        public TeraFileSplit() {
            this.locations = ZERO_LOCATIONS;
        }

        public TeraFileSplit(Path file, long start, long length, String[] hosts) {
            super(file, start, length, hosts);
            try {
                this.locations = super.getLocations();
            } catch (IOException ignored) {
                // Best effort: fall back to "no preferred locations".
                this.locations = ZERO_LOCATIONS;
            }
        }

        protected void setLocations(String[] hosts) {
            this.locations = hosts;
        }

        @Override
        public String[] getLocations() {
            return this.locations;
        }

        @Override
        public String toString() {
            StringBuilder result = new StringBuilder();
            result.append(this.getPath());
            result.append(" from ");
            result.append(this.getStart());
            result.append(" length ");
            result.append(this.getLength());
            for (String host : this.getLocations()) {
                result.append(" ");
                result.append(host);
            }
            return result.toString();
        }
    }
}

