package org.apache.hugegraph.computer.core.network.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.IOMode;
import org.apache.hugegraph.computer.core.network.MessageHandler;
import org.apache.hugegraph.computer.core.network.TransportConf;
import org.apache.hugegraph.computer.core.network.TransportServer;
import org.apache.hugegraph.computer.core.network.TransportUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/network/netty/NettyTransportServer.class */
public class NettyTransportServer implements TransportServer, Closeable {
    private static final Logger LOG = Log.logger(NettyTransportServer.class);
    private static final String BOSS_THREAD_GROUP_NAME = "transport-netty-server-boss";
    private static final String WORKER_THREAD_GROUP_NAME = "transport-netty-server-worker";
    private static final int BOSS_THREADS = 1;
    private final ByteBufAllocator bufAllocator;
    private TransportConf conf;
    private ServerBootstrap bootstrap;
    private ChannelFuture bindFuture;
    private InetSocketAddress bindAddress;

    /* loaded from: input_file:org/apache/hugegraph/computer/core/network/netty/NettyTransportServer$ServerChannelInitializer.class */
    private static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final MessageHandler handler;
        private final NettyProtocol protocol;

        public ServerChannelInitializer(NettyProtocol nettyProtocol, MessageHandler messageHandler) {
            this.handler = messageHandler;
            this.protocol = nettyProtocol;
        }

        public void initChannel(SocketChannel socketChannel) {
            this.protocol.initializeServerPipeline(socketChannel, this.handler);
        }
    }

    public NettyTransportServer() {
        this(BufAllocatorFactory.createBufAllocator());
    }

    public NettyTransportServer(ByteBufAllocator byteBufAllocator) {
        this.bufAllocator = byteBufAllocator;
    }

    @Override // org.apache.hugegraph.computer.core.network.TransportServer
    public synchronized int listen(Config config, MessageHandler messageHandler) {
        E.checkArgument(this.bindFuture == null, "The TransportServer has already been listened", new Object[0]);
        E.checkArgumentNotNull(messageHandler, "The serverHandler param can't be null", new Object[0]);
        long currentTimeMillis = System.currentTimeMillis();
        init(config);
        this.bootstrap.childHandler(new ServerChannelInitializer(new NettyProtocol(this.conf), messageHandler));
        this.bindFuture = this.bootstrap.bind().syncUninterruptibly();
        this.bindAddress = (InetSocketAddress) this.bindFuture.channel().localAddress();
        LOG.info("The TransportServer started on address {}, took {} ms", TransportUtil.formatAddress(this.bindAddress), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        return this.bindAddress.getPort();
    }

    private void init(Config config) {
        this.conf = TransportConf.wrapConfig(config);
        this.bootstrap = new ServerBootstrap();
        IOMode ioMode = this.conf.ioMode();
        this.bootstrap.group(NettyEventLoopUtil.createEventLoop(ioMode, 1, BOSS_THREAD_GROUP_NAME), NettyEventLoopUtil.createEventLoop(ioMode, this.conf.serverThreads(), WORKER_THREAD_GROUP_NAME));
        this.bootstrap.channel(NettyEventLoopUtil.serverChannelClass(ioMode));
        this.bootstrap.localAddress(this.conf.serverAddress(), this.conf.serverPort());
        this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        this.bootstrap.option(ChannelOption.ALLOCATOR, this.bufAllocator);
        this.bootstrap.childOption(ChannelOption.ALLOCATOR, this.bufAllocator);
        this.bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        this.bootstrap.childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.conf.tcpKeepAlive()));
        boolean epollLevelTriggered = this.conf.epollLevelTriggered();
        if (this.conf.recvBufferFileMode()) {
            epollLevelTriggered = true;
        }
        NettyEventLoopUtil.enableTriggeredMode(ioMode, epollLevelTriggered, this.bootstrap);
        if (this.conf.maxSynBacklog() > 0) {
            this.bootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(this.conf.maxSynBacklog()));
        }
        if (this.conf.sizeReceiveBuffer() > 0) {
            this.bootstrap.childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.conf.sizeReceiveBuffer()));
        }
        if (this.conf.sizeSendBuffer() > 0) {
            this.bootstrap.childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.conf.sizeSendBuffer()));
        }
    }

    public TransportConf conf() {
        return this.conf;
    }

    @Override // org.apache.hugegraph.computer.core.network.TransportServer
    public int port() {
        return bindAddress().getPort();
    }

    @Override // org.apache.hugegraph.computer.core.network.TransportServer
    public String ip() {
        InetAddress address = bindAddress().getAddress();
        if (address == null) {
            return null;
        }
        return address.getHostAddress();
    }

    @Override // org.apache.hugegraph.computer.core.network.TransportServer
    public InetSocketAddress bindAddress() {
        E.checkArgumentNotNull(this.bindAddress, "The TransportServer has not been initialized", new Object[0]);
        return this.bindAddress;
    }

    @Override // org.apache.hugegraph.computer.core.network.TransportServer
    public void shutdown() {
        try {
            close();
        } catch (IOException e) {
            throw new ComputerException("Failed to shutdown server", e);
        }
    }

    @Override // org.apache.hugegraph.computer.core.network.TransportServer
    public boolean bound() {
        return (this.bindFuture == null || this.bindFuture.channel() == null || !this.bindFuture.channel().isActive()) ? false : true;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.bindFuture != null) {
            this.bindFuture.channel().close().awaitUninterruptibly(this.conf.closeTimeout(), TimeUnit.MILLISECONDS);
            this.bindFuture = null;
        }
        if (this.bootstrap != null && this.bootstrap.config().group() != null) {
            this.bootstrap.config().group().shutdownGracefully();
        }
        if (this.bootstrap != null && this.bootstrap.config().childGroup() != null) {
            this.bootstrap.config().childGroup().shutdownGracefully();
        }
        this.bootstrap = null;
    }
}
