package org.apache.hugegraph.computer.core.input;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.manager.Manager;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/input/WorkerInputManager.class */
public class WorkerInputManager implements Manager {
    private static final Logger LOG = Log.logger(WorkerInputManager.class);
    private static final String PREFIX = "input-send-executor-%s";
    public static final String NAME = "worker_input";
    private final LoadService loadService;
    private final ExecutorService sendExecutor;
    private final int sendThreadNum;
    private final MessageSendManager sendManager;
    private final SnapshotManager snapshotManager;

    public WorkerInputManager(ComputerContext computerContext, MessageSendManager messageSendManager, SnapshotManager snapshotManager) {
        this.sendManager = messageSendManager;
        this.snapshotManager = snapshotManager;
        this.sendThreadNum = inputSendThreadNum(computerContext.config()).intValue();
        this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
        LOG.info("Created parallel sending thread pool, thread num: {}", Integer.valueOf(this.sendThreadNum));
        this.loadService = new LoadService(computerContext);
    }

    private Integer inputSendThreadNum(Config config) {
        return (Integer) config.get(ComputerOptions.INPUT_SEND_THREAD_NUMS);
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public String name() {
        return NAME;
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void init(Config config) {
        this.loadService.init();
        this.sendManager.init(config);
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void close(Config config) {
        this.loadService.close();
        this.sendManager.close(config);
        this.sendExecutor.shutdown();
    }

    public void service(InputSplitRpcService inputSplitRpcService) {
        this.loadService.rpcService(inputSplitRpcService);
    }

    public void loadGraph() {
        if (this.snapshotManager.loadSnapshot()) {
            this.snapshotManager.load();
            return;
        }
        ArrayList arrayList = new ArrayList();
        this.sendManager.startSend(MessageType.VERTEX);
        for (int i = 0; i < this.sendThreadNum; i++) {
            MessageSendManager messageSendManager = this.sendManager;
            Objects.requireNonNull(messageSendManager);
            Consumer<Vertex> consumer = messageSendManager::sendVertex;
            LoadService loadService = this.loadService;
            Objects.requireNonNull(loadService);
            arrayList.add(send(consumer, loadService::createIteratorFromVertex));
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).exceptionally(th -> {
            throw new ComputerException("An exception occurred during parallel sending vertices", th);
        }).join();
        this.sendManager.finishSend(MessageType.VERTEX);
        this.sendManager.startSend(MessageType.EDGE);
        arrayList.clear();
        for (int i2 = 0; i2 < this.sendThreadNum; i2++) {
            MessageSendManager messageSendManager2 = this.sendManager;
            Objects.requireNonNull(messageSendManager2);
            Consumer<Vertex> consumer2 = messageSendManager2::sendEdge;
            LoadService loadService2 = this.loadService;
            Objects.requireNonNull(loadService2);
            arrayList.add(send(consumer2, loadService2::createIteratorFromEdge));
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).exceptionally(th2 -> {
            throw new ComputerException("An exception occurred during parallel sending edges", th2);
        }).join();
        this.sendManager.finishSend(MessageType.EDGE);
        this.sendManager.clearBuffer();
    }

    private CompletableFuture<?> send(Consumer<Vertex> consumer, Supplier<Iterator<Vertex>> supplier) {
        return CompletableFuture.runAsync(() -> {
            Iterator it = (Iterator) supplier.get();
            while (it.hasNext()) {
                consumer.accept((Vertex) it.next());
            }
        }, this.sendExecutor);
    }
}
