package org.apache.hugegraph.computer.core.compute;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.partition.PartitionStat;
import org.apache.hugegraph.computer.core.manager.Managers;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvManager;
import org.apache.hugegraph.computer.core.receiver.MessageStat;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;
import org.apache.hugegraph.computer.core.util.Consumers;
import org.apache.hugegraph.computer.core.worker.WorkerContext;
import org.apache.hugegraph.computer.core.worker.WorkerStat;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/compute/ComputeManager.class */
public class ComputeManager {
    private static final Logger LOG = Log.logger(ComputeManager.class);
    private static final String PREFIX = "partition-compute-executor-%s";
    private final int workerId;
    private final ComputerContext context;
    private final Managers managers;
    private final Map<Integer, FileGraphPartition> partitions = new HashMap();
    private final MessageRecvManager recvManager;
    private final MessageSendManager sendManager;
    private final ExecutorService computeExecutor;

    public ComputeManager(int i, ComputerContext computerContext, Managers managers) {
        this.workerId = i;
        this.context = computerContext;
        this.managers = managers;
        this.recvManager = (MessageRecvManager) this.managers.get(MessageRecvManager.NAME);
        this.sendManager = (MessageSendManager) this.managers.get(MessageSendManager.NAME);
        int intValue = partitionComputeThreadNum(computerContext.config()).intValue();
        this.computeExecutor = ExecutorUtil.newFixedThreadPool(intValue, PREFIX);
        LOG.info("Created partition compute thread pool, thread num: {}", Integer.valueOf(intValue));
    }

    private Integer partitionComputeThreadNum(Config config) {
        return (Integer) config.get(ComputerOptions.PARTITIONS_COMPUTE_THREAD_NUMS);
    }

    public WorkerStat input() {
        WorkerStat workerStat = new WorkerStat(this.workerId);
        this.recvManager.waitReceivedAllMessages();
        Map<Integer, PeekableIterator<KvEntry>> vertexPartitions = this.recvManager.vertexPartitions();
        Map<Integer, PeekableIterator<KvEntry>> edgePartitions = this.recvManager.edgePartitions();
        for (Map.Entry<Integer, PeekableIterator<KvEntry>> entry : vertexPartitions.entrySet()) {
            int intValue = entry.getKey().intValue();
            PeekableIterator<KvEntry> value = entry.getValue();
            PeekableIterator<KvEntry> orDefault = edgePartitions.getOrDefault(Integer.valueOf(intValue), PeekableIterator.emptyIterator());
            FileGraphPartition fileGraphPartition = new FileGraphPartition(this.context, this.managers, intValue);
            PartitionStat partitionStat = null;
            ComputerException computerException = null;
            try {
                partitionStat = fileGraphPartition.input(value, orDefault);
                try {
                    value.close();
                    orDefault.close();
                } catch (Exception e) {
                    ComputerException computerException2 = new ComputerException("Failed to close vertex or edge file iterator", e);
                    if (0 == 0) {
                        throw computerException2;
                    }
                    computerException.addSuppressed(computerException2);
                }
            } catch (ComputerException e2) {
                try {
                    value.close();
                    orDefault.close();
                } catch (Exception e3) {
                    ComputerException computerException3 = new ComputerException("Failed to close vertex or edge file iterator", e3);
                    if (e2 == null) {
                        throw computerException3;
                    }
                    e2.addSuppressed(computerException3);
                }
                if (e2 != null) {
                    throw e2;
                }
            } catch (Throwable th) {
                try {
                    value.close();
                    orDefault.close();
                } catch (Exception e4) {
                    ComputerException computerException4 = new ComputerException("Failed to close vertex or edge file iterator", e4);
                    if (0 == 0) {
                        throw computerException4;
                    }
                    computerException.addSuppressed(computerException4);
                }
                if (0 != 0) {
                    throw null;
                }
                throw th;
            }
            if (0 != 0) {
                throw null;
            }
            workerStat.add(partitionStat);
            this.partitions.put(Integer.valueOf(intValue), fileGraphPartition);
        }
        return workerStat;
    }

    public void takeRecvedMessages() {
        Map<Integer, PeekableIterator<KvEntry>> messagePartitions = this.recvManager.messagePartitions();
        for (FileGraphPartition fileGraphPartition : this.partitions.values()) {
            fileGraphPartition.messages(messagePartitions.get(Integer.valueOf(fileGraphPartition.partition())));
        }
    }

    public WorkerStat compute(WorkerContext workerContext, int i) {
        this.sendManager.startSend(MessageType.MSG);
        WorkerStat workerStat = new WorkerStat(this.workerId);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Consumers consumers = new Consumers(this.computeExecutor, fileGraphPartition -> {
            PartitionStat compute = fileGraphPartition.compute(workerContext, i);
            concurrentHashMap.put(Integer.valueOf(compute.partitionId()), compute);
        });
        consumers.start("partition-compute");
        try {
            Iterator<FileGraphPartition> it = this.partitions.values().iterator();
            while (it.hasNext()) {
                consumers.provide(it.next());
            }
            consumers.await();
            this.sendManager.finishSend(MessageType.MSG);
            Map<Integer, MessageStat> messageStats = this.recvManager.messageStats();
            Iterator it2 = concurrentHashMap.entrySet().iterator();
            while (it2.hasNext()) {
                PartitionStat partitionStat = (PartitionStat) ((Map.Entry) it2.next()).getValue();
                int partitionId = partitionStat.partitionId();
                partitionStat.mergeSendMessageStat(this.sendManager.messageStat(partitionId));
                MessageStat messageStat = messageStats.get(Integer.valueOf(partitionId));
                if (messageStat != null) {
                    partitionStat.mergeRecvMessageStat(messageStat);
                }
                workerStat.add(partitionStat);
            }
            return workerStat;
        } catch (Throwable th) {
            throw new ComputerException("An exception occurred when partition parallel compute", th);
        }
    }

    public void output() {
        for (FileGraphPartition fileGraphPartition : this.partitions.values()) {
            LOG.info("Output partition {} complete, stat='{}'", Integer.valueOf(fileGraphPartition.partition()), fileGraphPartition.output());
        }
    }

    public void close() {
        this.computeExecutor.shutdown();
    }
}
