package org.apache.pinot.core.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.util.OsCheck;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/core/transport/ServerChannels.class */
public class ServerChannels {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannels.class);
    public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while acquiring channel lock";
    private static final long TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS = 5000;
    private final QueryRouter _queryRouter;
    private final BrokerMetrics _brokerMetrics;
    private final ThreadLocal<TSerializer> _threadLocalTSerializer;
    private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> _serverToChannelMap = new ConcurrentHashMap<>();
    private final TlsConfig _tlsConfig;
    private final EventLoopGroup _eventLoopGroup;
    private final Class<? extends SocketChannel> _channelClass;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:org/apache/pinot/core/transport/ServerChannels$ServerChannel.class */
    public class ServerChannel {
        final ServerRoutingInstance _serverRoutingInstance;
        final Bootstrap _bootstrap;
        final ReentrantLock _channelLock = new ReentrantLock();
        Channel _channel;

        ServerChannel(ServerRoutingInstance serverRoutingInstance) {
            this._serverRoutingInstance = serverRoutingInstance;
            this._bootstrap = new Bootstrap().remoteAddress(serverRoutingInstance.getHostname(), serverRoutingInstance.getPort()).group(ServerChannels.this._eventLoopGroup).channel(ServerChannels.this._channelClass).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.pinot.core.transport.ServerChannels.ServerChannel.1
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) {
                    if (ServerChannels.this._tlsConfig != null) {
                        socketChannel.pipeline().addLast(ChannelHandlerFactory.SSL, ChannelHandlerFactory.getClientTlsHandler(ServerChannels.this._tlsConfig, socketChannel));
                    }
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ChannelHandlerFactory.getLengthFieldBasedFrameDecoder()});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ChannelHandlerFactory.getLengthFieldPrepender()});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{ChannelHandlerFactory.getDataTableHandler(ServerChannels.this._queryRouter, ServerChannel.this._serverRoutingInstance, ServerChannels.this._brokerMetrics)});
                }
            });
        }

        void sendRequest(String str, AsyncQueryResponse asyncQueryResponse, ServerRoutingInstance serverRoutingInstance, byte[] bArr, long j) throws InterruptedException, TimeoutException {
            if (!this._channelLock.tryLock(j, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG);
            }
            try {
                connectWithoutLocking();
                sendRequestWithoutLocking(str, asyncQueryResponse, serverRoutingInstance, bArr);
                this._channelLock.unlock();
            } catch (Throwable th) {
                this._channelLock.unlock();
                throw th;
            }
        }

        void connectWithoutLocking() throws InterruptedException {
            if (this._channel == null || !this._channel.isActive()) {
                long currentTimeMillis = System.currentTimeMillis();
                this._channel = this._bootstrap.connect().sync().channel();
                ServerChannels.this._brokerMetrics.setValueOfGlobalGauge(BrokerGauge.NETTY_CONNECTION_CONNECT_TIME_MS, System.currentTimeMillis() - currentTimeMillis);
            }
        }

        void sendRequestWithoutLocking(String str, AsyncQueryResponse asyncQueryResponse, ServerRoutingInstance serverRoutingInstance, byte[] bArr) {
            long currentTimeMillis = System.currentTimeMillis();
            this._channel.writeAndFlush(Unpooled.wrappedBuffer(bArr)).addListener(future -> {
                int currentTimeMillis2 = (int) (System.currentTimeMillis() - currentTimeMillis);
                ServerChannels.this._brokerMetrics.addTimedTableValue(str, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY, currentTimeMillis2, TimeUnit.MILLISECONDS);
                asyncQueryResponse.markRequestSent(serverRoutingInstance, currentTimeMillis2);
            });
            ServerChannels.this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT, 1L);
            ServerChannels.this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_SENT, bArr.length);
        }

        void connect() throws InterruptedException, TimeoutException {
            if (!this._channelLock.tryLock(ServerChannels.TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG);
            }
            try {
                connectWithoutLocking();
            } finally {
                this._channelLock.unlock();
            }
        }
    }

    public ServerChannels(QueryRouter queryRouter, BrokerMetrics brokerMetrics, @Nullable NettyConfig nettyConfig, @Nullable TlsConfig tlsConfig) {
        boolean z = nettyConfig != null && nettyConfig.isNativeTransportsEnabled();
        OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
        if (z && operatingSystemType == OsCheck.OSType.Linux && Epoll.isAvailable()) {
            this._eventLoopGroup = new EpollEventLoopGroup();
            this._channelClass = EpollSocketChannel.class;
            LOGGER.info("Using Epoll event loop");
        } else if (z && operatingSystemType == OsCheck.OSType.MacOS && KQueue.isAvailable()) {
            this._eventLoopGroup = new KQueueEventLoopGroup();
            this._channelClass = KQueueSocketChannel.class;
            LOGGER.info("Using KQueue event loop");
        } else {
            this._eventLoopGroup = new NioEventLoopGroup();
            this._channelClass = NioSocketChannel.class;
            StringBuilder sb = new StringBuilder("Using NIO event loop");
            if (operatingSystemType == OsCheck.OSType.Linux && z) {
                sb.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause());
            } else if (operatingSystemType == OsCheck.OSType.MacOS && z) {
                sb.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause());
            }
            LOGGER.info(sb.toString());
        }
        this._queryRouter = queryRouter;
        this._brokerMetrics = brokerMetrics;
        this._tlsConfig = tlsConfig;
        this._threadLocalTSerializer = ThreadLocal.withInitial(() -> {
            try {
                return new TSerializer(new TCompactProtocol.Factory());
            } catch (TTransportException e) {
                throw new RuntimeException("Failed to initialize Thrift Serializer", e);
            }
        });
    }

    public void sendRequest(String str, AsyncQueryResponse asyncQueryResponse, ServerRoutingInstance serverRoutingInstance, InstanceRequest instanceRequest, long j) throws Exception {
        this._serverToChannelMap.computeIfAbsent(serverRoutingInstance, serverRoutingInstance2 -> {
            return new ServerChannel(serverRoutingInstance2);
        }).sendRequest(str, asyncQueryResponse, serverRoutingInstance, this._threadLocalTSerializer.get().serialize(instanceRequest), j);
    }

    public void connect(ServerRoutingInstance serverRoutingInstance) throws InterruptedException, TimeoutException {
        this._serverToChannelMap.computeIfAbsent(serverRoutingInstance, serverRoutingInstance2 -> {
            return new ServerChannel(serverRoutingInstance2);
        }).connect();
    }

    public void shutDown() {
        this._eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.SECONDS);
    }
}
