package org.apache.hama.bsp;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
import org.apache.hama.examples.ClassSerializePrinting;

/* loaded from: input_file:org/apache/hama/bsp/TestBSPMasterGroomServer.class */
/**
 * Integration test that submits a {@link ClassSerializePrinting} BSP job to the
 * cluster provided by {@link HamaCluster} and verifies the sequence files the
 * job writes to {@link #OUTPUT_PATH}.
 */
public class TestBSPMasterGroomServer extends HamaCluster {
    public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
    protected HamaConfiguration configuration = new HamaConfiguration();
    protected static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
    public static String TMP_OUTPUT = "/tmp/test-example/";
    public static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");

    /** Number of BSP tasks the test job is configured with. */
    private static final int NUM_BSP_TASKS = 2;

    /** Value the running key counter must have reached after a whole output file was read. */
    private static final int EXPECTED_FINAL_KEY = 15;

    /**
     * Prepares the configuration used by the test: master address, local and
     * disk-queue directories, and the ZooKeeper based sync client.
     */
    public TestBSPMasterGroomServer() {
        this.configuration.set("bsp.master.address", "localhost");
        this.configuration.set("hama.child.redirect.log.console", "true");
        assertEquals("Make sure master addr is set to localhost:", "localhost", this.configuration.get("bsp.master.address"));
        this.configuration.set("bsp.local.dir", "/tmp/hama-test");
        // Same value as before ("/tmp/messageQueue"), now taken from the constant.
        this.configuration.set("bsp.disk.queue.dir", TMP_OUTPUT_PATH);
        this.configuration.set("hama.zookeeper.quorum", "localhost");
        this.configuration.setInt("hama.zookeeper.property.clientPort", 21810);
        this.configuration.set("hama.sync.client.class", ZooKeeperSyncClientImpl.class.getCanonicalName());
    }

    @Override // org.apache.hama.HamaCluster, org.apache.hama.HamaClusterTestCase, org.apache.hama.HamaTestCase
    public void setUp() throws Exception {
        super.setUp();
    }

    @Override // org.apache.hama.HamaClusterTestCase, org.apache.hama.HamaTestCase
    public void tearDown() throws Exception {
        super.tearDown();
    }

    /**
     * Submits the serialize-printing job with {@value #NUM_BSP_TASKS} tasks, waits
     * for it to complete and validates its output via
     * {@link #checkOutput(FileSystem, Configuration, int)}.
     *
     * @throws Exception if job submission, execution or output validation fails
     */
    public void testSubmitJob() throws Exception {
        BSPJob job = new BSPJob(this.configuration, ClassSerializePrinting.class);
        job.setJobName("Test Serialize Printing");
        job.setBspClass(ClassSerializePrinting.class);
        job.setOutputFormat(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputPath(OUTPUT_PATH);
        job.setCompressionCodec(SnappyCompressor.class);
        job.setCompressionThreshold(40L);

        BSPJobClient jobClient = new BSPJobClient(this.configuration);
        // NOTE(review): this is set after the job and the client were created, exactly as in
        // the original; whether they observe the value depends on how they copy the
        // configuration - confirm before reordering.
        this.configuration.setInt("hama.zookeeper.session.timeout", 6000);
        assertEquals(this.numOfGroom, jobClient.getClusterStatus(false).getGroomServers());

        job.setNumBspTask(NUM_BSP_TASKS);
        FileSystem fileSystem = FileSystem.get(this.configuration);
        if (job.waitForCompletion(true)) {
            checkOutput(fileSystem, this.configuration, NUM_BSP_TASKS);
        } else {
            fail("BSP job '" + job.getJobName() + "' did not complete successfully");
        }
        LOG.info("Client finishes execution job.");
    }

    /**
     * Verifies the files under {@link #OUTPUT_PATH} and then deletes
     * {@link #TMP_OUTPUT} recursively.
     *
     * <p>The directory must contain exactly {@code i} entries. Every non-directory
     * entry is read as an {@code IntWritable}/{@code Text} sequence file; each record's
     * key must equal a running counter that starts at 0 and is incremented after
     * every {@code i} records. After the last record the counter must be 15.
     *
     * @param fileSystem    file system holding the job output
     * @param configuration configuration used to open the sequence files
     * @param i             expected number of output entries, which is also the number
     *                      of consecutive records expected to share one key
     * @throws Exception if the output cannot be listed, read or deleted
     */
    public static void checkOutput(FileSystem fileSystem, Configuration configuration, int i) throws Exception {
        FileStatus[] outputFiles = fileSystem.listStatus(OUTPUT_PATH);
        // Some Hadoop versions return null instead of throwing when the path is missing.
        assertNotNull("No output found at " + OUTPUT_PATH, outputFiles);
        assertEquals("Unexpected number of entries in " + OUTPUT_PATH, i, outputFiles.length);

        for (FileStatus outputFile : outputFiles) {
            if (outputFile.isDir()) {
                continue;
            }
            SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, outputFile.getPath(), configuration);
            int expectedKey = 0;
            try {
                int recordCount = 0;
                IntWritable key = new IntWritable();
                Text value = new Text();
                while (reader.next(key, value)) {
                    assertEquals("Unexpected key in " + outputFile.getPath(), expectedKey, key.get());
                    recordCount++;
                    // The key advances once every i records.
                    if (recordCount % i == 0) {
                        expectedKey++;
                    }
                }
            } finally {
                // Release the reader even when an assertion or a read fails mid-file.
                reader.close();
            }
            assertEquals("Unexpected final key in " + outputFile.getPath(), EXPECTED_FINAL_KEY, expectedKey);
        }
        fileSystem.delete(new Path(TMP_OUTPUT), true);
    }
}
