package org.apache.giraph.block_app.framework.api.local;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.giraph.block_app.framework.BlockFactory;
import org.apache.giraph.block_app.framework.BlockUtils;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.giraph.utils.TestGraph;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;

/* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.class */
public class LocalBlockRunner {
    /**
     * Configuration option read by {@code runBlockWithVertexOutput}: its value sizes the fixed
     * thread pool that executes per-partition work and is also passed to {@code InternalApi}.
     * Declared with the value 3 (presumably the default when the key is unset; confirm against
     * {@code IntConfOption}).
     */
    public static final IntConfOption NUM_THREADS = new IntConfOption("test.LocalBlockRunner.NUM_THREADS", 3, "");
    /**
     * Configuration option read by {@code runBlockWithVertexOutput}: when it evaluates to true,
     * every vertex value and edge container of the test graph is replaced by a copy before the
     * run starts, and the flag is also forwarded to {@code InternalApi}.
     * Declared with the value {@code true}.
     */
    public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption("test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
    /**
     * Configuration option read by {@code runBlockWithVertexOutput}: when it evaluates to true,
     * the master logic is copied through a {@code KryoWritableWrapper} at the start of every
     * loop iteration and re-attached via {@code initializeAfterRead}.
     * Declared with the value {@code false}.
     */
    public static final BooleanConfOption SERIALIZE_MASTER = new BooleanConfOption("test.LocalBlockRunner.SERIALIZE_MASTER", false, "");

    /** Not instantiable: this class only exposes static entry points. */
    private LocalBlockRunner() { }

    /**
     * Runs the application configured on {@code testGraph}.
     *
     * @param testGraph graph (and configuration) to run on
     * @param z         when {@code true}, the run is delegated to
     *                  {@code InternalVertexRunner.runWithInMemoryOutput} and its result is
     *                  returned; when {@code false}, the graph is processed through
     *                  {@link #runApp(TestGraph)} and the same instance is returned
     * @return the graph produced by the selected runner
     * @throws Exception propagated from the selected runner
     */
    public static <I extends WritableComparable, V extends Writable, E extends Writable> TestGraph<I, V, E> runApp(TestGraph<I, V, E> testGraph, boolean z) throws Exception {
        if (!z) {
            // Local block runner path: hand back the instance we were given.
            runApp(testGraph);
            return testGraph;
        }
        return InternalVertexRunner.runWithInMemoryOutput(testGraph.getConf(), testGraph);
    }

    /**
     * Runs the application configured on {@code testGraph}, passing a vertex writer that
     * ignores every vertex it is given.
     *
     * @param testGraph graph (and configuration) to run on
     */
    public static <I extends WritableComparable, V extends Writable, E extends Writable> void runApp(TestGraph<I, V, E> testGraph) {
        SimpleVertexWriter<I, V, E> discardingWriter = noOpVertexSaver();
        runAppWithVertexOutput(testGraph, discardingWriter);
    }

    /**
     * Runs an explicitly supplied block on {@code testGraph}, passing a vertex writer that
     * ignores every vertex it is given.
     *
     * @param testGraph graph (and configuration) to run on
     * @param block     block to execute
     * @param obj       execution-stage object forwarded together with the block
     */
    public static <I extends WritableComparable, V extends Writable, E extends Writable> void runBlock(TestGraph<I, V, E> testGraph, Block block, Object obj) {
        SimpleVertexWriter<I, V, E> discardingWriter = noOpVertexSaver();
        runBlockWithVertexOutput(block, obj, testGraph, discardingWriter);
    }

    /**
     * Builds the block and execution stage from the block factory named in the graph's
     * configuration and runs them, sending vertices to {@code simpleVertexWriter}.
     *
     * @param testGraph          graph (and configuration) to run on
     * @param simpleVertexWriter receiver for vertex output
     */
    public static <I extends WritableComparable, V extends Writable, E extends Writable> void runAppWithVertexOutput(TestGraph<I, V, E> testGraph, SimpleVertexWriter<I, V, E> simpleVertexWriter) {
        BlockFactory factory = BlockUtils.createBlockFactory(testGraph.getConf());
        // Block first, execution stage second - same evaluation order as passing them inline.
        Block appBlock = factory.createBlock(testGraph.getConf());
        Object executionStage = factory.createExecutionStage(testGraph.getConf());
        runBlockWithVertexOutput(appBlock, executionStage, testGraph, simpleVertexWriter);
    }

    /**
     * Executes {@code block} on {@code testGraph} inside the current JVM, superstep by
     * superstep, using a fixed thread pool to process partitions in parallel.
     *
     * <p>Flow:
     * <ol>
     *   <li>Reads {@link #NUM_THREADS}, {@link #RUN_ALL_CHECKS} and {@link #SERIALIZE_MASTER}
     *       from the graph's configuration and initializes master and worker-context logic.</li>
     *   <li>If {@code RUN_ALL_CHECKS} is set, replaces every vertex value and edge container
     *       with a copy before running.</li>
     *   <li>Loops: optionally round-trips the master logic through Kryo, stops if no vertex
     *       was left un-halted by the previous superstep, asks the master for the next worker
     *       pieces, and either finishes (no more pieces) or runs one task per partition and
     *       waits for all of them.</li>
     *   <li>When the master has no more pieces and output is not produced during computation,
     *       every vertex of every partition is written to {@code simpleVertexWriter}.</li>
     *   <li>Finally calls {@code postApplication} on the worker context logic and the
     *       internal API.</li>
     * </ol>
     *
     * <p>NOTE(review): when the loop ends because all vertices halted (rather than because the
     * master ran out of pieces), the final write to {@code simpleVertexWriter} is not
     * performed. That matches the previous behavior of this method; confirm it is intended.
     *
     * @param block              block to execute; must not be {@code null}
     * @param obj                execution-stage object handed to
     *                           {@code BlockUtils.checkBlockTypes} and to the master logic
     * @param testGraph          graph and configuration to run on; must not be {@code null}
     * @param simpleVertexWriter receiver for vertex output, either after each vertex
     *                           computation or once at the end, depending on
     *                           {@code conf.doOutputDuringComputation()}
     * @throws NullPointerException  if {@code block} or {@code testGraph} is {@code null}
     * @throws IllegalStateException if tasks were still queued in the executor when the
     *                               computation finished
     * @throws RuntimeException      if a worker task failed ("Worker failed", with the cause),
     *                               if the final vertex write failed, or if the calling thread
     *                               was interrupted while waiting for a superstep
     */
    public static <I extends WritableComparable, V extends Writable, E extends Writable> void runBlockWithVertexOutput(Block block, Object obj, TestGraph<I, V, E> testGraph, final SimpleVertexWriter<I, V, E> simpleVertexWriter) {
        Preconditions.checkNotNull(block);
        Preconditions.checkNotNull(testGraph);
        ImmutableClassesGiraphConfiguration<I, V, E> conf = testGraph.getConf();
        int numThreads = NUM_THREADS.get(conf);
        boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
        boolean serializeMaster = SERIALIZE_MASTER.get(conf);
        final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
        final InternalApi internalApi = new InternalApi(testGraph, conf, numThreads, runAllChecks);
        final InternalApi<I, V, E>.InternalWorkerApi workerApi = internalApi.getWorkerApi();
        BlockUtils.checkBlockTypes(block, obj, conf);
        BlockMasterLogic blockMasterLogic = new BlockMasterLogic();
        blockMasterLogic.initialize(block, obj, internalApi);
        final BlockWorkerContextLogic workerContextLogic = internalApi.getWorkerContextLogic();
        workerContextLogic.preApplication(workerApi, new BlockOutputHandle("", conf, new Progressable() {
            @Override
            public void progress() {
                // Nothing to report progress to in a local run.
            }
        }));
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            if (runAllChecks) {
                copyVertexValuesAndEdges(testGraph, conf);
            }
            // Starts true so the first superstep always runs; each superstep resets it and
            // worker tasks set it back when they leave at least one vertex un-halted.
            final AtomicBoolean anyVertexAlive = new AtomicBoolean(true);
            int superstep = 0;
            while (true) {
                if (serializeMaster) {
                    // Round-trip the master through Kryo so state that does not survive
                    // serialization shows up as a failure in the local run.
                    blockMasterLogic = (BlockMasterLogic) ((KryoWritableWrapper) WritableUtils.createCopy(new KryoWritableWrapper(blockMasterLogic), (Class<? extends KryoWritableWrapper>) KryoWritableWrapper.class, conf)).get();
                    blockMasterLogic.initializeAfterRead(internalApi);
                }
                if (!anyVertexAlive.get()) {
                    break;
                }
                final BlockWorkerPieces workerPieces = blockMasterLogic.computeNext(superstep);
                if (workerPieces == null) {
                    // Master has nothing more to run: emit final output and stop.
                    if (!conf.doOutputDuringComputation()) {
                        List<Partition<I, V, E>> finalPartitions = internalApi.getPartitions();
                        writeAllVertices(finalPartitions, simpleVertexWriter);
                    }
                    Preconditions.checkState(0 == executor.shutdownNow().size(), "Some work still left to be done?");
                    break;
                }
                internalApi.afterMasterBeforeWorker(workerPieces);
                List<Partition<I, V, E>> partitions = internalApi.getPartitions();
                workerContextLogic.preSuperstep(workerApi, workerApi, (BlockWorkerPieces) KryoWritableWrapper.wrapAndCopy(workerPieces), superstep, internalApi.takeWorkerMessages());
                // One task is submitted per partition and each counts down exactly once.
                final CountDownLatch latch = new CountDownLatch(partitions.size());
                final AtomicReference<Throwable> workerFailure = new AtomicReference<>();
                anyVertexAlive.set(false);
                for (final Partition<I, V, E> partition : partitions) {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                boolean partitionHasLiveVertex = false;
                                // Each task works on its own copy of the pieces.
                                BlockWorkerLogic workerLogic = new BlockWorkerLogic((BlockWorkerPieces) KryoWritableWrapper.wrapAndCopy(workerPieces));
                                workerLogic.preSuperstep(workerApi, workerApi);
                                for (Vertex vertex : partition) {
                                    Iterable messages = internalApi.takeMessages(vertex.getId());
                                    // A halted vertex is reactivated by incoming messages.
                                    if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
                                        vertex.wakeUp();
                                    }
                                    if (!vertex.isHalted()) {
                                        workerLogic.compute(vertex, messages);
                                        vertex.unwrapMutableEdges();
                                        if (vertex instanceof Trimmable) {
                                            ((Trimmable) vertex).trim();
                                        }
                                        if (doOutputDuringComputation) {
                                            simpleVertexWriter.writeVertex(vertex);
                                        }
                                        partition.saveVertex(vertex);
                                    }
                                    // Re-checked after compute: the vertex may have halted itself.
                                    if (!vertex.isHalted()) {
                                        partitionHasLiveVertex = true;
                                    }
                                }
                                if (partitionHasLiveVertex) {
                                    anyVertexAlive.set(true);
                                }
                                workerLogic.postSuperstep();
                            } catch (Throwable th) {
                                th.printStackTrace();
                                workerFailure.set(th);
                            } finally {
                                // Always release the waiting thread, whatever happened above.
                                latch.countDown();
                            }
                        }
                    });
                }
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // Restore the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Thread intentionally interrupted", e);
                }
                Throwable failure = workerFailure.get();
                if (failure != null) {
                    throw new RuntimeException("Worker failed", failure);
                }
                workerContextLogic.postSuperstep();
                internalApi.afterWorkerBeforeMaster();
                superstep++;
            }
        } finally {
            // Idempotent; guarantees pool threads are released on every exit path,
            // including "all vertices halted" and failures.
            executor.shutdownNow();
        }
        workerContextLogic.postApplication();
        internalApi.postApplication();
    }

    /** Replaces every vertex's value and edge container in the graph with a fresh copy. */
    private static <I extends WritableComparable, V extends Writable, E extends Writable> void copyVertexValuesAndEdges(TestGraph<I, V, E> testGraph, ImmutableClassesGiraphConfiguration<I, V, E> conf) {
        for (Vertex<I, V, E> vertex : testGraph) {
            V valueCopy = conf.createVertexValue();
            WritableUtils.copyInto(vertex.getValue(), valueCopy);
            vertex.setValue(valueCopy);
            // The edge container is copied as a Writable into the configured out-edges class;
            // this assumes the vertex's edges object is Writable (fails with ClassCastException otherwise).
            vertex.setEdges((Iterable) WritableUtils.createCopy((Writable) vertex.getEdges(), conf.getOutEdgesClass(), conf));
        }
    }

    /** Writes every vertex of every partition to the writer, wrapping checked failures. */
    private static <I extends WritableComparable, V extends Writable, E extends Writable> void writeAllVertices(List<Partition<I, V, E>> partitions, SimpleVertexWriter<I, V, E> simpleVertexWriter) {
        for (Partition<I, V, E> partition : partitions) {
            for (Vertex<I, V, E> vertex : partition) {
                try {
                    simpleVertexWriter.writeVertex(vertex);
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Creates a vertex writer whose {@code writeVertex} does nothing.
     *
     * @return a writer that ignores every vertex passed to it
     */
    private static <I extends WritableComparable, E extends Writable, V extends Writable> SimpleVertexWriter<I, V, E> noOpVertexSaver() {
        final SimpleVertexWriter<I, V, E> discardingWriter = new SimpleVertexWriter<I, V, E>() {
            @Override
            public void writeVertex(Vertex<I, V, E> vertex) throws IOException, InterruptedException {
                // Intentionally empty: output is discarded.
            }
        };
        return discardingWriter;
    }
}
