package org.apache.pinot.transport.netty;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.AggregatedMetricsRegistry;
import org.apache.pinot.common.metrics.MetricsHelper;
import org.apache.pinot.transport.metrics.AggregatedTransportServerMetrics;
import org.apache.pinot.transport.metrics.NettyServerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/transport/netty/NettyServer.class */
public abstract class NettyServer implements Runnable {
    protected static Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    public static final String AGGREGATED_SERVER_METRICS_NAME = "Server_Global_Metric_";
    protected int _port;
    protected AtomicBoolean _shutdownComplete;
    protected final EventLoopGroup _bossGroup;
    protected final EventLoopGroup _workerGroup;
    protected volatile Channel _channel;
    protected RequestHandlerFactory _handlerFactory;
    protected final AggregatedMetricsRegistry _metricsRegistry;
    protected final AggregatedTransportServerMetrics _metrics;
    protected final long _defaultLargeQueryLatencyMs;

    /* loaded from: input_file:org/apache/pinot/transport/netty/NettyServer$NettyChannelInboundHandler.class */
    public static class NettyChannelInboundHandler extends ChannelInboundHandlerAdapter {
        private final long _defaultLargeQueryLatencyMs;
        private final RequestHandler _handler;
        private final NettyServerMetrics _metric;

        public NettyChannelInboundHandler(RequestHandler requestHandler, NettyServerMetrics nettyServerMetrics, long j) {
            this._handler = requestHandler;
            this._metric = nettyServerMetrics;
            this._defaultLargeQueryLatencyMs = j;
        }

        public NettyChannelInboundHandler(RequestHandler requestHandler, NettyServerMetrics nettyServerMetrics) {
            this(requestHandler, nettyServerMetrics, 100L);
        }

        public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) {
            final long currentTimeMillis = System.currentTimeMillis();
            ByteBuf byteBuf = (ByteBuf) obj;
            final int readableBytes = byteBuf.readableBytes();
            byte[] bArr = new byte[readableBytes];
            byteBuf.readBytes(bArr);
            byteBuf.release();
            final MetricsHelper.TimerContext startTimer = MetricsHelper.startTimer();
            Futures.addCallback(this._handler.processRequest(bArr), new FutureCallback<byte[]>() { // from class: org.apache.pinot.transport.netty.NettyServer.NettyChannelInboundHandler.1
                void sendResponse(@Nonnull byte[] bArr2) {
                    startTimer.stop();
                    ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(bArr2);
                    MetricsHelper.TimerContext startTimer2 = MetricsHelper.startTimer();
                    ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(wrappedBuffer);
                    int i = readableBytes;
                    MetricsHelper.TimerContext timerContext = startTimer;
                    long j = currentTimeMillis;
                    writeAndFlush.addListener(future -> {
                        startTimer2.stop();
                        NettyChannelInboundHandler.this._metric.addServingStats(i, bArr2.length, 1L, false, timerContext.getLatencyMs(), startTimer2.getLatencyMs());
                        long currentTimeMillis2 = System.currentTimeMillis() - j;
                        if (currentTimeMillis2 > NettyChannelInboundHandler.this._defaultLargeQueryLatencyMs) {
                            NettyServer.LOGGER.info("Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}", new Object[]{Long.valueOf(timerContext.getLatencyMs()), Long.valueOf(startTimer2.getLatencyMs()), Long.valueOf(currentTimeMillis2)});
                        }
                    });
                }

                public void onSuccess(@Nullable byte[] bArr2) {
                    if (bArr2 == null) {
                        bArr2 = new byte[0];
                    }
                    sendResponse(bArr2);
                }

                public void onFailure(Throwable th) {
                    NettyServer.LOGGER.error("Request processing returned unhandled exception, error: ", th);
                    sendResponse(new byte[0]);
                }
            }, MoreExecutors.directExecutor());
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyServer.LOGGER.error("Got exception in the channel handler", th);
            this._metric.addServingStats(0L, 0L, 0L, true, 0L, 0L);
            channelHandlerContext.close();
        }

        public String toString() {
            return "NettyChannelInboundHandler [_handler=" + this._handler + ", _metric=" + this._metric + "]";
        }
    }

    /* loaded from: input_file:org/apache/pinot/transport/netty/NettyServer$RequestHandler.class */
    /**
     * Callback that turns the raw bytes of one request into the raw bytes of its response.
     *
     * <p>{@link NettyChannelInboundHandler#channelRead} invokes it with a copy of the inbound payload and writes the
     * future's result back to the channel; a {@code null} result or a failed future is answered with an empty
     * payload.
     */
    public interface RequestHandler {
        /**
         * Processes a single request.
         *
         * @param bArr request payload as read from the channel
         * @return future that completes with the response payload
         */
        ListenableFuture<byte[]> processRequest(byte[] bArr);
    }

    /* loaded from: input_file:org/apache/pinot/transport/netty/NettyServer$RequestHandlerFactory.class */
    /**
     * Factory for {@link RequestHandler} instances.
     *
     * <p>NOTE(review): no call site is visible in this file; presumably subclasses call it while building the
     * channel pipeline in {@code getServerBootstrap()} - confirm against the implementations.
     */
    public interface RequestHandlerFactory {
        /**
         * @return a request handler instance
         */
        RequestHandler createNewRequestHandler();
    }

    /**
     * Creates a server with the default event loop sizing: 1 boss thread and 20 worker threads (the last two
     * arguments of the delegated constructor, which become the sizes of the boss and worker event loop groups).
     *
     * @param i port the server binds to
     * @param requestHandlerFactory factory for request handlers
     * @param aggregatedMetricsRegistry registry backing the aggregated server metrics
     * @param j value stored as the default large-query latency, in milliseconds
     */
    public NettyServer(int i, RequestHandlerFactory requestHandlerFactory, AggregatedMetricsRegistry aggregatedMetricsRegistry, long j) {
        this(i, requestHandlerFactory, aggregatedMetricsRegistry, j, 1, 20);
    }

    public NettyServer(int i, RequestHandlerFactory requestHandlerFactory, AggregatedMetricsRegistry aggregatedMetricsRegistry, long j, int i2, int i3) {
        this._shutdownComplete = new AtomicBoolean(false);
        this._channel = null;
        this._port = i;
        this._handlerFactory = requestHandlerFactory;
        this._metricsRegistry = aggregatedMetricsRegistry;
        this._metrics = new AggregatedTransportServerMetrics(this._metricsRegistry, AGGREGATED_SERVER_METRICS_NAME + i + "_");
        this._defaultLargeQueryLatencyMs = j;
        this._bossGroup = new NioEventLoopGroup(i2);
        this._workerGroup = new NioEventLoopGroup(i3);
    }

    /**
     * Binds the server to its port and blocks the calling thread until the server channel is closed.
     *
     * <p>The bound channel is published through {@code _channel} as soon as the bind succeeds. Any failure is
     * logged rather than propagated. If the thread is interrupted while waiting, its interrupt status is restored
     * so the owner of the thread can still observe the interruption. In every case {@code _shutdownComplete} is
     * set to {@code true} before the method returns.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            ServerBootstrap serverBootstrap = getServerBootstrap();
            LOGGER.info("Binding to the server port !!");
            ChannelFuture bindFuture = serverBootstrap.bind(this._port).sync();
            Channel serverChannel = bindFuture.channel();
            this._channel = serverChannel;
            LOGGER.info("Server bounded to port :{}, Waiting for closing", this._port);
            // Park here until the server channel is closed.
            serverChannel.closeFuture().sync();
            LOGGER.info("Server boss channel is closed. Gracefully shutting down the server netty threads and pipelines");
        } catch (InterruptedException e) {
            // Re-assert the interrupt flag; swallowing it would hide the interruption from the thread's owner.
            Thread.currentThread().interrupt();
            LOGGER.error("Got exception in the main server thread. Stopping !!", e);
        } catch (Exception e) {
            LOGGER.error("Got exception in the main server thread. Stopping !!", e);
        } finally {
            this._shutdownComplete.set(true);
        }
    }

    /**
     * Supplies the {@link ServerBootstrap} that {@link #run()} binds to the configured port.
     *
     * <p>NOTE(review): implementations presumably wire in {@code _bossGroup}, {@code _workerGroup} and handlers
     * from {@code _handlerFactory}; that wiring is not visible in this class - confirm in the subclasses.
     *
     * @return the bootstrap to bind
     */
    protected abstract ServerBootstrap getServerBootstrap();

    /**
     * Reports whether the server channel has been published, which {@link #run()} does right after a successful
     * bind. Nothing in this class clears the channel reference afterwards, so the result stays {@code true} once
     * the server has started.
     *
     * @return {@code true} if the server channel reference is set
     */
    public boolean isStarted() {
        final Channel boundChannel = _channel;
        return null != boundChannel;
    }

    /**
     * Requests an asynchronous shutdown: closes the server channel if one was bound and starts a graceful shutdown
     * of both event loop groups. Does not wait for any of these to finish.
     *
     * <p>The event loop groups are shut down even when no channel was ever published (for example because the bind
     * failed); otherwise their threads could never be stopped through this method.
     */
    public void shutdownGracefully() {
        LOGGER.info("Shutdown requested in the server !!");
        // Read the volatile field once so the null check and the close act on the same reference.
        final Channel channel = this._channel;
        if (channel != null) {
            LOGGER.info("Closing the server channel");
            channel.close();
        }
        this._bossGroup.shutdownGracefully();
        this._workerGroup.shutdownGracefully();
    }

    /**
     * Closes the server channel, shuts down both event loop groups and waits for all three to finish, spending at
     * most {@code j} milliseconds in total across the three waits.
     *
     * <p>If no channel was ever published the method returns immediately without doing anything. The time budget
     * is tracked as elapsed time on the monotonic clock, so it is immune to wall-clock adjustments and cannot
     * overflow for very large timeouts.
     *
     * @param j total time to wait, in milliseconds; a non-positive value performs no waiting
     * @throws IllegalStateException if the channel or either event loop group has not finished shutting down once
     *     the waiting is over
     */
    public void waitForShutdown(long j) {
        LOGGER.info("Waiting for Shutdown");
        // Read the volatile field once so the null check and the close act on the same reference.
        final Channel channel = this._channel;
        if (channel == null) {
            return;
        }
        LOGGER.info("Closing the server channel");
        final long startNanos = System.nanoTime();
        final ChannelFuture channelClose = channel.close();
        final Future<?> bossShutdown = this._bossGroup.shutdownGracefully();
        final Future<?> workerShutdown = this._workerGroup.shutdownGracefully();

        // All three shutdowns were started above and proceed concurrently; the waits share one budget.
        awaitRemaining(channelClose, startNanos, j);
        awaitRemaining(bossShutdown, startNanos, j);
        awaitRemaining(workerShutdown, startNanos, j);

        Preconditions.checkState(channelClose.isDone(), "Unable to close the channel in %s ms", j);
        Preconditions.checkState(bossShutdown.isDone(), "Unable to shutdown the boss group in %s ms", j);
        Preconditions.checkState(workerShutdown.isDone(), "Unable to shutdown the worker group in %s ms", j);
    }

    /** Waits on the future for whatever is left of the budget that started at {@code startNanos}. */
    private static void awaitRemaining(Future<?> future, long startNanos, long budgetMs) {
        if (budgetMs <= 0) {
            return;
        }
        // elapsedMs is non-negative and budgetMs is positive here, so the subtraction cannot overflow.
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        long remainingMs = budgetMs - elapsedMs;
        if (remainingMs > 0) {
            future.awaitUninterruptibly(remainingMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Reports whether {@link #run()} has finished; the flag is set in its {@code finally} block, so it is also
     * {@code true} when the server stopped because of an error.
     *
     * @return the current value of the shutdown-complete flag
     */
    public boolean isShutdownComplete() {
        final boolean finished = _shutdownComplete.get();
        return finished;
    }
}
