/*
 * Decompiled with CFR 0.152.
 */
package org.apache.giraph.block_app.framework;

import com.google.common.collect.Iterables;
import java.util.Map;
import org.apache.giraph.block_app.framework.api.BlockMasterApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.api.CreateReducersApi;
import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
import org.apache.giraph.block_app.framework.block.SequenceBlock;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.library.Pieces;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.function.Consumer;
import org.apache.giraph.function.ObjectTransfer;
import org.apache.giraph.function.Supplier;
import org.apache.giraph.function.primitive.PrimitiveRefs;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.master.MasterGlobalCommUsage;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.reducers.impl.SumReduce;
import org.apache.giraph.types.NoMessage;
import org.apache.giraph.utils.TestGraph;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.junit.Assert;
import org.junit.Test;

public class BlockExecutionTest {
    private static GiraphConfiguration createConf() {
        GiraphConfiguration conf = new GiraphConfiguration();
        GiraphConstants.VERTEX_ID_CLASS.set((Configuration)conf, LongWritable.class);
        GiraphConstants.VERTEX_VALUE_CLASS.set((Configuration)conf, LongWritable.class);
        GiraphConstants.EDGE_VALUE_CLASS.set((Configuration)conf, NullWritable.class);
        return conf;
    }

    private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() {
        TestGraph graph = new TestGraph(BlockExecutionTest.createConf());
        graph.addVertex((WritableComparable)new LongWritable(1L), (Writable)new LongWritable(), new Map.Entry[0]);
        graph.addVertex((WritableComparable)new LongWritable(2L), (Writable)new LongWritable(), new Map.Entry[0]);
        graph.addVertex((WritableComparable)new LongWritable(3L), (Writable)new LongWritable(), new Map.Entry[0]);
        graph.addVertex((WritableComparable)new LongWritable(4L), (Writable)new LongWritable(), new Map.Entry[0]);
        graph.addEdge((WritableComparable)new LongWritable(1L), (WritableComparable)new LongWritable(2L), (Writable)NullWritable.get());
        graph.addEdge((WritableComparable)new LongWritable(2L), (WritableComparable)new LongWritable(1L), (Writable)NullWritable.get());
        graph.addEdge((WritableComparable)new LongWritable(2L), (WritableComparable)new LongWritable(3L), (Writable)NullWritable.get());
        graph.addEdge((WritableComparable)new LongWritable(3L), (WritableComparable)new LongWritable(2L), (Writable)NullWritable.get());
        return graph;
    }

    @Test
    public void testMessageSending() {
        TestGraph<LongWritable, LongWritable, NullWritable> graph = BlockExecutionTest.createTestGraph();
        LocalBlockRunner.runBlock(graph, (Block)new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>(){

            public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender(final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi, Object executionStage) {
                return new AbstractPiece.InnerVertexSender(){

                    public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) {
                        workerApi.sendMessageToAllEdges(vertex, (Writable)new BooleanWritable());
                    }
                };
            }

            public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable> getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi, Object executionStage) {
                return new AbstractPiece.InnerVertexReceiver(){

                    public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex, Iterable<BooleanWritable> messages) {
                        ((LongWritable)vertex.getValue()).set((long)Iterables.size(messages));
                    }
                };
            }

            protected Class<BooleanWritable> getMessageClass() {
                return BooleanWritable.class;
            }
        }, (Object)new Object());
        Assert.assertEquals((long)1L, (long)((LongWritable)graph.getVertex((WritableComparable)new LongWritable(1L)).getValue()).get());
        Assert.assertEquals((long)2L, (long)((LongWritable)graph.getVertex((WritableComparable)new LongWritable(2L)).getValue()).get());
        Assert.assertEquals((long)1L, (long)((LongWritable)graph.getVertex((WritableComparable)new LongWritable(3L)).getValue()).get());
        Assert.assertEquals((long)0L, (long)((LongWritable)graph.getVertex((WritableComparable)new LongWritable(4L)).getValue()).get());
    }

    @Test
    public void testReducing() {
        TestGraph<LongWritable, LongWritable, NullWritable> graph = BlockExecutionTest.createTestGraph();
        final LongWritable value = new LongWritable();
        LocalBlockRunner.runBlock(graph, (Block)new Piece<WritableComparable, Writable, Writable, NoMessage, Object>(){
            private ReducerHandle<LongWritable, LongWritable> numVertices;

            public void registerReducers(CreateReducersApi reduceApi, Object executionStage) {
                this.numVertices = reduceApi.createLocalReducer((ReduceOperation)SumReduce.LONG);
            }

            public VertexSender<WritableComparable, Writable, Writable> getVertexSender(BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi, Object executionStage) {
                return new AbstractPiece.InnerVertexSender(){

                    public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) {
                        numVertices.reduce((Object)new LongWritable(1L));
                    }
                };
            }

            public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
                value.set(((LongWritable)this.numVertices.getReducedValue((MasterGlobalCommUsage)masterApi)).get());
            }
        }, (Object)new Object());
        Assert.assertEquals((long)4L, (long)value.get());
    }

    public void testVertexRemoval() {
        TestGraph<LongWritable, LongWritable, NullWritable> graph = BlockExecutionTest.createTestGraph();
        LocalBlockRunner.runBlock(graph, (Block)new Piece<LongWritable, Writable, Writable, NoMessage, Object>(){

            public VertexSender<LongWritable, Writable, Writable> getVertexSender(final BlockWorkerSendApi<LongWritable, Writable, Writable, NoMessage> workerApi, Object executionStage) {
                return new AbstractPiece.InnerVertexSender(){

                    public void vertexSend(Vertex<LongWritable, Writable, Writable> vertex) {
                        long id = ((LongWritable)vertex.getId()).get();
                        if (id == 1L || id == 3L) {
                            workerApi.removeVertexRequest((WritableComparable)((LongWritable)vertex.getId()));
                        }
                    }
                };
            }
        }, (Object)new Object());
        Assert.assertNull((Object)graph.getVertex((WritableComparable)new LongWritable(1L)));
        Assert.assertNotNull((Object)graph.getVertex((WritableComparable)new LongWritable(2L)));
        Assert.assertNull((Object)graph.getVertex((WritableComparable)new LongWritable(3L)));
        Assert.assertNotNull((Object)graph.getVertex((WritableComparable)new LongWritable(4L)));
    }

    @Test
    public void testRepeatUntilBlockFinishCurrentLoop() throws Exception {
        final ObjectTransfer toQuit = new ObjectTransfer();
        final PrimitiveRefs.IntRef counter = new PrimitiveRefs.IntRef(5);
        Piece counterPiece = Pieces.masterCompute((String)"Count", (Consumer)new Consumer<BlockMasterApi>(){

            public void apply(BlockMasterApi input) {
                --counter.value;
                if (counter.value == 0) {
                    toQuit.apply((Object)true);
                }
            }
        });
        SequenceBlock innerBlock = new SequenceBlock(new Block[]{counterPiece, counterPiece, counterPiece, counterPiece});
        Block repeatBlock = RepeatUntilBlock.unlimited((Block)innerBlock, (Supplier)toQuit);
        LocalBlockRunner.runBlock(BlockExecutionTest.createTestGraph(), (Block)repeatBlock, (Object)new Object());
        Assert.assertEquals((long)-3L, (long)counter.value);
    }
}

