package org.apache.hama.examples;

import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.IntegerMessage;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;

/* loaded from: input_file:org/apache/hama/examples/CombineExample.class */
public class CombineExample {
    /**
     * Directory the job writes its output to and that {@code printOutput} later reads and deletes.
     * The name is suffixed with the wall-clock time (in milliseconds) at class initialization.
     * Declared {@code final} because it is assigned exactly once and used as a constant.
     */
    private static final Path TMP_OUTPUT = new Path("/tmp/combine-" + System.currentTimeMillis());

    /* loaded from: input_file:org/apache/hama/examples/CombineExample$MyBSP.class */
    /**
     * BSP task for the combine example.
     *
     * <p>Before the barrier, the task sends three {@link IntegerMessage}s (payloads 1, 2 and 3,
     * each tagged with this peer's own name) to every peer name returned by
     * {@code getAllPeerNames()}. After the barrier, it drains its incoming messages and writes
     * one (tag, value) record for each of them.
     */
    public static class MyBSP extends BSP<NullWritable, NullWritable, Text, IntWritable> {
        public static final Log LOG = LogFactory.getLog(MyBSP.class);

        /**
         * Sends the three messages to every peer, synchronizes, then writes out whatever was received.
         *
         * @param bSPPeer handle used to send messages, synchronize, read messages and write output
         * @throws IOException propagated from the peer's messaging or output calls
         * @throws SyncException propagated from {@code sync()}
         * @throws InterruptedException propagated from {@code sync()}
         */
        public void bsp(BSPPeer<NullWritable, NullWritable, Text, IntWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
            for (String target : bSPPeer.getAllPeerNames()) {
                // Payloads 1..3, each tagged with the sender's peer name.
                for (int payload = 1; payload <= 3; payload++) {
                    bSPPeer.send(target, new IntegerMessage(bSPPeer.getPeerName(), payload));
                }
            }
            bSPPeer.sync();

            // A null message marks the end of the incoming queue.
            IntegerMessage received = bSPPeer.getCurrentMessage();
            while (received != null) {
                Text tag = new Text(received.getTag());
                IntWritable value = new IntWritable(received.getData().intValue());
                bSPPeer.write(tag, value);
                received = bSPPeer.getCurrentMessage();
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/examples/CombineExample$SumCombiner.class */
    /**
     * Combiner that folds a batch of messages into one {@link IntegerMessage} tagged
     * {@code "Sum = "} whose payload is the sum of the batch's integer payloads.
     */
    public static class SumCombiner extends Combiner {
        /**
         * Sums the payloads of the given messages.
         *
         * @param iterable the messages to combine
         * @return a new bundle holding exactly one message with the total; the total is 0 when
         *         {@code iterable} yields no messages
         */
        public BSPMessageBundle combine(Iterable<BSPMessage> iterable) {
            BSPMessageBundle combined = new BSPMessageBundle();

            int total = 0;
            for (BSPMessage message : iterable) {
                total += message.getData().intValue();
            }

            combined.addMessage(new IntegerMessage("Sum = ", total));
            return combined;
        }
    }

    /**
     * Copies the first non-empty file listed under {@code TMP_OUTPUT} to {@code System.out},
     * then recursively deletes {@code TMP_OUTPUT}.
     *
     * <p>Only one file is printed: the scan stops at the first entry whose length is greater
     * than zero. The input stream is closed even if the copy fails. The final delete is only
     * reached when listing, opening and copying all complete without an exception.
     *
     * @param hamaConfiguration configuration used to obtain the file system and passed to the copy
     * @throws IOException if listing, opening, copying, closing or deleting fails
     */
    static void printOutput(HamaConfiguration hamaConfiguration) throws IOException {
        FileSystem fileSystem = FileSystem.get(hamaConfiguration);
        for (FileStatus status : fileSystem.listStatus(TMP_OUTPUT)) {
            if (status.getLen() <= 0) {
                continue;
            }
            FSDataInputStream in = fileSystem.open(status.getPath());
            try {
                // Pass 'false' so copyBytes leaves the streams open: System.out must stay usable.
                IOUtils.copyBytes(in, System.out, hamaConfiguration, false);
            } finally {
                // Close here so the stream is released even when the copy throws.
                in.close();
            }
            break;
        }
        fileSystem.delete(TMP_OUTPUT, true);
    }

    /**
     * Entry point: configures the "Combine Example" job with {@code MyBSP} as the BSP class and
     * {@code SumCombiner} as the combiner, runs it with two BSP tasks, and waits for it to finish.
     *
     * <p>If {@code waitForCompletion(true)} returns {@code true}, the job output is printed via
     * {@code printOutput} followed by the elapsed time in seconds. Otherwise nothing is printed.
     *
     * @param strArr command-line arguments; not used
     * @throws InterruptedException propagated from waiting on the job
     * @throws IOException propagated from job setup, execution or output printing
     * @throws ClassNotFoundException propagated from waiting on the job
     */
    public static void main(String[] strArr) throws InterruptedException, IOException, ClassNotFoundException {
        HamaConfiguration conf = new HamaConfiguration();

        BSPJob job = new BSPJob(conf, CombineExample.class);
        job.setJobName("Combine Example");
        job.setBspClass(MyBSP.class);
        job.setCombinerClass(SumCombiner.class);
        job.setInputFormat(NullInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, TMP_OUTPUT);
        job.setNumBspTask(2);

        long startMillis = System.currentTimeMillis();
        if (!job.waitForCompletion(true)) {
            return;
        }

        printOutput(conf);
        // Measured after the output has been printed, so the figure includes that step.
        double elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000.0;
        System.out.println("Job Finished in " + elapsedSeconds + " seconds");
    }
}
