package org.apache.hugegraph.computer.core.master;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hugegraph.computer.core.aggregator.Aggregator;
import org.apache.hugegraph.computer.core.aggregator.DefaultAggregator;
import org.apache.hugegraph.computer.core.aggregator.MasterAggrManager;
import org.apache.hugegraph.computer.core.bsp.Bsp4Master;
import org.apache.hugegraph.computer.core.combiner.Combiner;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.ContainerInfo;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.SuperstepStat;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.computer.core.graph.value.ValueType;
import org.apache.hugegraph.computer.core.input.MasterInputManager;
import org.apache.hugegraph.computer.core.manager.Managers;
import org.apache.hugegraph.computer.core.network.TransportUtil;
import org.apache.hugegraph.computer.core.output.ComputerOutput;
import org.apache.hugegraph.computer.core.rpc.MasterRpcManager;
import org.apache.hugegraph.computer.core.util.ShutdownHook;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.TimeUtil;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/master/MasterService.class */
public class MasterService implements Closeable {
    private static final Logger LOG = Log.logger(MasterService.class);
    private volatile boolean inited;
    private Config config;
    private volatile Bsp4Master bsp4Master;
    private ContainerInfo masterInfo;
    private List<ContainerInfo> workers;
    private int maxSuperStep;
    private MasterComputation masterComputation;
    private volatile Thread serviceThread;
    private final ComputerContext context = ComputerContext.instance();
    private final Managers managers = new Managers();
    private volatile boolean closed = false;
    private volatile ShutdownHook shutdownHook = new ShutdownHook();

    /* loaded from: input_file:org/apache/hugegraph/computer/core/master/MasterService$DefaultMasterContext.class */
    private class DefaultMasterContext implements MasterContext {
        private final MasterAggrManager aggrManager;

        public DefaultMasterContext() {
            this.aggrManager = (MasterAggrManager) MasterService.this.managers.get(MasterAggrManager.NAME);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public <V extends Value, C extends Aggregator<V>> void registerAggregator(String str, Class<C> cls) {
            E.checkArgument(cls != null, "The aggregator class can't be null", new Object[0]);
            try {
                this.aggrManager.registerAggregator(str, cls.newInstance());
            } catch (Exception e) {
                throw new ComputerException("Can't new instance from class: %s", e, new Object[]{cls.getName()});
            }
        }

        public <V extends Value, C extends Combiner<V>> void registerAggregator(String str, ValueType valueType, Class<C> cls) {
            registerAggregator(str, valueType, cls, null);
        }

        public <V extends Value, C extends Combiner<V>> void registerAggregator(String str, V v, Class<C> cls) {
            E.checkArgument(v != null, "The aggregator default value can't be null: %s, or call another register method if necessary: registerAggregator(String name,ValueType type,Class<C> combiner)", new Object[]{str});
            registerAggregator(str, v.valueType(), cls, v);
        }

        private <V extends Value, C extends Combiner<V>> void registerAggregator(String str, ValueType valueType, Class<C> cls, V v) {
            this.aggrManager.registerAggregator(str, new DefaultAggregator(MasterService.this.context, valueType, cls, v));
        }

        public <V extends Value> void aggregatedValue(String str, V v) {
            this.aggrManager.aggregatedAggregator(str, v);
        }

        public <V extends Value> V aggregatedValue(String str) {
            return (V) this.aggrManager.aggregatedValue(str);
        }

        public Config config() {
            return MasterService.this.config;
        }
    }

    /* loaded from: input_file:org/apache/hugegraph/computer/core/master/MasterService$SuperstepContext.class */
    private class SuperstepContext extends DefaultMasterContext implements MasterComputationContext {
        private final int superstep;
        private final SuperstepStat superstepStat;

        public SuperstepContext(int i, SuperstepStat superstepStat) {
            super();
            this.superstep = i;
            this.superstepStat = superstepStat;
        }

        public long totalVertexCount() {
            return this.superstepStat.vertexCount();
        }

        public long totalEdgeCount() {
            return this.superstepStat.edgeCount();
        }

        public long finishedVertexCount() {
            return this.superstepStat.finishedVertexCount();
        }

        public long messageCount() {
            return this.superstepStat.messageSendCount();
        }

        public long messageBytes() {
            return this.superstepStat.messageSendBytes();
        }

        public int superstep() {
            return this.superstep;
        }
    }

    public void init(Config config) {
        E.checkArgument(!this.inited, "The %s has been initialized", new Object[]{this});
        LOG.info("{} Start to initialize master", this);
        this.serviceThread = Thread.currentThread();
        registerShutdownHook();
        this.config = config;
        this.maxSuperStep = ((Integer) this.config.get(ComputerOptions.BSP_MAX_SUPER_STEP)).intValue();
        InetSocketAddress initManagers = initManagers();
        this.masterInfo = new ContainerInfo(0, TransportUtil.host(initManagers), initManagers.getPort());
        this.bsp4Master = new Bsp4Master(this.config);
        this.bsp4Master.clean();
        this.masterComputation = (MasterComputation) this.config.createObject(ComputerOptions.MASTER_COMPUTATION_CLASS);
        this.masterComputation.init(new DefaultMasterContext());
        this.managers.initedAll(config);
        LOG.info("{} register MasterService", this);
        this.bsp4Master.masterInitDone(this.masterInfo);
        this.workers = this.bsp4Master.waitWorkersInitDone();
        LOG.info("{} waited all workers registered, workers count: {}", this, Integer.valueOf(this.workers.size()));
        LOG.info("{} MasterService initialized", this);
        this.inited = true;
    }

    private void stopServiceThread() {
        if (this.serviceThread == null) {
            return;
        }
        try {
            this.serviceThread.interrupt();
        } catch (Throwable th) {
        }
    }

    private void registerShutdownHook() {
        this.shutdownHook.hook(() -> {
            stopServiceThread();
            cleanAndCloseBsp();
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        checkInited();
        if (this.closed) {
            LOG.info("{} MasterService had closed before", this);
            return;
        }
        this.masterComputation.close(new DefaultMasterContext());
        this.bsp4Master.waitWorkersCloseDone();
        this.managers.closeAll(this.config);
        cleanAndCloseBsp();
        this.shutdownHook.unhook();
        this.closed = true;
        LOG.info("{} MasterService closed", this);
    }

    private void cleanAndCloseBsp() {
        if (this.bsp4Master == null) {
            return;
        }
        this.bsp4Master.clean();
        this.bsp4Master.close();
    }

    public void execute() {
        SuperstepStat superstepStat;
        StopWatch stopWatch = new StopWatch();
        checkInited();
        LOG.info("{} MasterService execute", this);
        int superstepToResume = superstepToResume();
        LOG.info("{} MasterService resume from superstep: {}", this, Integer.valueOf(superstepToResume));
        this.bsp4Master.masterResumeDone(superstepToResume);
        stopWatch.start();
        if (superstepToResume == -1) {
            superstepStat = inputstep();
            superstepToResume++;
        } else {
            superstepStat = null;
        }
        stopWatch.stop();
        LOG.info("{} MasterService input step cost: {}", this, TimeUtil.readableTime(stopWatch.getTime()));
        E.checkState(superstepToResume <= this.maxSuperStep, "The superstep {} can't be > maxSuperStep {}", new Object[]{Integer.valueOf(superstepToResume), Integer.valueOf(this.maxSuperStep)});
        stopWatch.reset();
        stopWatch.start();
        while (superstepStat.active()) {
            LOG.info("{} MasterService superstep {} started", this, Integer.valueOf(superstepToResume));
            this.bsp4Master.waitWorkersStepPrepareDone(superstepToResume);
            this.managers.beforeSuperstep(this.config, superstepToResume);
            this.bsp4Master.masterStepPrepareDone(superstepToResume);
            this.bsp4Master.waitWorkersStepComputeDone(superstepToResume);
            this.bsp4Master.masterStepComputeDone(superstepToResume);
            superstepStat = SuperstepStat.from(this.bsp4Master.waitWorkersStepDone(superstepToResume));
            SuperstepContext superstepContext = new SuperstepContext(superstepToResume, superstepStat);
            if (finishedIteration(this.masterComputation.compute(superstepContext), superstepContext)) {
                superstepStat.inactivate();
            }
            this.managers.afterSuperstep(this.config, superstepToResume);
            this.bsp4Master.masterStepDone(superstepToResume, superstepStat);
            LOG.info("{} MasterService superstep {} finished", this, Integer.valueOf(superstepToResume));
            superstepToResume++;
        }
        stopWatch.stop();
        LOG.info("{} MasterService compute step cost: {}", this, TimeUtil.readableTime(stopWatch.getTime()));
        stopWatch.reset();
        stopWatch.start();
        outputstep();
        stopWatch.stop();
        LOG.info("{} MasterService output step cost: {}", this, TimeUtil.readableTime(stopWatch.getTime()));
    }

    public String toString() {
        return String.format("[master %s]", this.masterInfo == null ? "?" + hashCode() : Integer.valueOf(this.masterInfo.id()));
    }

    private InetSocketAddress initManagers() {
        MasterInputManager masterInputManager = new MasterInputManager();
        this.managers.add(masterInputManager);
        MasterAggrManager masterAggrManager = new MasterAggrManager();
        this.managers.add(masterAggrManager);
        MasterRpcManager masterRpcManager = new MasterRpcManager();
        this.managers.add(masterRpcManager);
        this.managers.initAll(this.config);
        masterRpcManager.registerInputSplitService(masterInputManager.handler());
        masterRpcManager.registerAggregatorService(masterAggrManager.handler());
        InetSocketAddress start = masterRpcManager.start();
        LOG.info("{} MasterService started rpc server: {}", this, start);
        return start;
    }

    private void checkInited() {
        E.checkArgument(this.inited, "The %s has not been initialized", new Object[]{this});
    }

    private int superstepToResume() {
        return -1;
    }

    private boolean finishedIteration(boolean z, MasterComputationContext masterComputationContext) {
        if (z && masterComputationContext.superstep() < this.maxSuperStep - 1) {
            return masterComputationContext.messageCount() == 0 && masterComputationContext.totalVertexCount() - masterComputationContext.finishedVertexCount() == 0;
        }
        return true;
    }

    private SuperstepStat inputstep() {
        LOG.info("{} MasterService inputstep started", this);
        this.bsp4Master.waitWorkersInputDone();
        this.bsp4Master.masterInputDone();
        SuperstepStat from = SuperstepStat.from(this.bsp4Master.waitWorkersStepDone(-1));
        this.bsp4Master.masterStepDone(-1, from);
        LOG.info("{} MasterService inputstep finished with superstat {}", this, from);
        return from;
    }

    private void outputstep() {
        LOG.info("{} MasterService outputstep started", this);
        this.bsp4Master.waitWorkersOutputDone();
        ((ComputerOutput) this.config.createObject(ComputerOptions.OUTPUT_CLASS)).mergePartitions(this.config);
        LOG.info("{} MasterService outputstep finished", this);
    }
}
