package org.apache.pinot.broker.requesthandler;

import com.yammer.metrics.core.MetricsRegistry;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.TimeBoundaryService;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.serde.SerDe;
import org.apache.pinot.transport.common.CompositeFuture;
import org.apache.pinot.transport.conf.TransportClientConf;
import org.apache.pinot.transport.config.ConnectionPoolConfig;
import org.apache.pinot.transport.metrics.NettyClientMetrics;
import org.apache.pinot.transport.netty.PooledNettyClientResourceManager;
import org.apache.pinot.transport.pool.KeyedPool;
import org.apache.pinot.transport.pool.KeyedPoolImpl;
import org.apache.pinot.transport.scattergather.ScatterGather;
import org.apache.pinot.transport.scattergather.ScatterGatherImpl;
import org.apache.pinot.transport.scattergather.ScatterGatherRequest;
import org.apache.pinot.transport.scattergather.ScatterGatherStats;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.class */
public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler {
    private static final Logger LOGGER;
    private static final String TRANSPORT_CONFIG_PREFIX = "pinot.broker.transport";
    private final EventLoopGroup _eventLoopGroup;
    private final ScheduledThreadPoolExecutor _poolTimeoutExecutor;
    private final ExecutorService _requestSenderPool;
    private final KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> _connPool;
    private final ScatterGather _scatterGather;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler$PhaseTimes.class */
    public class PhaseTimes {
        private long _scatterTimeNs;
        private long _gatherTimeNs;
        private long _deserializationTimeNs;
        private long _reduceTimeNs;

        private PhaseTimes() {
            this._scatterTimeNs = 0L;
            this._gatherTimeNs = 0L;
            this._deserializationTimeNs = 0L;
            this._reduceTimeNs = 0L;
        }

        public void addToScatterTime(long j) {
            this._scatterTimeNs += j;
        }

        public void addToGatherTime(long j) {
            this._gatherTimeNs += j;
        }

        public void addToDeserializationTime(long j) {
            this._deserializationTimeNs += j;
        }

        public void addToReduceTime(long j) {
            this._reduceTimeNs += j;
        }

        public void addPhaseTimesToBrokerMetrics(String str) {
            ConnectionPoolBrokerRequestHandler.this._brokerMetrics.addPhaseTiming(str, BrokerQueryPhase.SCATTER_GATHER, this._scatterTimeNs + this._gatherTimeNs);
            ConnectionPoolBrokerRequestHandler.this._brokerMetrics.addPhaseTiming(str, BrokerQueryPhase.DESERIALIZATION, this._deserializationTimeNs);
            ConnectionPoolBrokerRequestHandler.this._brokerMetrics.addPhaseTiming(str, BrokerQueryPhase.REDUCE, this._reduceTimeNs);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler$ScatterGatherRequestImpl.class */
    public static class ScatterGatherRequestImpl implements ScatterGatherRequest {
        private final BrokerRequest _brokerRequest;
        private final Map<String, List<String>> _routingTable;
        private final long _requestId;
        private final long _requestTimeoutMs;
        private final String _brokerId;

        public ScatterGatherRequestImpl(BrokerRequest brokerRequest, Map<String, List<String>> map, long j, long j2, String str) {
            this._brokerRequest = brokerRequest;
            this._routingTable = map;
            this._requestId = j;
            this._requestTimeoutMs = j2;
            this._brokerId = str;
        }

        public Map<String, List<String>> getRoutingTable() {
            return this._routingTable;
        }

        public byte[] getRequestForService(List<String> list) {
            InstanceRequest instanceRequest = new InstanceRequest();
            instanceRequest.setRequestId(this._requestId);
            instanceRequest.setEnableTrace(this._brokerRequest.isEnableTrace());
            instanceRequest.setQuery(this._brokerRequest);
            instanceRequest.setSearchSegments(list);
            instanceRequest.setBrokerId(this._brokerId);
            return new SerDe(new TCompactProtocol.Factory()).serialize(instanceRequest);
        }

        public long getRequestId() {
            return this._requestId;
        }

        public long getRequestTimeoutMs() {
            return this._requestTimeoutMs;
        }

        public BrokerRequest getBrokerRequest() {
            return this._brokerRequest;
        }
    }

    public ConnectionPoolBrokerRequestHandler(Configuration configuration, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, MetricsRegistry metricsRegistry) {
        super(configuration, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
        TransportClientConf transportClientConf = new TransportClientConf();
        transportClientConf.init(this._config.subset(TRANSPORT_CONFIG_PREFIX));
        this._eventLoopGroup = new NioEventLoopGroup();
        PooledNettyClientResourceManager pooledNettyClientResourceManager = new PooledNettyClientResourceManager(this._eventLoopGroup, new HashedWheelTimer(), new NettyClientMetrics(metricsRegistry, "client_"));
        this._poolTimeoutExecutor = new ScheduledThreadPoolExecutor(50);
        this._requestSenderPool = Executors.newCachedThreadPool();
        ConnectionPoolConfig connPool = transportClientConf.getConnPool();
        this._connPool = new KeyedPoolImpl(connPool.getMinConnectionsPerServer(), connPool.getMaxConnectionsPerServer(), connPool.getIdleTimeoutMs(), connPool.getMaxBacklogPerServer(), pooledNettyClientResourceManager, this._poolTimeoutExecutor, this._requestSenderPool, metricsRegistry);
        pooledNettyClientResourceManager.setPool(this._connPool);
        this._scatterGather = new ScatterGatherImpl(this._connPool, this._requestSenderPool);
    }

    public KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> getConnPool() {
        return this._connPool;
    }

    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public synchronized void start() {
        this._connPool.start();
    }

    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public synchronized void shutDown() {
        this._connPool.shutdown();
        this._requestSenderPool.shutdown();
        this._poolTimeoutExecutor.shutdown();
        this._eventLoopGroup.shutdownGracefully();
    }

    @Override // org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler
    protected BrokerResponse processBrokerRequest(long j, BrokerRequest brokerRequest, @Nullable BrokerRequest brokerRequest2, @Nullable Map<String, List<String>> map, @Nullable BrokerRequest brokerRequest3, @Nullable Map<String, List<String>> map2, long j2, BaseBrokerRequestHandler.ServerStats serverStats, RequestStatistics requestStatistics) throws Exception {
        ScatterGatherStats scatterGatherStats = new ScatterGatherStats();
        PhaseTimes phaseTimes = new PhaseTimes();
        String str = null;
        String str2 = null;
        CompositeFuture<byte[]> compositeFuture = null;
        CompositeFuture<byte[]> compositeFuture2 = null;
        if (brokerRequest2 != null) {
            if (!$assertionsDisabled && map == null) {
                throw new AssertionError();
            }
            str = brokerRequest2.getQuerySource().getTableName();
            compositeFuture = scatterBrokerRequest(j, brokerRequest2, map, true, j2, scatterGatherStats, phaseTimes);
        }
        if (brokerRequest3 != null) {
            if (!$assertionsDisabled && map2 == null) {
                throw new AssertionError();
            }
            str2 = brokerRequest3.getQuerySource().getTableName();
            compositeFuture2 = scatterBrokerRequest(j, brokerRequest3, map2, false, j2, scatterGatherStats, phaseTimes);
        }
        int i = 0;
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        Map<ServerInstance, byte[]> map3 = null;
        Map<ServerInstance, byte[]> map4 = null;
        if (compositeFuture != null) {
            i = 0 + compositeFuture.getNumFutures();
            map3 = gatherServerResponses(compositeFuture, scatterGatherStats, true, str, arrayList);
        }
        if (compositeFuture2 != null) {
            i += compositeFuture2.getNumFutures();
            map4 = gatherServerResponses(compositeFuture2, scatterGatherStats, false, str2, arrayList);
        }
        if (map3 == null && map4 == null) {
            return new BrokerResponseNative(arrayList);
        }
        long nanoTime2 = System.nanoTime();
        phaseTimes.addToGatherTime(nanoTime2 - nanoTime);
        serverStats.setServerStats(scatterGatherStats.toString());
        int i2 = 0;
        HashMap hashMap = new HashMap();
        long j3 = 0;
        if (map3 != null) {
            i2 = 0 + map3.size();
            j3 = 0 + deserializeServerResponses(map3, true, hashMap, str, arrayList);
        }
        if (map4 != null) {
            i2 += map4.size();
            j3 += deserializeServerResponses(map4, false, hashMap, str2, arrayList);
        }
        long nanoTime3 = System.nanoTime();
        phaseTimes.addToDeserializationTime(nanoTime3 - nanoTime2);
        BrokerResponseNative reduceOnDataTable = this._brokerReduceService.reduceOnDataTable(brokerRequest, hashMap, this._brokerMetrics);
        long nanoTime4 = System.nanoTime() - nanoTime3;
        phaseTimes.addToReduceTime(nanoTime4);
        requestStatistics.setReduceTimeNanos(nanoTime4);
        reduceOnDataTable.setExceptions(arrayList);
        reduceOnDataTable.setNumServersQueried(i);
        reduceOnDataTable.setNumServersResponded(i2);
        String extractRawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
        phaseTimes.addPhaseTimesToBrokerMetrics(extractRawTableName);
        if (reduceOnDataTable.getExceptionsSize() > 0) {
            this._brokerMetrics.addMeteredTableValue(extractRawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PROCESSING_EXCEPTIONS, 1L);
        }
        if (i > i2) {
            this._brokerMetrics.addMeteredTableValue(extractRawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED, 1L);
        }
        this._brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.TOTAL_SERVER_RESPONSE_SIZE, j3);
        return reduceOnDataTable;
    }

    private CompositeFuture<byte[]> scatterBrokerRequest(long j, BrokerRequest brokerRequest, Map<String, List<String>> map, boolean z, long j2, ScatterGatherStats scatterGatherStats, PhaseTimes phaseTimes) throws InterruptedException {
        long nanoTime = System.nanoTime();
        CompositeFuture<byte[]> scatterGather = this._scatterGather.scatterGather(new ScatterGatherRequestImpl(brokerRequest, map, j, j2, this._brokerId), scatterGatherStats, Boolean.valueOf(z), this._brokerMetrics);
        phaseTimes.addToScatterTime(System.nanoTime() - nanoTime);
        return scatterGather;
    }

    private Map<ServerInstance, byte[]> gatherServerResponses(CompositeFuture<byte[]> compositeFuture, ScatterGatherStats scatterGatherStats, boolean z, String str, List<ProcessingException> list) {
        try {
            Map<ServerInstance, byte[]> map = compositeFuture.get();
            Iterator<Map.Entry<ServerInstance, byte[]>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<ServerInstance, byte[]> next = it.next();
                if (next.getValue().length == 0) {
                    LOGGER.warn("Got empty response from server: {]", next.getKey().getShortHostName());
                    it.remove();
                }
            }
            scatterGatherStats.setResponseTimeMillis(compositeFuture.getResponseTimes(), z);
            return map;
        } catch (Exception e) {
            LOGGER.error("Caught exception while fetching responses for table: {}", str, e);
            this._brokerMetrics.addMeteredTableValue(str, BrokerMeter.RESPONSE_FETCH_EXCEPTIONS, 1L);
            list.add(QueryException.getException(QueryException.BROKER_GATHER_ERROR, e));
            return null;
        }
    }

    private long deserializeServerResponses(Map<ServerInstance, byte[]> map, boolean z, Map<ServerInstance, DataTable> map2, String str, List<ProcessingException> list) {
        long j = 0;
        for (Map.Entry<ServerInstance, byte[]> entry : map.entrySet()) {
            ServerInstance key = entry.getKey();
            if (!z) {
                key = key.withSeq(1);
            }
            j += r0.length;
            try {
                map2.put(key, DataTableFactory.getDataTable(entry.getValue()));
            } catch (Exception e) {
                LOGGER.error("Caught exceptions while deserializing response for table: {} from server: {}", new Object[]{str, key, e});
                this._brokerMetrics.addMeteredTableValue(str, BrokerMeter.DATA_TABLE_DESERIALIZATION_EXCEPTIONS, 1L);
                list.add(QueryException.getException(QueryException.DATA_TABLE_DESERIALIZATION_ERROR, e));
            }
        }
        return j;
    }

    static {
        $assertionsDisabled = !ConnectionPoolBrokerRequestHandler.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(ConnectionPoolBrokerRequestHandler.class);
    }
}
