package org.apache.hugegraph.computer.core.network;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.manager.Manager;
import org.apache.hugegraph.computer.core.network.connection.ConnectionManager;
import org.apache.hugegraph.computer.core.sender.QueuedMessageSender;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/network/DataClientManager.class */
public class DataClientManager implements Manager {
    public static final Logger LOG = Log.logger(DataClientManager.class);
    public static final String NAME = "data_client";
    private final ConnectionManager connManager;
    private final QueuedMessageSender sender;

    /* loaded from: input_file:org/apache/hugegraph/computer/core/network/DataClientManager$DataClientHandler.class */
    private class DataClientHandler implements ClientHandler {
        private final Runnable notBusyNotifier;

        public DataClientHandler(Runnable runnable) {
            E.checkNotNull(runnable, "The not-busy notifier can't be null");
            this.notBusyNotifier = runnable;
        }

        @Override // org.apache.hugegraph.computer.core.network.ClientHandler
        public void sendAvailable(ConnectionId connectionId) {
            DataClientManager.LOG.debug("Channel for connectionId {} is available", connectionId);
            this.notBusyNotifier.run();
        }

        @Override // org.apache.hugegraph.computer.core.network.TransportHandler
        public void onChannelActive(ConnectionId connectionId) {
            DataClientManager.LOG.debug("Channel for connectionId {} is active", connectionId);
        }

        @Override // org.apache.hugegraph.computer.core.network.TransportHandler
        public void onChannelInactive(ConnectionId connectionId) {
            DataClientManager.LOG.debug("Channel for connectionId {} is inactive", connectionId);
        }

        @Override // org.apache.hugegraph.computer.core.network.TransportHandler
        public void exceptionCaught(TransportException transportException, ConnectionId connectionId) {
            DataClientManager.LOG.error("Channel for connectionId {} occurred exception", connectionId, transportException);
            DataClientManager.this.connManager.closeClient(connectionId);
        }
    }

    public DataClientManager(ConnectionManager connectionManager, ComputerContext computerContext) {
        this.connManager = connectionManager;
        this.sender = new QueuedMessageSender(computerContext.config());
    }

    public QueuedMessageSender sender() {
        return this.sender;
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public String name() {
        return NAME;
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void init(Config config) {
        this.connManager.initClientManager(config, new DataClientHandler(this.sender.notBusyNotifier()));
        LOG.info("DataClientManager inited");
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void inited(Config config) {
        this.sender.init();
    }

    @Override // org.apache.hugegraph.computer.core.manager.Manager
    public void close(Config config) {
        try {
            this.sender.close();
            LOG.info("DataClientManager closed");
        } finally {
            this.connManager.shutdownClients();
        }
    }

    public void connect(int i, String str, int i2) {
        try {
            TransportClient orCreateClient = this.connManager.getOrCreateClient(str, i2);
            LOG.info("Successfully connect to worker: {}({}:{})", new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2)});
            this.sender.addWorkerClient(i, orCreateClient);
        } catch (TransportException e) {
            throw new ComputerException("Failed to connect to worker: %s(%s:%s)", new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2)});
        }
    }
}
