package org.apache.pinot.core.transport;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/transport/InstanceRequestHandler.class */
public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class);
    private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
    private final TDeserializer _deserializer;
    private final QueryScheduler _queryScheduler;
    private final ServerMetrics _serverMetrics;
    private final AccessControl _accessControl;

    public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serverMetrics, AccessControl accessControl) {
        this._queryScheduler = queryScheduler;
        this._serverMetrics = serverMetrics;
        this._accessControl = accessControl;
        try {
            this._deserializer = new TDeserializer(new TCompactProtocol.Factory());
        } catch (TTransportException e) {
            throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        super.userEventTriggered(channelHandlerContext, obj);
        if (!(obj instanceof SslHandshakeCompletionEvent) || this._accessControl.isAuthorizedChannel(channelHandlerContext)) {
            return;
        }
        channelHandlerContext.disconnect();
        LOGGER.error("Exception while processing instance request: Unauthorized access to pinot-server");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        long j = 0;
        TBase tBase = null;
        byte[] bArr = null;
        String str = null;
        try {
            int readableBytes = byteBuf.readableBytes();
            tBase = new InstanceRequest();
            bArr = new byte[readableBytes];
            j = System.currentTimeMillis();
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1L);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, readableBytes);
            byteBuf.readBytes(bArr);
            this._deserializer.deserialize(tBase, bArr);
            ServerQueryRequest serverQueryRequest = new ServerQueryRequest(tBase, this._serverMetrics, j);
            serverQueryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, j).stopAndRecord();
            str = serverQueryRequest.getTableNameWithType();
            Futures.addCallback(this._queryScheduler.submit(serverQueryRequest), createCallback(channelHandlerContext, str, j, tBase, serverQueryRequest), MoreExecutors.directExecutor());
        } catch (Exception e) {
            if (e instanceof TException) {
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1L);
            }
            String hexString = bArr != null ? BytesUtils.toHexString(bArr) : "";
            long requestId = tBase != null ? tBase.getRequestId() : 0L;
            LOGGER.error("Exception while processing instance request: {}", hexString, e);
            sendErrorResponse(channelHandlerContext, requestId, str, j, DataTableBuilder.getEmptyDataTable(), e);
        }
    }

    private FutureCallback<byte[]> createCallback(final ChannelHandlerContext channelHandlerContext, final String str, final long j, final InstanceRequest instanceRequest, final ServerQueryRequest serverQueryRequest) {
        return new FutureCallback<byte[]>() { // from class: org.apache.pinot.core.transport.InstanceRequestHandler.1
            public void onSuccess(@Nullable byte[] bArr) {
                if (bArr != null) {
                    InstanceRequestHandler.this.sendResponse(channelHandlerContext, serverQueryRequest.getTableNameWithType(), j, bArr);
                } else {
                    InstanceRequestHandler.this.sendErrorResponse(channelHandlerContext, serverQueryRequest.getRequestId(), str, j, DataTableBuilder.getEmptyDataTable(), new Exception("Null query response."));
                }
            }

            public void onFailure(Throwable th) {
                InstanceRequestHandler.LOGGER.error("Exception while processing instance request", th);
                InstanceRequestHandler.this.sendErrorResponse(channelHandlerContext, instanceRequest.getRequestId(), str, j, DataTableBuilder.getEmptyDataTable(), new Exception(th));
            }
        };
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        String str = "Unhandled Exception in " + getClass().getCanonicalName();
        LOGGER.error(str, th);
        sendErrorResponse(channelHandlerContext, 0L, null, System.currentTimeMillis(), DataTableBuilder.getEmptyDataTable(), new Exception(str, th));
    }

    private void sendErrorResponse(ChannelHandlerContext channelHandlerContext, long j, String str, long j2, DataTable dataTable, Exception exc) {
        try {
            try {
                dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(j));
                dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, exc));
                sendResponse(channelHandlerContext, str, j2, dataTable.toBytes());
                LOGGER.error("Query processing error: ", exc);
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
            } catch (Exception e) {
                LOGGER.error("Exception while sending query processing error to Broker.", e);
                LOGGER.error("Query processing error: ", exc);
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
            }
        } catch (Throwable th) {
            LOGGER.error("Query processing error: ", exc);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
            throw th;
        }
    }

    private void sendResponse(ChannelHandlerContext channelHandlerContext, String str, long j, byte[] bArr) {
        long currentTimeMillis = System.currentTimeMillis();
        int i = (int) (currentTimeMillis - j);
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(bArr)).addListener(future -> {
            long currentTimeMillis2 = System.currentTimeMillis();
            int i2 = (int) (currentTimeMillis2 - currentTimeMillis);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1L);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, bArr.length);
            this._serverMetrics.addTimedTableValue(str, ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, i2, TimeUnit.MILLISECONDS);
            int i3 = (int) (currentTimeMillis2 - j);
            if (i3 > 100) {
                LOGGER.info("Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
            }
        });
    }
}
