package org.apache.dolphinscheduler.rpc.remote;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.rpc.codec.NettyDecoder;
import org.apache.dolphinscheduler.rpc.codec.NettyEncoder;
import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/rpc/remote/NettyClient.class */
public class NettyClient {
    private final Logger logger;
    private final EventLoopGroup workerGroup;
    private final NettyClientConfig clientConfig;
    private final Bootstrap bootstrap;
    private final AtomicBoolean isStarted;
    private final ConcurrentHashMap<Host, Channel> channels;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/dolphinscheduler/rpc/remote/NettyClient$NettyClientInner.class */
    private static class NettyClientInner {
        private static final NettyClient INSTANCE = new NettyClient(new NettyClientConfig());

        private NettyClientInner() {
        }
    }

    public static NettyClient getInstance() {
        return NettyClientInner.INSTANCE;
    }

    private Channel getChannel(Host host) {
        Channel channel = this.channels.get(host);
        return (channel == null || !channel.isActive()) ? createChannel(host, true) : channel;
    }

    public Channel createChannel(Host host, boolean z) {
        ChannelFuture connect;
        try {
            synchronized (this.bootstrap) {
                connect = this.bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
            }
            if (z) {
                connect.sync();
            }
            if (!connect.isSuccess()) {
                return null;
            }
            Channel channel = connect.channel();
            this.channels.put(host, channel);
            return channel;
        } catch (Exception e) {
            this.logger.warn(String.format("connect to %s error", host), e);
            return null;
        }
    }

    private NettyClient(NettyClientConfig nettyClientConfig) {
        this.logger = LoggerFactory.getLogger(NettyClient.class);
        this.bootstrap = new Bootstrap();
        this.isStarted = new AtomicBoolean(false);
        this.channels = new ConcurrentHashMap<>(128);
        this.clientConfig = nettyClientConfig;
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(nettyClientConfig.getWorkerThreads(), new ThreadFactory() { // from class: org.apache.dolphinscheduler.rpc.remote.NettyClient.1
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, String.format("NettyClient_%d", Integer.valueOf(this.threadIndex.incrementAndGet())));
                }
            });
        } else {
            this.workerGroup = new NioEventLoopGroup(nettyClientConfig.getWorkerThreads(), new ThreadFactory() { // from class: org.apache.dolphinscheduler.rpc.remote.NettyClient.2
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, String.format("NettyClient_%d", Integer.valueOf(this.threadIndex.incrementAndGet())));
                }
            });
        }
        start();
    }

    private void start() {
        this.bootstrap.group(this.workerGroup).channel(NettyUtils.getSocketChannelClass()).option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.clientConfig.isSoKeepalive())).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.clientConfig.isTcpNoDelay())).option(ChannelOption.SO_SNDBUF, Integer.valueOf(this.clientConfig.getSendBufferSize())).option(ChannelOption.SO_RCVBUF, Integer.valueOf(this.clientConfig.getReceiveBufferSize())).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.clientConfig.getConnectTimeoutMillis())).handler(new LoggingHandler(LogLevel.DEBUG)).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.dolphinscheduler.rpc.remote.NettyClient.3
            public void initChannel(SocketChannel socketChannel) {
                socketChannel.pipeline().addLast(new ChannelHandler[]{new NettyEncoder()}).addLast(new ChannelHandler[]{new NettyDecoder(RpcResponse.class)}).addLast("client-idle-handler", new IdleStateHandler(6000L, 0L, 0L, TimeUnit.MILLISECONDS)).addLast(new ChannelHandler[]{new NettyClientHandler()});
            }
        });
        this.isStarted.compareAndSet(false, true);
    }

    public RpcResponse sendMsg(Host host, RpcProtocol<RpcRequest> rpcProtocol, Boolean bool) {
        Channel channel = getChannel(host);
        if (!$assertionsDisabled && channel == null) {
            throw new AssertionError();
        }
        RpcRequest body = rpcProtocol.getBody();
        RpcRequestCache rpcRequestCache = new RpcRequestCache();
        String str = body.getClassName() + body.getMethodName();
        rpcRequestCache.setServiceName(str);
        long requestId = rpcProtocol.getMsgHeader().getRequestId();
        RpcFuture rpcFuture = null;
        if (Boolean.FALSE.equals(bool)) {
            rpcFuture = new RpcFuture(body, requestId);
            rpcRequestCache.setRpcFuture(rpcFuture);
        }
        RpcRequestTable.put(rpcProtocol.getMsgHeader().getRequestId(), rpcRequestCache);
        channel.writeAndFlush(rpcProtocol);
        RpcResponse rpcResponse = null;
        if (Boolean.TRUE.equals(bool)) {
            RpcResponse rpcResponse2 = new RpcResponse();
            rpcResponse2.setStatus((byte) 0);
            rpcResponse2.setResult(true);
            return rpcResponse2;
        }
        try {
        } catch (InterruptedException e) {
            this.logger.error("send msg error，service name is {}", str, e);
            Thread.currentThread().interrupt();
        }
        if (!$assertionsDisabled && rpcFuture == null) {
            throw new AssertionError();
        }
        rpcResponse = rpcFuture.get2();
        return rpcResponse;
    }

    public void close() {
        if (this.isStarted.compareAndSet(true, false)) {
            try {
                closeChannels();
                if (this.workerGroup != null) {
                    this.workerGroup.shutdownGracefully();
                }
            } catch (Exception e) {
                this.logger.error("netty client close exception", e);
            }
            this.logger.info("netty client closed");
        }
    }

    private void closeChannels() {
        Iterator<Channel> it = this.channels.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.channels.clear();
    }

    static {
        $assertionsDisabled = !NettyClient.class.desiredAssertionStatus();
    }
}
