package org.apache.giraph;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.examples.SimpleSuperstepComputation;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/giraph/TestCheckpointing.class */
public class TestCheckpointing extends BspCase {
    /** Logger used by this test and by the nested {@link CheckpointVertexWorkerContext}. */
    private static final Logger LOG = Logger.getLogger(TestCheckpointing.class);
    /**
     * Value written to {@code mapred.job.id} for the original job and passed as the
     * restart job id when the restarted job is configured.
     */
    public static final String TEST_JOB_ID = "test_job";
    /**
     * Optional hook invoked by {@link CheckpointVertexMasterCompute#compute()} on every
     * superstep; {@code null} means no hook. Each test reassigns it before running a job.
     */
    private static SuperstepCallback SUPERSTEP_CALLBACK;

    /* loaded from: input_file:org/apache/giraph/TestCheckpointing$CheckpointComputation.class */
    /**
     * Vertex computation driven by the checkpointing tests.
     *
     * <p>While the superstep is at most 4, each vertex aggregates its id into the
     * {@link LongSumAggregator}, adds the (truncated) sum of its incoming messages to its value,
     * and for every edge adds an edge to the same target whose value is the old edge value plus
     * the vertex value read before the update, sending that new edge value as a message.
     * After superstep 4 the vertex votes to halt.
     */
    public static class CheckpointComputation extends BasicComputation<LongWritable, IntWritable, FloatWritable, FloatWritable> {
        /**
         * Runs one superstep for a single vertex.
         *
         * @param vertex   the vertex being computed
         * @param iterable the messages delivered to the vertex for this superstep
         * @throws IOException declared by the overridden computation method
         */
        public void compute(Vertex<LongWritable, IntWritable, FloatWritable> vertex, Iterable<FloatWritable> iterable) throws IOException {
            // Bind the worker context to its concrete type through the assignment target;
            // without it the generic result is only a WorkerContext and testValue is not visible.
            CheckpointVertexWorkerContext workerContext = getWorkerContext();
            // preSuperstep() has already incremented the counter once for this superstep.
            Assert.assertEquals(getSuperstep() + 1, workerContext.testValue);
            if (getSuperstep() > 4) {
                vertex.voteToHalt();
                return;
            }
            aggregate(LongSumAggregator.class.getName(), new LongWritable(vertex.getId().get()));
            float messageSum = 0.0f;
            for (FloatWritable message : iterable) {
                messageSum += message.get();
            }
            // Read the value before overwriting it: the edge updates below use the old value.
            int vertexValue = vertex.getValue().get();
            vertex.setValue(new IntWritable(vertexValue + ((int) messageSum)));
            // NOTE(review): edges are added while iterating getEdges(); whether that is safe
            // depends on the configured edge storage - confirm.
            for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
                FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() + vertexValue);
                vertex.addEdge(EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue));
                sendMessage(edge.getTargetVertexId(), newEdgeValue);
            }
        }
    }

    /* loaded from: input_file:org/apache/giraph/TestCheckpointing$CheckpointVertexMasterCompute.class */
    /**
     * Master compute used by the checkpointing tests.
     *
     * <p>Keeps a counter of its own {@link #compute()} invocations and asserts that the counter
     * matches the superstep number. The counter is included in {@link #write(DataOutput)} and
     * {@link #readFields(DataInput)}, presumably so it is restored when a job restarts from a
     * checkpoint.
     */
    public static class CheckpointVertexMasterCompute extends DefaultMasterCompute {
        /** Number of times {@link #compute()} has run; expected to equal the current superstep. */
        private int testValue = 0;

        /** Registers the sum aggregator under the aggregator class name. */
        public void initialize() throws InstantiationException, IllegalAccessException {
            final String aggregatorName = LongSumAggregator.class.getName();
            registerAggregator(aggregatorName, LongSumAggregator.class);
        }

        /**
         * Notifies the test's superstep callback (if one is installed) and then checks that the
         * invocation counter equals the superstep captured at entry, incrementing the counter.
         */
        public void compute() {
            final long superstepAtEntry = getSuperstep();
            if (TestCheckpointing.SUPERSTEP_CALLBACK != null) {
                TestCheckpointing.SUPERSTEP_CALLBACK.superstep(getSuperstep(), getConf());
            }
            // Post-increment: compare against the old counter value, leave it advanced by one.
            Assert.assertEquals(superstepAtEntry, testValue++);
        }

        /** Writes the superclass state followed by the invocation counter. */
        public void write(DataOutput dataOutput) throws IOException {
            super.write(dataOutput);
            dataOutput.writeInt(testValue);
        }

        /** Reads the superclass state followed by the invocation counter. */
        public void readFields(DataInput dataInput) throws IOException {
            super.readFields(dataInput);
            testValue = dataInput.readInt();
        }
    }

    /* loaded from: input_file:org/apache/giraph/TestCheckpointing$CheckpointVertexWorkerContext.class */
    /**
     * Worker context used by the checkpointing tests.
     *
     * <p>It counts {@link #preSuperstep()} invocations and asserts the count matches the
     * superstep number, sends itself one message per superstep and verifies it arrives at the
     * start of the next one, and publishes the final aggregated sum through a static field so
     * the test can read it after the job has finished. The counter is part of the serialized
     * state written by {@link #write(DataOutput)} and read by {@link #readFields(DataInput)}.
     */
    public static class CheckpointVertexWorkerContext extends DefaultWorkerContext {
        /** Aggregated sum captured in {@link #postApplication()}. */
        private static long FINAL_SUM;
        /** Number of {@link #preSuperstep()} calls so far; also read by {@code CheckpointComputation}. */
        private int testValue;

        /**
         * @return the sum recorded by the most recent {@link #postApplication()} call
         */
        public static long getFinalSum() {
            return FINAL_SUM;
        }

        /** Sends this worker a message carrying the current superstep number. */
        public void postSuperstep() {
            super.postSuperstep();
            sendMessageToMyself(new LongWritable(getSuperstep()));
        }

        /**
         * Sends {@code writable} to this worker's own index.
         *
         * @param writable the message to deliver
         */
        private void sendMessageToMyself(Writable writable) {
            sendMessageToWorker(writable, getMyWorkerIndex());
        }

        /** Records and logs the final value of the sum aggregator. */
        public void postApplication() {
            // The typed local fixes the generic result to LongWritable; without a target type
            // the call yields a plain Writable, which has no get().
            LongWritable aggregatedSum = getAggregatedValue(LongSumAggregator.class.getName());
            setFinalSum(aggregatedSum.get());
            TestCheckpointing.LOG.info("FINAL_SUM=" + FINAL_SUM);
        }

        /**
         * Stores the final sum.
         *
         * @param j the value to publish through {@link #getFinalSum()}
         */
        private static void setFinalSum(long j) {
            FINAL_SUM = j;
        }

        /**
         * Checks that the invocation counter equals the superstep (then increments it) and,
         * from superstep 1 on, that exactly one message carrying the previous superstep number
         * was received.
         */
        public void preSuperstep() {
            long superstep = getSuperstep();
            Assert.assertEquals(superstep, testValue++);
            if (getSuperstep() > 0) {
                List<Writable> messages = getAndClearMessagesFromOtherWorkers();
                Assert.assertEquals(1L, messages.size());
                // postSuperstep() of the previous superstep sent its own superstep number.
                Assert.assertEquals(getSuperstep() - 1, ((LongWritable) messages.get(0)).get());
            }
        }

        /** Reads the superclass state followed by the invocation counter. */
        public void readFields(DataInput dataInput) throws IOException {
            super.readFields(dataInput);
            this.testValue = dataInput.readInt();
        }

        /** Writes the superclass state followed by the invocation counter. */
        public void write(DataOutput dataOutput) throws IOException {
            super.write(dataOutput);
            dataOutput.writeInt(this.testValue);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/giraph/TestCheckpointing$SuperstepCallback.class */
    /**
     * Hook that a test installs in {@code SUPERSTEP_CALLBACK}; the master compute calls it
     * once per {@code compute()} invocation while the hook is non-null.
     */
    public interface SuperstepCallback {
        /**
         * Called from the master compute.
         *
         * @param j the superstep number reported by the master compute
         * @param immutableClassesGiraphConfiguration the configuration returned by the master compute's {@code getConf()}
         */
        void superstep(long j, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> immutableClassesGiraphConfiguration);
    }

    /**
     * Creates the test case, passing this class's fully qualified name to the
     * {@link BspCase} constructor.
     */
    public TestCheckpointing() {
        super(TestCheckpointing.class.getName());
    }

    /**
     * Runs the checkpoint-and-restart scenario with the flag of
     * {@link #testBspCheckpoint(boolean)} set to {@code false}.
     */
    @Test
    public void testBspCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
        final boolean useAsyncMessageStore = false;
        testBspCheckpoint(useAsyncMessageStore);
    }

    /**
     * Runs the checkpoint-and-restart scenario with the flag of
     * {@link #testBspCheckpoint(boolean)} set to {@code true}.
     */
    @Test
    public void testAsyncMessageStoreCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
        final boolean useAsyncMessageStore = true;
        testBspCheckpoint(useAsyncMessageStore);
    }

    /**
     * Runs a job that checkpoints every 2 supersteps and keeps its checkpoints, then restarts
     * it from superstep 2 and checks that the restarted job never executes an earlier superstep
     * and produces the same final sum.
     *
     * @param z when {@code true}, sets the async message store thread count to 2 before running
     * @throws IOException            propagated from running the jobs
     * @throws InterruptedException   propagated from running the jobs
     * @throws ClassNotFoundException propagated from running the jobs
     */
    public void testBspCheckpoint(boolean z) throws IOException, InterruptedException, ClassNotFoundException {
        Path checkpointsDir = getTempPath("checkpointing");
        GiraphConfiguration conf = new GiraphConfiguration();
        if (z) {
            GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2);
        }
        // The original run must not be observed by a callback left over from another test.
        SUPERSTEP_CALLBACK = null;
        GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
        conf.setCheckpointFrequency(2);
        long idSum = runOriginalJob(checkpointsDir, conf);
        // runOriginalJob() only reads the sum when running locally and returns 0 otherwise,
        // so the expected value can only be checked outside distributed mode - the same guard
        // runRestartedJob() applies to its own sum check.
        if (!runningInDistributedMode()) {
            Assert.assertEquals(10L, idSum);
        }
        SUPERSTEP_CALLBACK = new SuperstepCallback() {
            @Override
            public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> callbackConf) {
                if (superstep < 2) {
                    Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
                }
            }
        };
        runRestartedJob(checkpointsDir, conf, idSum, 2L);
    }

    /**
     * Reconfigures {@code giraphConfiguration} to restart the job identified by
     * {@code TEST_JOB_ID}, runs it against the given checkpoint directory and, when not running
     * in distributed mode, checks the final sum.
     *
     * @param path                the checkpoint directory written by the original job
     * @param giraphConfiguration the configuration to modify and run with
     * @param j                   the final sum the restarted job is expected to produce
     * @param j2                  the superstep to restart from; a negative value leaves the
     *                            restart superstep unset
     * @throws IOException            propagated from preparing or running the job
     * @throws InterruptedException   propagated from running the job
     * @throws ClassNotFoundException propagated from running the job
     */
    private void runRestartedJob(Path path, GiraphConfiguration giraphConfiguration, long j, long j2) throws IOException, InterruptedException, ClassNotFoundException {
        LOG.info("testBspCheckpoint: Restarting from the latest superstep with checkpoint path = " + path);
        final Path restartedOutputPath = getTempPath("checkpointing_restarted");

        GiraphConstants.RESTART_JOB_ID.set(giraphConfiguration, TEST_JOB_ID);
        giraphConfiguration.set("mapred.job.id", "restarted_test_job");
        if (j2 >= 0) {
            giraphConfiguration.set("giraph.restartSuperstep", String.valueOf(j2));
        }

        final String restartedJobName = getCallingMethodName() + "Restarted";
        final GiraphJob restartedJob = prepareJob(restartedJobName, giraphConfiguration, restartedOutputPath);
        GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(), path.toString());
        Assert.assertTrue(restartedJob.run(true));

        // The final sum is only compared when the job did not run in distributed mode.
        if (!runningInDistributedMode()) {
            final long idSumRestarted = CheckpointVertexWorkerContext.getFinalSum();
            LOG.info("testBspCheckpoint: idSumRestarted = " + idSumRestarted);
            Assert.assertEquals(j, idSumRestarted);
        }
    }

    /**
     * Configures the checkpoint test classes on {@code giraphConfiguration}, runs the job under
     * the id {@code TEST_JOB_ID} with the given checkpoint directory and asserts that it
     * succeeds.
     *
     * @param path                the directory to store checkpoints in
     * @param giraphConfiguration the configuration to modify and run with
     * @return the final sum recorded by the worker context, or 0 when running in distributed mode
     * @throws IOException            propagated from preparing or running the job
     * @throws InterruptedException   propagated from running the job
     * @throws ClassNotFoundException propagated from running the job
     */
    private long runOriginalJob(Path path, GiraphConfiguration giraphConfiguration) throws IOException, InterruptedException, ClassNotFoundException {
        final Path originalOutputPath = getTempPath("checkpointing_original");

        giraphConfiguration.setComputationClass(CheckpointComputation.class);
        giraphConfiguration.setWorkerContextClass(CheckpointVertexWorkerContext.class);
        giraphConfiguration.setMasterComputeClass(CheckpointVertexMasterCompute.class);
        giraphConfiguration.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
        giraphConfiguration.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
        giraphConfiguration.set("mapred.job.id", TEST_JOB_ID);

        final GiraphJob originalJob = prepareJob(getCallingMethodName(), giraphConfiguration, originalOutputPath);
        GiraphConstants.CHECKPOINT_DIRECTORY.set(originalJob.getConfiguration(), path.toString());
        Assert.assertTrue(originalJob.run(true));

        if (runningInDistributedMode()) {
            return 0L;
        }
        final FileStatus outputStatus = getSinglePartFileStatus(originalJob.getConfiguration(), originalOutputPath);
        final long idSum = CheckpointVertexWorkerContext.getFinalSum();
        LOG.info("testBspCheckpoint: idSum = " + idSum + " fileLen = " + outputStatus.getLen());
        return idSum;
    }

    /**
     * Runs the manual checkpoint scenario with the checkpoint requested at superstep 0.
     */
    @Test
    public void testManualCheckpointAtTheBeginning() throws InterruptedException, IOException, ClassNotFoundException {
        final int checkpointSuperstep = 0;
        testManualCheckpoint(checkpointSuperstep);
    }

    /**
     * Runs the manual checkpoint scenario with the checkpoint requested at superstep 2.
     */
    @Test
    public void testManualCheckpoint() throws InterruptedException, IOException, ClassNotFoundException {
        final int checkpointSuperstep = 2;
        testManualCheckpoint(checkpointSuperstep);
    }

    /**
     * Requests a checkpoint-and-stop at superstep {@code i} of the original job, expects that
     * job to fail, and then restarts it (without an explicit restart superstep), checking that
     * the restarted job never executes a superstep before {@code i} and ends with sum 10.
     *
     * @param i the superstep at which the checkpoint-and-stop flag is created
     * @throws IOException            propagated from running the restarted job
     * @throws InterruptedException   propagated from running the restarted job
     * @throws ClassNotFoundException propagated from running the restarted job
     */
    private void testManualCheckpoint(final int i) throws IOException, InterruptedException, ClassNotFoundException {
        Path checkpointsDir = getTempPath("checkpointing");
        GiraphConfiguration conf = new GiraphConfiguration();
        SUPERSTEP_CALLBACK = new SuperstepCallback() {
            @Override
            public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> callbackConf) {
                if (superstep == i) {
                    requestCheckpointAndStop(callbackConf);
                } else if (superstep > i) {
                    Assert.fail("Job should be stopped by now " + superstep);
                }
            }
        };
        try {
            runOriginalJob(checkpointsDir, conf);
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Original job should fail after checkpointing");
        } catch (Exception e) {
            LOG.info("Original job failed, that's OK " + e);
        }
        SUPERSTEP_CALLBACK = new SuperstepCallback() {
            @Override
            public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> callbackConf) {
                if (superstep < i) {
                    Assert.fail("Restarted JOB should not be executed on superstep " + superstep);
                }
            }
        };
        runRestartedJob(checkpointsDir, conf, 10L, -1L);
    }

    /**
     * Creates the persistent {@code _checkpointAndStop} node under the job's ZooKeeper path,
     * using a short-lived client that is closed again before returning.
     *
     * @param conf the configuration of the running job, used for the ZooKeeper connection
     *             settings, the base path and the job id
     * @throws RuntimeException wrapping any I/O, ZooKeeper or interruption failure
     */
    private void requestCheckpointAndStop(ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) {
        try {
            ZooKeeperExt zooKeeper = new ZooKeeperExt(conf.getZookeeperList(), conf.getZooKeeperSessionTimeout(), conf.getZookeeperOpsMaxAttempts(), conf.getZookeeperOpsRetryWaitMsecs(), this);
            try {
                String jobPath = ZooKeeperManager.getBasePath(conf) + "/_hadoopBsp/" + conf.get("mapred.job.id");
                zooKeeper.createExt(jobPath + "/_checkpointAndStop", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
            } finally {
                // The node is PERSISTENT, so it outlives this session; release the connection.
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (IOException | KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}
