package org.apache.hama.bsp;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
import org.junit.Test;

/**
 * Checks {@link KeyValueTextInputFormat} together with {@link HashPartitioner}.
 *
 * <p>The test writes tab-separated {@code i} / {@code i * i} lines to an input file, runs
 * {@link KeyValueHashPartitionedBSP} over it, and requires the job to complete successfully.
 * The per-record checks happen inside the BSP task itself.
 */
public class TestKeyValueTextInputFormat extends TestCase {

    /** Number of key/value lines written to the input file and expected back by the job. */
    private static final int NUM_RECORDS = 1000;

    /** Location of the generated input file. */
    private static final String INPUT_PATH = "/tmp/test_keyvalueinputformat";

    /** Location handed to the job as its output path. */
    private static final String OUTPUT_PATH = "/tmp/test_keyvalueinputformat_out";

    /**
     * BSP task that reads its share of the input, checks that each key hashes to this peer and is
     * seen only once, and sends what it read to a single "master" peer. The master then asserts
     * that the keys {@code 0 .. maxValue - 1} are all present with value {@code key * key}.
     */
    public static class KeyValueHashPartitionedBSP extends BSP<Text, Text, NullWritable, NullWritable, MapWritable> {
        public static final String TEST_INPUT_VALUES = "test.bsp.max.input";
        public static final String TEST_UNEXPECTED_KEYS = "test.bsp.keys.unexpected";
        public static final String TEST_MAX_VALUE = "test.bsp.keys.max";

        /** Fallback for {@link #TEST_MAX_VALUE} when the configuration does not set it. */
        private static final int DEFAULT_MAX_VALUE = 1000;

        private int numTasks = 0;
        private int maxValue = 0;
        /** Key/value pairs this peer has read so far; also used to detect duplicate keys. */
        private final MapWritable expectedKeys = new MapWritable();

        /**
         * Caches the configured maximum key value and the number of peers.
         *
         * @param bSPPeer the peer this task runs on
         */
        public void setup(BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
            this.maxValue = bSPPeer.getConfiguration().getInt(TEST_MAX_VALUE, DEFAULT_MAX_VALUE);
            this.numTasks = bSPPeer.getNumPeers();
        }

        /**
         * Reads all records of this peer, reports them to the master peer, and (on the master)
         * verifies the combined input of all peers.
         *
         * @param bSPPeer the peer this task runs on
         */
        public void bsp(BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
            MapWritable report = new MapWritable();
            report.put(new Text(TEST_UNEXPECTED_KEYS), new BooleanWritable(false));

            KeyValuePair<Text, Text> record;
            while ((record = bSPPeer.readNext()) != null) {
                Text key = record.getKey();
                Text value = record.getValue();
                // A key is unexpected if it hashes to another peer or was already read here.
                boolean wrongPeer = Math.abs(key.hashCode() % this.numTasks) != bSPPeer.getPeerIndex();
                if (wrongPeer || this.expectedKeys.containsKey(key)) {
                    report.put(new Text(TEST_UNEXPECTED_KEYS), new BooleanWritable(true));
                    break;
                }
                // Copy key and value before storing them in the map.
                this.expectedKeys.put(new Text(key), new Text(value));
            }
            report.put(new Text(TEST_INPUT_VALUES), this.expectedKeys);

            // The peer in the middle of the index range collects every peer's report.
            int masterIndex = bSPPeer.getNumPeers() / 2;
            bSPPeer.send(bSPPeer.getPeerName(masterIndex), report);
            bSPPeer.sync();

            if (bSPPeer.getPeerIndex() == masterIndex) {
                verifyCollectedInput(bSPPeer);
            }
            bSPPeer.sync();
        }

        /**
         * Drains the reports received by the master peer and asserts that no peer flagged an
         * unexpected key and that every key in {@code 0 .. maxValue - 1} maps to its square.
         *
         * @param peer the master peer whose message queue holds the reports
         */
        private void verifyCollectedInput(BSPPeer<Text, Text, NullWritable, NullWritable, MapWritable> peer) throws IOException, SyncException, InterruptedException {
            Map<Integer, Integer> valuesByKey = new HashMap<Integer, Integer>();
            MapWritable message;
            while ((message = peer.getCurrentMessage()) != null) {
                // MapWritable.get returns Writable, so each entry must be narrowed explicitly.
                BooleanWritable unexpected = (BooleanWritable) message.get(new Text(TEST_UNEXPECTED_KEYS));
                Assert.assertFalse("a peer read a duplicate key or a key of another partition", unexpected.get());

                MapWritable values = (MapWritable) message.get(new Text(TEST_INPUT_VALUES));
                for (Map.Entry<Writable, Writable> entry : values.entrySet()) {
                    valuesByKey.put(Integer.valueOf(entry.getKey().toString()), Integer.valueOf(entry.getValue().toString()));
                }
            }

            for (int i = 0; i < this.maxValue; i++) {
                Integer key = Integer.valueOf(i);
                Assert.assertTrue("missing key " + i, valuesByKey.containsKey(key));
                Assert.assertEquals("wrong value for key " + i, i * i, valuesByKey.get(key).intValue());
            }
        }
    }

    /**
     * Generates the input file, runs the BSP job over it and requires the job to succeed.
     * Any exception during setup or job execution fails the test instead of being swallowed.
     */
    @Test
    public void testInput() {
        Path inputPath = new Path(INPUT_PATH);
        Path outputPath = new Path(OUTPUT_PATH);
        boolean completed;
        try {
            writeInputFile(new Configuration(), inputPath);
            completed = runJob(inputPath, outputPath);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs the test.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while running the BSP job on " + inputPath, e);
        } catch (Exception e) {
            throw new IllegalStateException("Could not prepare input or run the BSP job on " + inputPath, e);
        }
        assertTrue("BSP job did not complete successfully", completed);
    }

    /**
     * (Re)creates {@code path} with {@link #NUM_RECORDS} lines of the form
     * {@code i<TAB>i*i}, one per line.
     *
     * @param configuration configuration used to look up the file system
     * @param path file to write; an existing file or directory at this path is deleted first
     * @throws IOException if the file system cannot be accessed or the file cannot be written
     */
    private static void writeInputFile(Configuration configuration, Path path) throws IOException {
        FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
        fileSystem.delete(path, true);

        StringBuilder lines = new StringBuilder();
        for (int i = 0; i < NUM_RECORDS; i++) {
            lines.append(i).append("\t").append(i * i).append("\n");
        }

        FSDataOutputStream out = fileSystem.create(path, true);
        try {
            out.writeBytes(lines.toString());
        } finally {
            // Close even when the write fails so the stream is not leaked.
            out.close();
        }
    }

    /**
     * Configures and runs the BSP job with as many tasks as the cluster status reports as maximum.
     *
     * @param inputPath file produced by {@link #writeInputFile(Configuration, Path)}
     * @param outputPath output path handed to the job
     * @return the result of {@code BSPJob.waitForCompletion(true)}
     * @throws Exception if the job cannot be created, submitted or awaited
     */
    private static boolean runJob(Path inputPath, Path outputPath) throws Exception {
        HamaConfiguration hamaConfiguration = new HamaConfiguration();
        hamaConfiguration.setInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, NUM_RECORDS);

        BSPJob job = new BSPJob(hamaConfiguration, TestKeyValueTextInputFormat.class);
        job.setJobName("Test KeyValueTextInputFormat together with HashPartitioner");
        job.setBspClass(KeyValueHashPartitionedBSP.class);
        job.setPartitioner(HashPartitioner.class);

        job.setInputPath(inputPath);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setInputKeyClass(Text.class);
        job.setInputValueClass(Text.class);

        job.setOutputPath(outputPath);
        job.setOutputFormat(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumBspTask(new BSPJobClient(hamaConfiguration).getClusterStatus(true).getMaxTasks());
        return job.waitForCompletion(true);
    }
}
