package org.apache.giraph.examples;

import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.benchmark.RandomMessageBenchmark;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/examples/SimpleCheckpoint.class */
public class SimpleCheckpoint implements Tool {
    public static final int FAULTING_SUPERSTEP = 4;
    public static final long FAULTING_VERTEX_ID = 1;
    public static final String SUPERSTEP_COUNT = "simpleCheckpointVertex.superstepCount";
    public static final String ENABLE_FAULT = "simpleCheckpointVertex.enableFault";
    private static final Logger LOG = Logger.getLogger(SimpleCheckpoint.class);
    private Configuration conf;

    /* loaded from: input_file:org/apache/giraph/examples/SimpleCheckpoint$SimpleCheckpointComputation.class */
    public static class SimpleCheckpointComputation extends BasicComputation<LongWritable, IntWritable, FloatWritable, FloatWritable> {
        @Override // org.apache.giraph.graph.AbstractComputation, org.apache.giraph.graph.Computation
        public void compute(Vertex<LongWritable, IntWritable, FloatWritable> vertex, Iterable<FloatWritable> iterable) throws IOException {
            SimpleCheckpointVertexWorkerContext simpleCheckpointVertexWorkerContext = (SimpleCheckpointVertexWorkerContext) getWorkerContext();
            boolean enableFault = simpleCheckpointVertexWorkerContext.getEnableFault();
            int supersteps = simpleCheckpointVertexWorkerContext.getSupersteps();
            if (enableFault && getSuperstep() == 4 && getContext().getTaskAttemptID().getId() == 0 && vertex.getId().get() == 1) {
                SimpleCheckpoint.LOG.info("compute: Forced a fault on the first attempt of superstep 4 and vertex id 1");
                System.exit(-1);
            }
            if (getSuperstep() > supersteps) {
                vertex.voteToHalt();
                return;
            }
            long j = getAggregatedValue(LongSumAggregator.class.getName()).get();
            SimpleCheckpoint.LOG.info("compute: " + j);
            aggregate(LongSumAggregator.class.getName(), new LongWritable(vertex.getId().get()));
            SimpleCheckpoint.LOG.info("compute: sum = " + j + " for vertex " + vertex.getId());
            float f = 0.0f;
            Iterator<FloatWritable> it2 = iterable.iterator();
            while (it2.hasNext()) {
                float f2 = it2.next().get();
                f += f2;
                SimpleCheckpoint.LOG.info("compute: got msgValue = " + f2 + " for vertex " + vertex.getId() + " on superstep " + getSuperstep());
            }
            int i = vertex.getValue().get();
            vertex.setValue(new IntWritable(i + ((int) f)));
            SimpleCheckpoint.LOG.info("compute: vertex " + vertex.getId() + " has value " + vertex.getValue() + " on superstep " + getSuperstep());
            for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
                FloatWritable floatWritable = new FloatWritable(edge.mo2127getValue().get() + i);
                Edge<LongWritable, FloatWritable> create = EdgeFactory.create(edge.getTargetVertexId(), floatWritable);
                SimpleCheckpoint.LOG.info("compute: vertex " + vertex.getId() + " sending edgeValue " + edge.mo2127getValue() + " vertexValue " + i + " total " + floatWritable + " to vertex " + edge.getTargetVertexId() + " on superstep " + getSuperstep());
                vertex.addEdge(create);
                sendMessage(edge.getTargetVertexId(), floatWritable);
            }
        }
    }

    /* loaded from: input_file:org/apache/giraph/examples/SimpleCheckpoint$SimpleCheckpointVertexMasterCompute.class */
    public static class SimpleCheckpointVertexMasterCompute extends DefaultMasterCompute {
        @Override // org.apache.giraph.master.DefaultMasterCompute, org.apache.giraph.master.MasterCompute
        public void initialize() throws InstantiationException, IllegalAccessException {
            registerAggregator(LongSumAggregator.class.getName(), LongSumAggregator.class);
        }
    }

    /* loaded from: input_file:org/apache/giraph/examples/SimpleCheckpoint$SimpleCheckpointVertexWorkerContext.class */
    public static class SimpleCheckpointVertexWorkerContext extends WorkerContext {
        public static final String FAULT_FILE = "/tmp/faultFile";
        private static long FINAL_SUM;
        private int supersteps = 6;
        private boolean enableFault = false;

        public static long getFinalSum() {
            return FINAL_SUM;
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void preApplication() throws InstantiationException, IllegalAccessException {
            this.supersteps = getContext().getConfiguration().getInt(SimpleCheckpoint.SUPERSTEP_COUNT, this.supersteps);
            this.enableFault = getContext().getConfiguration().getBoolean(SimpleCheckpoint.ENABLE_FAULT, false);
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void postApplication() {
            setFinalSum(getAggregatedValue(LongSumAggregator.class.getName()).get());
            SimpleCheckpoint.LOG.info("FINAL_SUM=" + FINAL_SUM);
        }

        private static void setFinalSum(long j) {
            FINAL_SUM = j;
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void preSuperstep() {
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void postSuperstep() {
        }

        public int getSupersteps() {
            return this.supersteps;
        }

        public boolean getEnableFault() {
            return this.enableFault;
        }
    }

    public int run(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption("h", "help", false, "Help");
        options.addOption("v", "verbose", false, "Verbose");
        options.addOption("w", RandomMessageBenchmark.WORKERS_NUM, true, "Number of workers");
        options.addOption("s", "supersteps", true, "Supersteps to execute before finishing");
        options.addOption("w", RandomMessageBenchmark.WORKERS_NUM, true, "Minimum number of workers");
        options.addOption("o", "outputDirectory", true, "Output directory");
        HelpFormatter helpFormatter = new HelpFormatter();
        if (strArr.length == 0) {
            helpFormatter.printHelp(getClass().getName(), options, true);
            return 0;
        }
        CommandLine parse = new PosixParser().parse(options, strArr);
        if (parse.hasOption('h')) {
            helpFormatter.printHelp(getClass().getName(), options, true);
            return 0;
        }
        if (!parse.hasOption('w')) {
            LOG.info("Need to choose the number of workers (-w)");
            return -1;
        }
        if (!parse.hasOption('o')) {
            LOG.info("Need to set the output directory (-o)");
            return -1;
        }
        GiraphJob giraphJob = new GiraphJob(getConf(), getClass().getName());
        giraphJob.getConfiguration().setComputationClass(SimpleCheckpointComputation.class);
        giraphJob.getConfiguration().setVertexInputFormatClass(GeneratedVertexInputFormat.class);
        giraphJob.getConfiguration().setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
        giraphJob.getConfiguration().setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
        giraphJob.getConfiguration().setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
        giraphJob.getConfiguration().setWorkerConfiguration(Integer.parseInt(parse.getOptionValue('w')), Integer.parseInt(parse.getOptionValue('w')), 100.0f);
        FileOutputFormat.setOutputPath(giraphJob.getInternalJob(), new Path(parse.getOptionValue('o')));
        boolean z = false;
        if (parse.hasOption('v')) {
            z = true;
        }
        if (parse.hasOption('s')) {
            getConf().setInt(SUPERSTEP_COUNT, Integer.parseInt(parse.getOptionValue('s')));
        }
        return giraphJob.run(z) ? 0 : -1;
    }

    public static void main(String[] strArr) throws Exception {
        System.exit(ToolRunner.run(new SimpleCheckpoint(), strArr));
    }

    public Configuration getConf() {
        return this.conf;
    }

    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }
}
