package org.apache.hugegraph.computer.core.worker;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hugegraph.computer.core.aggregator.Aggregator;
import org.apache.hugegraph.computer.core.aggregator.WorkerAggrManager;
import org.apache.hugegraph.computer.core.bsp.Bsp4Worker;
import org.apache.hugegraph.computer.core.combiner.Combiner;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.ContainerInfo;
import org.apache.hugegraph.computer.core.compute.ComputeManager;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.SuperstepStat;
import org.apache.hugegraph.computer.core.graph.edge.Edge;
import org.apache.hugegraph.computer.core.graph.id.Id;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.input.WorkerInputManager;
import org.apache.hugegraph.computer.core.manager.Managers;
import org.apache.hugegraph.computer.core.network.DataClientManager;
import org.apache.hugegraph.computer.core.network.DataServerManager;
import org.apache.hugegraph.computer.core.network.connection.TransportConnectionManager;
import org.apache.hugegraph.computer.core.receiver.MessageRecvManager;
import org.apache.hugegraph.computer.core.rpc.WorkerRpcManager;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager;
import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
import org.apache.hugegraph.computer.core.util.ShutdownHook;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/worker/WorkerService.class */
/**
 * Worker-side service of a computer job.
 *
 * <p>Typical usage is {@link #init(Config)}, then {@link #execute()}, then
 * {@link #close()}. {@code init} creates the BSP client and the managers and connects
 * the data client to every worker reported by the master; {@code execute} runs the
 * input step, the superstep loop and the output step; {@code close} closes the compute
 * manager, the managers and the BSP client.
 *
 * <p>{@code init} also registers a shutdown hook that interrupts the thread which called
 * {@code init} and cleans/closes the BSP client.
 */
public class WorkerService implements Closeable {

    private static final Logger LOG = Log.logger(WorkerService.class);

    /** Superstep number of the input step, which runs before superstep 0. */
    private static final int INPUT_SUPERSTEP = -1;

    private Config config;
    private Bsp4Worker bsp4Worker;
    private ComputeManager computeManager;
    private ContainerInfo workerInfo;
    /** Combiner created from the configuration in {@link #init(Config)}; may be null. */
    private Combiner<Value> combiner;
    private ContainerInfo masterInfo;
    /** Thread that called {@link #init(Config)}; interrupted by the shutdown hook. */
    private volatile Thread serviceThread;
    private final ComputerContext context = ComputerContext.instance();
    private final Managers managers = new Managers();
    /** Workers reported by the master, keyed by worker id. */
    private final Map<Integer, ContainerInfo> workers = new HashMap<>();
    private volatile boolean inited = false;
    private volatile boolean closed = false;
    private volatile ShutdownHook shutdownHook = new ShutdownHook();

    /**
     * {@link WorkerContext} handed to the compute manager for a single superstep.
     * Aggregator and message calls are delegated to the {@link WorkerAggrManager} and
     * {@link MessageSendManager} registered in the enclosing service.
     */
    private class SuperstepContext implements WorkerContext {

        private final int superstep;
        private final SuperstepStat superstepStat;
        private final WorkerAggrManager aggrManager;
        private final MessageSendManager sendManager;

        /**
         * @param superstep     number of the superstep this context belongs to
         * @param superstepStat statistics backing the total vertex/edge counts
         */
        private SuperstepContext(int superstep, SuperstepStat superstepStat) {
            this.superstep = superstep;
            this.superstepStat = superstepStat;
            this.aggrManager = (WorkerAggrManager)
                    WorkerService.this.managers.get(WorkerAggrManager.NAME);
            this.sendManager = (MessageSendManager)
                    WorkerService.this.managers.get(MessageSendManager.NAME);
        }

        /** Returns the configuration the enclosing service was initialized with. */
        public Config config() {
            return WorkerService.this.config;
        }

        /** Delegates to {@link WorkerAggrManager#createAggregator}. */
        public <V extends Value> Aggregator<V> createAggregator(String name) {
            return this.aggrManager.createAggregator(name);
        }

        /** Delegates to {@link WorkerAggrManager#aggregateValue}. */
        public <V extends Value> void aggregateValue(String name, V value) {
            this.aggrManager.aggregateValue(name, value);
        }

        /** Delegates to {@link WorkerAggrManager#aggregatedValue}. */
        @SuppressWarnings("unchecked") // the caller chooses V; the manager is not type-aware here
        public <V extends Value> V aggregatedValue(String name) {
            return (V) this.aggrManager.aggregatedValue(name);
        }

        /** Sends {@code value} to the vertex identified by {@code target}. */
        public void sendMessage(Id target, Value value) {
            this.sendManager.sendMessage(target, value);
        }

        /** Sends {@code value} to the target of every edge of {@code vertex}. */
        public void sendMessageToAllEdges(Vertex vertex, Value value) {
            for (Edge edge : vertex.edges()) {
                sendMessage(edge.targetId(), value);
            }
        }

        /** Returns the vertex count recorded in this context's superstep statistics. */
        public long totalVertexCount() {
            return this.superstepStat.vertexCount();
        }

        /** Returns the edge count recorded in this context's superstep statistics. */
        public long totalEdgeCount() {
            return this.superstepStat.edgeCount();
        }

        /** Returns the superstep number this context was created for. */
        public int superstep() {
            return this.superstep;
        }

        /** Returns the enclosing service's combiner, or null if none was configured. */
        @SuppressWarnings("unchecked") // the caller chooses V; the field is stored as Combiner<Value>
        public <V extends Value> Combiner<V> combiner() {
            return (Combiner<V>) WorkerService.this.combiner;
        }
    }

    /**
     * Initializes this service. Must be called once, before {@link #execute()} and
     * {@link #close()}; a second call fails the "has been initialized" check.
     *
     * <p>The calling thread is remembered as the service thread and a shutdown hook is
     * registered before any other work is done.
     *
     * @param config configuration used for the BSP client, the managers and for creating
     *               the computation and the optional combiner
     */
    public void init(Config config) {
        E.checkArgument(!this.inited, "The %s has been initialized", this);

        this.serviceThread = Thread.currentThread();
        registerShutdownHook();

        this.config = config;
        this.workerInfo = new ContainerInfo();
        LOG.info("{} Start to initialize worker", this);

        this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
        this.masterInfo = this.bsp4Worker.waitMasterInitDone();

        // The managers need the master's address; the data server address they yield
        // becomes this worker's own address.
        InetSocketAddress dataAddress = initManagers(this.masterInfo);
        this.workerInfo.updateAddress(dataAddress);

        Computation computation = (Computation)
                this.config.createObject(ComputerOptions.WORKER_COMPUTATION_CLASS);
        LOG.info("Loading computation '{}' in category '{}'",
                 computation.name(), computation.category());

        @SuppressWarnings("unchecked") // the configured class is expected to be a Combiner<Value>
        Combiner<Value> configuredCombiner = (Combiner<Value>)
                this.config.createObject(ComputerOptions.WORKER_COMBINER_CLASS, false);
        this.combiner = configuredCombiner;
        if (this.combiner == null) {
            LOG.info("None combiner is provided for computation '{}'", computation.name());
        } else {
            LOG.info("Combiner '{}' is provided for computation '{}'",
                     this.combiner.name(), computation.name());
        }

        LOG.info("{} register WorkerService", this);
        this.bsp4Worker.workerInitDone();

        List<ContainerInfo> allWorkers = this.bsp4Worker.waitMasterAllInitDone();
        DataClientManager dataClient =
                (DataClientManager) this.managers.get(DataClientManager.NAME);
        for (ContainerInfo worker : allWorkers) {
            this.workers.put(worker.id(), worker);
            dataClient.connect(worker.id(), worker.hostname(), worker.dataPort());
        }

        this.computeManager = new ComputeManager(this.context, this.managers);
        this.managers.initedAll(this.config);

        LOG.info("{} WorkerService initialized", this);
        this.inited = true;
    }

    /** Registers the hook that interrupts the service thread and cleans up the BSP client. */
    private void registerShutdownHook() {
        this.shutdownHook.hook(() -> {
            stopServiceThread();
            cleanAndCloseBsp();
        });
    }

    /**
     * Closes the compute manager, all managers and the BSP client, then removes the
     * shutdown hook. Requires a prior successful {@link #init(Config)}. Calling it again
     * after a completed close only logs a message.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        checkInited();
        if (this.closed) {
            LOG.info("{} WorkerService had closed before", this);
            return;
        }

        this.computeManager.close();
        this.managers.closeAll(this.config);
        this.bsp4Worker.workerCloseDone();
        this.bsp4Worker.close();
        this.shutdownHook.unhook();

        this.closed = true;
        LOG.info("{} WorkerService closed", this);
    }

    /** Interrupts the thread that called {@link #init(Config)}, if any. Never throws. */
    private void stopServiceThread() {
        // Read the volatile field once so the null check and the call see the same thread.
        Thread thread = this.serviceThread;
        if (thread == null) {
            return;
        }
        try {
            thread.interrupt();
        } catch (Throwable e) {
            // Best effort: this runs inside the shutdown hook and must not prevent the
            // BSP cleanup that follows, so the failure is reported but not rethrown.
            LOG.warn("{} Failed to interrupt the service thread", this, e);
        }
    }

    /** Cleans and closes the BSP client if it has been created. */
    private void cleanAndCloseBsp() {
        if (this.bsp4Worker == null) {
            return;
        }
        this.bsp4Worker.clean();
        this.bsp4Worker.close();
    }

    /**
     * Runs the job on this worker: the input step, then supersteps for as long as the
     * statistics returned by the master are active, then the output step.
     *
     * @throws IllegalStateException if the master asks to resume from a superstep other
     *         than the input step; no superstep statistics are available in that case
     */
    public void execute() {
        checkInited();
        LOG.info("{} WorkerService execute", this);

        int superstep = this.bsp4Worker.waitMasterResumeDone();
        if (superstep != INPUT_SUPERSTEP) {
            // Without the input step there is no SuperstepStat to drive the loop below;
            // fail with a clear message instead of dereferencing null.
            throw new IllegalStateException(String.format(
                    "The %s can't resume from superstep %d, only the input step (%d) " +
                    "is supported as the starting point",
                    this, superstep, INPUT_SUPERSTEP));
        }
        SuperstepStat superstepStat = inputstep();
        superstep++;

        while (superstepStat.active()) {
            SuperstepContext stepContext = new SuperstepContext(superstep, superstepStat);
            LOG.info("Start computation of superstep {}", superstep);
            if (superstep > 0) {
                this.computeManager.takeRecvedMessages();
            }

            this.managers.beforeSuperstep(this.config, superstep);
            this.bsp4Worker.workerStepPrepareDone(superstep);
            this.bsp4Worker.waitMasterStepPrepareDone(superstep);

            WorkerStat workerStat = this.computeManager.compute(stepContext, superstep);

            this.bsp4Worker.workerStepComputeDone(superstep);
            this.bsp4Worker.waitMasterStepComputeDone(superstep);
            this.managers.afterSuperstep(this.config, superstep);
            this.bsp4Worker.workerStepDone(superstep, workerStat);
            LOG.info("End computation of superstep {}", superstep);

            superstepStat = this.bsp4Worker.waitMasterStepDone(superstep);
            superstep++;
        }

        outputstep();
    }

    /**
     * Returns {@code "[worker <id>]"}, or {@code "[worker ?<hashCode>]"} while the worker
     * info has not been created yet.
     */
    @Override
    public String toString() {
        ContainerInfo info = this.workerInfo;
        if (info == null) {
            return String.format("[worker %s]", "?" + hashCode());
        }
        return String.format("[worker %s]", info.id());
    }

    /**
     * Creates every manager, registers it in {@link #managers} and initializes them all.
     *
     * @param containerInfo master info; its hostname and RPC port are written into the
     *                      configuration as the remote RPC server
     * @return the address of the data server manager after initialization
     */
    private InetSocketAddress initManagers(ContainerInfo containerInfo) {
        // RPC manager is initialized right away: its services are wired into the
        // aggregator and input managers created below.
        WorkerRpcManager rpcManager = new WorkerRpcManager();
        this.managers.add(rpcManager);
        WorkerRpcManager.updateRpcRemoteServerConfig(this.config,
                                                     containerInfo.hostname(),
                                                     containerInfo.rpcPort());
        rpcManager.init(this.config);

        WorkerAggrManager aggrManager = new WorkerAggrManager(this.context);
        aggrManager.service(rpcManager.aggregateRpcService());
        this.managers.add(aggrManager);

        FileManager fileManager = new FileManager();
        this.managers.add(fileManager);

        RecvSortManager recvSortManager = new RecvSortManager(this.context);
        this.managers.add(recvSortManager);

        MessageRecvManager recvManager =
                new MessageRecvManager(this.context, fileManager, recvSortManager);
        this.managers.add(recvManager);

        // The data server and the data client share one connection manager.
        TransportConnectionManager connManager = new TransportConnectionManager();
        DataServerManager serverManager = new DataServerManager(connManager, recvManager);
        this.managers.add(serverManager);

        DataClientManager clientManager = new DataClientManager(connManager, this.context);
        this.managers.add(clientManager);

        SendSortManager sendSortManager = new SendSortManager(this.context);
        this.managers.add(sendSortManager);

        MessageSendManager sendManager =
                new MessageSendManager(this.context, sendSortManager, clientManager.sender());
        this.managers.add(sendManager);

        WorkerInputManager inputManager = new WorkerInputManager(this.context, sendManager);
        inputManager.service(rpcManager.inputSplitService());
        this.managers.add(inputManager);

        this.managers.initAll(this.config);

        InetSocketAddress address = serverManager.address();
        LOG.info("{} WorkerService initialized managers with data server address '{}'",
                 this, address);
        return address;
    }

    /** Fails the "has not been initialized" check unless {@link #init(Config)} completed. */
    private void checkInited() {
        E.checkArgument(this.inited, "The %s has not been initialized", this);
    }

    /**
     * Runs the input step: loads the graph through the input manager, reports the input
     * statistics for superstep {@link #INPUT_SUPERSTEP} and closes the input manager.
     *
     * @return the superstep statistics returned by the master for the input step
     */
    private SuperstepStat inputstep() {
        LOG.info("{} WorkerService inputstep started", this);
        WorkerInputManager inputManager =
                (WorkerInputManager) this.managers.get(WorkerInputManager.NAME);
        inputManager.loadGraph();

        this.bsp4Worker.workerInputDone();
        this.bsp4Worker.waitMasterInputDone();

        this.bsp4Worker.workerStepDone(INPUT_SUPERSTEP, this.computeManager.input());
        SuperstepStat superstepStat = this.bsp4Worker.waitMasterStepDone(INPUT_SUPERSTEP);

        inputManager.close(this.config);
        LOG.info("{} WorkerService inputstep finished", this);
        return superstepStat;
    }

    /** Runs the output step through the compute manager and reports it to the BSP client. */
    private void outputstep() {
        this.computeManager.output();
        this.bsp4Worker.workerOutputDone();
        LOG.info("{} WorkerService outputstep finished", this);
    }
}
