/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Formatter;
import java.util.Iterator;
import junit.extensions.TestSetup;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TaskCounter;

public class TestReduceFetchFromPartialMem
extends TestCase {
    protected static MiniMRCluster mrCluster = null;
    protected static MiniDFSCluster dfsCluster = null;
    protected static TestSuite mySuite;
    private static final String tagfmt = "%04d";
    private static final String keyfmt = "KEYKEYKEYKEYKEYKEYKE";
    private static final int keylen;

    protected static void setSuite(Class<? extends TestCase> klass) {
        mySuite = new TestSuite(klass);
    }

    public static Test suite() {
        TestSetup setup = new TestSetup((Test)mySuite){

            protected void setUp() throws Exception {
                Configuration conf = new Configuration();
                dfsCluster = new MiniDFSCluster(conf, 2, true, null);
                mrCluster = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 1);
            }

            protected void tearDown() throws Exception {
                if (dfsCluster != null) {
                    dfsCluster.shutdown();
                }
                if (mrCluster != null) {
                    mrCluster.shutdown();
                }
            }
        };
        return setup;
    }

    private static int getValLen(int id, int nMaps) {
        return 4096 / nMaps * (id + 1);
    }

    public void testReduceFromPartialMem() throws Exception {
        int MAP_TASKS = 7;
        JobConf job = mrCluster.createJobConf();
        job.setNumMapTasks(7);
        job.setInt("mapreduce.reduce.merge.inmem.threshold", 0);
        job.set("mapreduce.reduce.input.buffer.percent", "1.0");
        job.setInt("mapreduce.reduce.shuffle.parallelcopies", 1);
        job.setInt("mapreduce.task.io.sort.mb", 10);
        job.set("mapreduce.reduce.java.opts", "-Xmx128m");
        job.setLong("mapreduce.reduce.memory.totalbytes", 0x8000000L);
        job.set("mapreduce.reduce.shuffle.input.buffer.percent", "0.14");
        job.set("mapreduce.reduce.shuffle.merge.percent", "1.0");
        Counters c = TestReduceFetchFromPartialMem.runJob(job);
        long out = ((Counters.Counter)c.findCounter((Enum)TaskCounter.MAP_OUTPUT_RECORDS)).getCounter();
        long spill = ((Counters.Counter)c.findCounter((Enum)TaskCounter.SPILLED_RECORDS)).getCounter();
        TestReduceFetchFromPartialMem.assertTrue((String)("Expected some records not spilled during reduce" + spill + ")"), (spill < 2L * out ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Counters runJob(JobConf conf) throws Exception {
        conf.setMapperClass(MapMB.class);
        conf.setReducerClass(MBValidate.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(1);
        conf.setInputFormat(FakeIF.class);
        conf.setNumTasksToExecutePerJvm(1);
        conf.setInt("mapreduce.map.maxattempts", 0);
        conf.setInt("mapreduce.reduce.maxattempts", 0);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{new Path("/in")});
        Path outp = new Path("/out");
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)outp);
        RunningJob job = null;
        try {
            job = JobClient.runJob((JobConf)conf);
            TestReduceFetchFromPartialMem.assertTrue((boolean)job.isSuccessful());
        }
        finally {
            FileSystem fs = dfsCluster.getFileSystem();
            if (fs.exists(outp)) {
                fs.delete(outp, true);
            }
        }
        return job.getCounters();
    }

    static {
        TestReduceFetchFromPartialMem.setSuite(TestReduceFetchFromPartialMem.class);
        keylen = keyfmt.length();
    }

    public static class FakeIF
    implements InputFormat<NullWritable, NullWritable> {
        public InputSplit[] getSplits(JobConf conf, int numSplits) {
            InputSplit[] splits = new InputSplit[numSplits];
            for (int i = 0; i < splits.length; ++i) {
                splits[i] = new FakeSplit();
            }
            return splits;
        }

        public RecordReader<NullWritable, NullWritable> getRecordReader(InputSplit ignored, JobConf conf, Reporter reporter) {
            return new RecordReader<NullWritable, NullWritable>(){
                private boolean done = false;

                public boolean next(NullWritable key, NullWritable value) throws IOException {
                    if (this.done) {
                        return false;
                    }
                    this.done = true;
                    return true;
                }

                public NullWritable createKey() {
                    return NullWritable.get();
                }

                public NullWritable createValue() {
                    return NullWritable.get();
                }

                public long getPos() throws IOException {
                    return 0L;
                }

                public void close() throws IOException {
                }

                public float getProgress() throws IOException {
                    return 0.0f;
                }
            };
        }
    }

    public static class FakeSplit
    implements InputSplit {
        public void write(DataOutput out) throws IOException {
        }

        public void readFields(DataInput in) throws IOException {
        }

        public long getLength() {
            return 0L;
        }

        public String[] getLocations() {
            return new String[0];
        }
    }

    public static class MBValidate
    implements Reducer<Text, Text, Text, Text> {
        private static int nMaps;
        private static final Text vb;
        private int nRec = 0;
        private int nKey = -1;
        private int aKey = -1;
        private int bKey = -1;
        private final Text kb = new Text();
        private final Formatter fmt = new Formatter(new StringBuilder(25));

        public void configure(JobConf conf) {
            nMaps = conf.getNumMapTasks();
            ((StringBuilder)this.fmt.out()).append(TestReduceFetchFromPartialMem.keyfmt);
        }

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
            int vcCheck;
            int vlen;
            int recCheck;
            int vc = 0;
            int preRec = this.nRec;
            ((StringBuilder)this.fmt.out()).setLength(keylen);
            if (25 == key.getLength()) {
                recCheck = 1;
                switch ((char)key.getBytes()[0]) {
                    case 'A': {
                        vlen = TestReduceFetchFromPartialMem.getValLen(++this.aKey, MBValidate.nMaps) - 128;
                        vcCheck = this.aKey;
                        break;
                    }
                    case 'B': {
                        vlen = TestReduceFetchFromPartialMem.getValLen(++this.bKey, MBValidate.nMaps);
                        vcCheck = this.bKey;
                        break;
                    }
                    default: {
                        vcCheck = -1;
                        vlen = -1;
                        Assert.fail((String)("Unexpected tag on record: " + (char)key.getBytes()[24]));
                    }
                }
                this.kb.set((char)key.getBytes()[0] + this.fmt.format(TestReduceFetchFromPartialMem.tagfmt, vcCheck).toString());
            } else {
                this.kb.set(this.fmt.format(TestReduceFetchFromPartialMem.tagfmt, ++this.nKey).toString());
                vlen = 1000;
                recCheck = nMaps;
                vcCheck = nMaps * (nMaps - 1) >>> 1;
            }
            Assert.assertEquals((Object)this.kb, (Object)key);
            while (values.hasNext()) {
                Text val = values.next();
                vc += val.getBytes()[0];
                Assert.assertEquals((int)0, (int)WritableComparator.compareBytes((byte[])vb.getBytes(), (int)1, (int)(vlen - 1), (byte[])val.getBytes(), (int)1, (int)(val.getLength() - 1)));
                out.collect((Object)key, (Object)val);
                ++this.nRec;
            }
            Assert.assertEquals((String)("Bad rec count for " + key), (int)recCheck, (int)(this.nRec - preRec));
            Assert.assertEquals((String)("Bad rec group for " + key), (int)vcCheck, (int)vc);
        }

        public void close() throws IOException {
            Assert.assertEquals((int)4095, (int)this.nKey);
            Assert.assertEquals((int)(nMaps - 1), (int)this.aKey);
            Assert.assertEquals((int)(nMaps - 1), (int)this.bKey);
            Assert.assertEquals((String)"Bad record count", (int)(nMaps * 4098), (int)this.nRec);
        }

        static {
            vb = new Text();
            byte[] v = new byte[4096];
            Arrays.fill(v, (byte)86);
            vb.set(v);
        }
    }

    public static class MapMB
    implements Mapper<NullWritable, NullWritable, Text, Text> {
        private int id;
        private int nMaps;
        private final Text key = new Text();
        private final Text val = new Text();
        private final byte[] b = new byte[4096];
        private final Formatter fmt = new Formatter(new StringBuilder(25));

        public void configure(JobConf conf) {
            this.nMaps = conf.getNumMapTasks();
            this.id = this.nMaps - conf.getInt("mapreduce.task.partition", -1) - 1;
            Arrays.fill(this.b, 0, 4096, (byte)86);
            ((StringBuilder)this.fmt.out()).append(TestReduceFetchFromPartialMem.keyfmt);
        }

        public void map(NullWritable nk, NullWritable nv, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            this.val.set(this.b, 0, 1000);
            this.val.getBytes()[0] = (byte)this.id;
            for (int i = 0; i < 4096; ++i) {
                this.key.set(this.fmt.format(TestReduceFetchFromPartialMem.tagfmt, i).toString());
                output.collect((Object)this.key, (Object)this.val);
                ((StringBuilder)this.fmt.out()).setLength(keylen);
            }
            this.val.set(this.b, 0, TestReduceFetchFromPartialMem.getValLen(this.id, this.nMaps) - 128);
            this.val.getBytes()[0] = (byte)this.id;
            ((StringBuilder)this.fmt.out()).setLength(keylen);
            this.key.set("A" + this.fmt.format(TestReduceFetchFromPartialMem.tagfmt, this.id).toString());
            output.collect((Object)this.key, (Object)this.val);
            this.val.set(this.b, 0, TestReduceFetchFromPartialMem.getValLen(this.id, this.nMaps));
            this.val.getBytes()[0] = (byte)this.id;
            ((StringBuilder)this.fmt.out()).setLength(keylen);
            this.key.set("B" + this.fmt.format(TestReduceFetchFromPartialMem.tagfmt, this.id).toString());
            output.collect((Object)this.key, (Object)this.val);
        }

        public void close() throws IOException {
        }
    }
}

