package org.apache.hama.bsp;

import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/bsp/TestPartitioning.class */
/**
 * Cluster-backed test that submits a two-task BSP job with runtime input
 * partitioning enabled and checks that the job completes, then checks
 * {@link PartitioningRunner#getMergeProcessorID(int, int)} for a small range
 * of partition ids.
 */
public class TestPartitioning extends HamaCluster {
    public static final Log LOG = LogFactory.getLog(TestPartitioning.class);

    public static final String TMP_OUTPUT_PATH = "/tmp/test-example/output.txt";

    // Kept non-final: these are public and may be reassigned by code outside this file.
    public static String TMP_OUTPUT = "/tmp/test-example/";
    public static Path OUTPUT_PATH = new Path(TMP_OUTPUT);

    /** Configuration populated by the constructor; not read again inside this class. */
    protected HamaConfiguration configuration = new HamaConfiguration();

    /**
     * BSP task that drains its input, logging each record at debug level, and
     * asserts that it received more than two records.
     */
    public static class PartionedBSP extends BSP<LongWritable, Text, NullWritable, NullWritable, NullWritable> {

        /**
         * Reads records from {@code bSPPeer} until {@code readNext()} returns
         * {@code null}, counting them.
         *
         * @param bSPPeer the peer whose input records are consumed
         * @throws IOException declared by the BSP contract
         * @throws SyncException declared by the BSP contract
         * @throws InterruptedException declared by the BSP contract
         */
        @Override
        public void bsp(BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> bSPPeer)
                throws IOException, SyncException, InterruptedException {
            long recordCount = 0;
            KeyValuePair<LongWritable, Text> pair;
            while ((pair = bSPPeer.readNext()) != null) {
                // Skip building the message for every record unless debug output is wanted.
                if (TestPartitioning.LOG.isDebugEnabled()) {
                    TestPartitioning.LOG.debug(pair.getKey().get() + " / " + pair.getValue().toString());
                }
                recordCount++;
            }
            Assert.assertTrue("Expected more than 2 input records but read " + recordCount, recordCount > 2);
        }
    }

    /**
     * Fills {@link #configuration} with the master address, log redirection,
     * local/disk-queue directories, ZooKeeper quorum and client port, and the
     * sync client implementation class.
     */
    public TestPartitioning() {
        this.configuration.set("bsp.master.address", "localhost");
        this.configuration.set("hama.child.redirect.log.console", "true");
        assertEquals("Make sure master addr is set to localhost:", "localhost", this.configuration.get("bsp.master.address"));
        this.configuration.set("bsp.local.dir", "/tmp/hama-test");
        this.configuration.set("bsp.disk.queue.dir", TMP_OUTPUT_PATH);
        this.configuration.set("hama.zookeeper.quorum", "localhost");
        this.configuration.setInt("hama.zookeeper.property.clientPort", 21810);
        this.configuration.set("hama.sync.client.class", ZooKeeperSyncClientImpl.class.getCanonicalName());
    }

    /** Delegates to the superclass set-up without additional work. */
    @Override
    public void setUp() throws Exception {
        super.setUp();
    }

    /** Delegates to the superclass tear-down without additional work. */
    @Override
    public void tearDown() throws Exception {
        super.tearDown();
    }

    /**
     * Submits a two-task job using {@link PartionedBSP}, {@link TextInputFormat},
     * {@link NullOutputFormat} and {@link HashPartitioner}, asserts that it
     * completes successfully, and then runs {@link #getMergeProcessorID()}.
     * {@link #OUTPUT_PATH} is deleted afterwards whether or not the job succeeded.
     *
     * @throws Exception if job submission, execution or cleanup fails
     */
    public void testPartitioner() throws Exception {
        // Named "conf" so it does not shadow the instance field "configuration".
        Configuration conf = new Configuration();
        conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
        conf.setBoolean("bsp.input.runtime.partitioning", true);

        BSPJob job = new BSPJob(new HamaConfiguration(conf));
        job.setJobName("Test partitioning with input");
        job.setBspClass(PartionedBSP.class);
        job.setNumBspTask(2);
        // NOTE(review): this is set on "conf" after the job was built from a
        // HamaConfiguration wrapping it. If that constructor copies the
        // properties, the job never sees this timeout - confirm the intent
        // before moving it, since applying it would change job behaviour.
        conf.setInt("hama.zookeeper.session.timeout", 600);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(NullOutputFormat.class);
        FileInputFormat.setInputPaths(job, "../CHANGES.txt,../README.txt");
        job.setPartitioner(HashPartitioner.class);

        try {
            assertTrue("BSP job did not complete successfully", job.waitForCompletion(true));
        } finally {
            // Clean up even when the job fails so later runs start from a clean directory.
            FileSystem.get(conf).delete(OUTPUT_PATH, true);
        }

        getMergeProcessorID();
    }

    /**
     * Asserts that {@link PartitioningRunner#getMergeProcessorID(int, int)}
     * returns a value below the peer count (6) for partition ids 0 through 7.
     * Not picked up as a test by name; invoked from {@link #testPartitioner()}.
     */
    public void getMergeProcessorID() {
        final int peerCount = 6;
        final int partitionCount = 8;
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            assertTrue("Merge processor id out of range for partition " + partitionId,
                    PartitioningRunner.getMergeProcessorID(partitionId, peerCount) < peerCount);
        }
    }
}
