package org.apache.inlong.tubemq.corerpc.netty;

import com.google.protobuf.Message;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.protobuf.generated.RPCProtos;
import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corerpc.RequestWrapper;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
import org.apache.inlong.tubemq.corerpc.RpcDataPack;
import org.apache.inlong.tubemq.corerpc.codec.PbEnDecoder;
import org.apache.inlong.tubemq.corerpc.exception.ServerNotReadyException;
import org.apache.inlong.tubemq.corerpc.protocol.Protocol;
import org.apache.inlong.tubemq.corerpc.protocol.ProtocolFactory;
import org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer;
import org.apache.inlong.tubemq.corerpc.utils.MixUtils;
import org.apache.inlong.tubemq.corerpc.utils.TSSLEngineUtil;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.class */
public class NettyRpcServer implements ServiceRpcServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
    private static final ConcurrentHashMap<String, AtomicLong> errParseAddrMap = new ConcurrentHashMap<>();
    private static AtomicLong lastParseTime = new AtomicLong(0);
    private ServerBootstrap bootstrap;
    private int protocolType;
    private boolean isOverTLS;
    private String keyStorePath;
    private String keyStorePassword;
    private boolean needTwoWayAuthentic;
    private String trustStorePath;
    private String trustStorePassword;
    private final ConcurrentHashMap<Integer, Protocol> protocols = new ConcurrentHashMap<>();
    private NioServerSocketChannelFactory channelFactory = null;
    private AtomicBoolean started = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer$NettyServerHandler.class */
    private class NettyServerHandler extends SimpleChannelUpstreamHandler {
        private int protocolType;

        public NettyServerHandler(int i) {
            this.protocolType = 10;
            this.protocolType = i;
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            if (exceptionEvent.getCause() instanceof IOException) {
                return;
            }
            NettyRpcServer.logger.error("catch some exception not IOException", exceptionEvent.getCause());
        }

        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
            if (messageEvent.getMessage() instanceof RpcDataPack) {
                RpcDataPack rpcDataPack = (RpcDataPack) messageEvent.getMessage();
                Channel channel = channelHandlerContext.getChannel();
                if (channel == null) {
                    return;
                }
                String remoteAddressIP = AddressUtils.getRemoteAddressIP(channel);
                try {
                    if (!NettyRpcServer.this.isServiceStarted()) {
                        throw new ServerNotReadyException("RpcServer is not running yet");
                    }
                    ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(rpcDataPack.getDataLst());
                    RPCProtos.RpcConnHeader parseDelimitedFrom = RPCProtos.RpcConnHeader.parseDelimitedFrom(byteBufferInputStream);
                    RPCProtos.RequestHeader parseDelimitedFrom2 = RPCProtos.RequestHeader.parseDelimitedFrom(byteBufferInputStream);
                    int protocolVer = parseDelimitedFrom2.getProtocolVer();
                    RPCProtos.RequestBody parseDelimitedFrom3 = RPCProtos.RequestBody.parseDelimitedFrom(byteBufferInputStream);
                    try {
                        RequestWrapper requestWrapper = new RequestWrapper(parseDelimitedFrom2.getServiceType(), this.protocolType, parseDelimitedFrom2.getProtocolVer(), parseDelimitedFrom.getFlag(), parseDelimitedFrom3.getTimeout());
                        requestWrapper.setMethodId(parseDelimitedFrom3.getMethod());
                        requestWrapper.setRequestData(PbEnDecoder.pbDecode(true, parseDelimitedFrom3.getMethod(), parseDelimitedFrom3.getRequest().toByteArray()));
                        requestWrapper.setSerialNo(rpcDataPack.getSerialNo());
                        ((Protocol) NettyRpcServer.this.protocols.get(Integer.valueOf(this.protocolType))).handleRequest(new NettyRequestContext(requestWrapper, channelHandlerContext, System.currentTimeMillis()), remoteAddressIP);
                    } catch (Throwable th) {
                        List<ByteBuffer> prepareResponse = prepareResponse(null, protocolVer, RPCProtos.ResponseHeader.Status.FATAL, th.getClass().getName(), new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE).append("IPC server handle request error :").append(th.getMessage()).toString());
                        if (prepareResponse != null) {
                            rpcDataPack.setDataLst(prepareResponse);
                            channelHandlerContext.getChannel().write(rpcDataPack);
                        }
                    }
                } catch (Throwable th2) {
                    if (!(th2 instanceof ServerNotReadyException) && remoteAddressIP != null) {
                        AtomicLong atomicLong = (AtomicLong) NettyRpcServer.errParseAddrMap.get(remoteAddressIP);
                        if (atomicLong == null) {
                            AtomicLong atomicLong2 = new AtomicLong(0L);
                            atomicLong = (AtomicLong) NettyRpcServer.errParseAddrMap.putIfAbsent(remoteAddressIP, atomicLong2);
                            if (atomicLong == null) {
                                atomicLong = atomicLong2;
                            }
                        }
                        atomicLong.incrementAndGet();
                        long j = NettyRpcServer.lastParseTime.get();
                        if (System.currentTimeMillis() - j > TBaseConstants.CFG_DEF_META_FORCE_UPDATE_PERIOD && NettyRpcServer.lastParseTime.compareAndSet(j, System.currentTimeMillis())) {
                            NettyRpcServer.logger.warn(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE).append("[Abnormal Visit] Abnormal Message Content visit list is :").append(NettyRpcServer.errParseAddrMap).toString());
                            NettyRpcServer.errParseAddrMap.clear();
                        }
                    }
                    List<ByteBuffer> prepareResponse2 = prepareResponse(null, 3, RPCProtos.ResponseHeader.Status.FATAL, th2.getClass().getName(), new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE).append("IPC server unable to read call parameters:").append(th2.getMessage()).toString());
                    if (prepareResponse2 != null) {
                        rpcDataPack.setDataLst(prepareResponse2);
                        channel.write(rpcDataPack);
                    }
                }
            }
        }

        protected List<ByteBuffer> prepareResponse(Object obj, int i, RPCProtos.ResponseHeader.Status status, String str, String str2) {
            ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream();
            DataOutputStream dataOutputStream = new DataOutputStream(byteBufferOutputStream);
            String replaceClassNamePrefix = MixUtils.replaceClassNamePrefix(str, true, i);
            try {
                RPCProtos.RpcConnHeader.Builder newBuilder = RPCProtos.RpcConnHeader.newBuilder();
                newBuilder.setFlag(1);
                newBuilder.build().writeDelimitedTo(dataOutputStream);
                RPCProtos.ResponseHeader.Builder newBuilder2 = RPCProtos.ResponseHeader.newBuilder();
                newBuilder2.setStatus(status);
                newBuilder2.build().writeDelimitedTo(dataOutputStream);
                if (str2 != null) {
                    RPCProtos.RspExceptionBody.Builder newBuilder3 = RPCProtos.RspExceptionBody.newBuilder();
                    newBuilder3.setExceptionName(replaceClassNamePrefix);
                    newBuilder3.setStackTrace(str2);
                    newBuilder3.build().writeDelimitedTo(dataOutputStream);
                } else if (obj != null) {
                    ((Message) obj).writeDelimitedTo(dataOutputStream);
                }
            } catch (IOException e) {
                NettyRpcServer.logger.warn(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE).append("Exception while creating response ").append(e).toString());
            }
            return byteBufferOutputStream.getBufferList();
        }
    }

    public NettyRpcServer(RpcConfig rpcConfig) throws Exception {
        this.protocolType = 10;
        this.keyStorePath = TStringUtils.EMPTY;
        this.keyStorePassword = TStringUtils.EMPTY;
        this.needTwoWayAuthentic = false;
        this.trustStorePath = TStringUtils.EMPTY;
        this.trustStorePassword = TStringUtils.EMPTY;
        this.isOverTLS = rpcConfig.getBoolean(RpcConstants.TLS_OVER_TCP, false);
        if (this.isOverTLS) {
            this.protocolType = 11;
            this.keyStorePath = rpcConfig.getString(RpcConstants.TLS_KEYSTORE_PATH);
            this.keyStorePassword = rpcConfig.getString(RpcConstants.TLS_KEYSTORE_PASSWORD);
            this.needTwoWayAuthentic = rpcConfig.getBoolean(RpcConstants.TLS_TWO_WAY_AUTHENTIC, false);
            if (this.needTwoWayAuthentic) {
                this.trustStorePath = rpcConfig.getString(RpcConstants.TLS_TRUSTSTORE_PATH);
                this.trustStorePassword = rpcConfig.getString(RpcConstants.TLS_TRUSTSTORE_PASSWORD);
            }
            if (this.keyStorePath == null || this.keyStorePassword == null) {
                throw new Exception(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE).append("Required parameters: ").append(RpcConstants.TLS_KEYSTORE_PATH).append(" or ").append(RpcConstants.TLS_KEYSTORE_PASSWORD).append(" for TLS!").toString());
            }
            if (this.needTwoWayAuthentic && (this.trustStorePath == null || this.trustStorePassword == null)) {
                throw new Exception(new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE).append("Required parameters: ").append(RpcConstants.TLS_TRUSTSTORE_PATH).append(" or ").append(RpcConstants.TLS_TRUSTSTORE_PASSWORD).append(" for TLS!").toString());
            }
        }
        this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), rpcConfig.getInt(RpcConstants.BOSS_COUNT, 1), Executors.newCachedThreadPool(), rpcConfig.getInt(RpcConstants.WORKER_COUNT, RpcConstants.CFG_DEFAULT_SERVER_WORKER_COUNT)));
        this.bootstrap.setOption("tcpNoDelay", Boolean.valueOf(rpcConfig.getBoolean(RpcConstants.TCP_NODELAY, true)));
        this.bootstrap.setOption("reuseAddress", Boolean.valueOf(rpcConfig.getBoolean(RpcConstants.TCP_REUSEADDRESS, true)));
        long j = rpcConfig.getLong(RpcConstants.NETTY_WRITE_HIGH_MARK, -1L);
        if (j > 0) {
            this.bootstrap.setOption("writeBufferHighWaterMark", Long.valueOf(j));
        }
        long j2 = rpcConfig.getLong(RpcConstants.NETTY_WRITE_LOW_MARK, -1L);
        if (j2 > 0) {
            this.bootstrap.setOption("writeBufferLowWaterMark", Long.valueOf(j2));
        }
        long j3 = rpcConfig.getLong(RpcConstants.NETTY_TCP_SENDBUF, -1L);
        if (j3 > 0) {
            this.bootstrap.setOption("sendBufferSize", Long.valueOf(j3));
        }
        long j4 = rpcConfig.getLong(RpcConstants.NETTY_TCP_RECEIVEBUF, -1L);
        if (j4 > 0) {
            this.bootstrap.setOption("receiveBufferSize", Long.valueOf(j4));
        }
    }

    @Override // org.apache.inlong.tubemq.corerpc.server.RpcServer
    public void start(int i) throws Exception {
        if (this.started.get()) {
            return;
        }
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { // from class: org.apache.inlong.tubemq.corerpc.netty.NettyRpcServer.1
            public ChannelPipeline getPipeline() throws Exception {
                DefaultChannelPipeline defaultChannelPipeline = new DefaultChannelPipeline();
                if (NettyRpcServer.this.isOverTLS) {
                    try {
                        defaultChannelPipeline.addLast("ssl", new SslHandler(TSSLEngineUtil.createSSLEngine(NettyRpcServer.this.keyStorePath, NettyRpcServer.this.trustStorePath, NettyRpcServer.this.keyStorePassword, NettyRpcServer.this.trustStorePassword, false, NettyRpcServer.this.needTwoWayAuthentic)));
                    } catch (Throwable th) {
                        NettyRpcServer.logger.error("TLS NettyRpcServer init SSLEngine error, system auto exit!", th);
                        System.exit(1);
                    }
                }
                defaultChannelPipeline.addLast("protocolEncoder", new NettyProtocolDecoder());
                defaultChannelPipeline.addLast("protocolDecoder", new NettyProtocolEncoder());
                defaultChannelPipeline.addLast("serverHandler", new NettyServerHandler(NettyRpcServer.this.protocolType));
                return defaultChannelPipeline;
            }
        });
        this.bootstrap.bind(new InetSocketAddress(i));
        this.started.set(true);
        if (this.isOverTLS) {
            logger.info(new StringBuilder(256).append("TLS RpcServer started, listen port: ").append(i).toString());
        } else {
            logger.info(new StringBuilder(256).append("TCP RpcServer started, listen port: ").append(i).toString());
        }
    }

    @Override // org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer
    public void publishService(String str, Object obj, ExecutorService executorService) throws Exception {
        Protocol protocol = this.protocols.get(Integer.valueOf(this.protocolType));
        if (protocol == null) {
            if (ProtocolFactory.getProtocol(this.protocolType) == null) {
                throw new Exception(new StringBuilder(256).append("Invalid protocol type ").append(this.protocolType).append("! You have to register you new protocol before publish service.").toString());
            }
            protocol = ProtocolFactory.getProtocolInstance(this.protocolType);
            this.protocols.put(Integer.valueOf(this.protocolType), protocol);
        }
        protocol.registerService(this.isOverTLS, str, obj, executorService);
    }

    @Override // org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer
    public void removeService(int i, String str) throws Exception {
        Protocol protocol = this.protocols.get(Integer.valueOf(i));
        if (protocol != null) {
            protocol.removeService(str);
        }
    }

    @Override // org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer
    public void removeAllService(int i) throws Exception {
        Protocol protocol = this.protocols.get(Integer.valueOf(i));
        if (protocol != null) {
            protocol.removeAllService();
        }
    }

    @Override // org.apache.inlong.tubemq.corerpc.server.ServiceRpcServer
    public boolean isServiceStarted() {
        return this.started.get();
    }

    @Override // org.apache.inlong.tubemq.corerpc.server.RpcServer
    public void stop() throws Exception {
        if (this.started.get() && this.started.compareAndSet(true, false)) {
            logger.info("Stopping RpcServer...");
            this.bootstrap.releaseExternalResources();
            logger.info("RpcServer stop successfully.");
        }
    }
}
