package org.apache.pinot.transport.scattergather;

import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.transport.common.CompositeFuture;
import org.apache.pinot.transport.common.ServerResponseFuture;
import org.apache.pinot.transport.netty.NettyClientConnection;
import org.apache.pinot.transport.netty.PooledNettyClientResourceManager;
import org.apache.pinot.transport.pool.KeyedPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/transport/scattergather/ScatterGatherImpl.class */
public class ScatterGatherImpl implements ScatterGather {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScatterGatherImpl.class);
    private final KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> _connPool;
    private final ExecutorService _executorService;

    /**
     * Unchecked exception raised by {@link SingleRequestHandler} when it gives up acquiring a
     * usable pooled connection after exhausting its attempts.
     */
    public static class ConnectionLimitReachedException extends RuntimeException {
        /**
         * @param message human-readable description of the failure, passed to {@link RuntimeException}
         */
        ConnectionLimitReachedException(String message) {
            super(message);
        }
    }

    /**
     * Pairs a {@link ScatterGatherRequest} with the wall-clock time at which this context was
     * created, so the time left in the request's budget can be computed later.
     */
    public static class ScatterGatherRequestContext {
        /** Wall-clock time (ms since epoch) captured when this context was constructed. */
        private final long _startTimeMs = System.currentTimeMillis();
        private final ScatterGatherRequest _request;

        /**
         * @param scatterGatherRequest the request this context tracks
         */
        public ScatterGatherRequestContext(ScatterGatherRequest scatterGatherRequest) {
            _request = scatterGatherRequest;
        }

        /**
         * Returns how many milliseconds of the request timeout are left.
         *
         * @return {@link Long#MAX_VALUE} when the request's timeout is negative; otherwise the
         *         timeout minus the time elapsed since construction (may be zero or negative once
         *         the budget is used up)
         */
        public long getRemainingTimeMs() {
            final long timeoutMs = _request.getRequestTimeoutMs();
            if (timeoutMs >= 0) {
                final long elapsedMs = System.currentTimeMillis() - _startTimeMs;
                return timeoutMs - elapsedMs;
            }
            // A negative timeout is treated as "no limit".
            return Long.MAX_VALUE;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/transport/scattergather/ScatterGatherImpl$SingleRequestHandler.class */
    public static class SingleRequestHandler implements Runnable {
        private static final int MAX_CONN_RETRIES = 3;
        private final ScatterGatherRequest _request;
        private final List<String> _segments;
        private final ServerInstance _server;
        private final CountDownLatch _requestDispatchLatch;
        private volatile NettyClientConnection.ResponseFuture _responseFuture;
        private final KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> _connPool;
        private final long _timeoutMS;
        private final BrokerMetrics _brokerMetrics;
        private long _startTime;
        private long _endTime;
        private final AtomicBoolean _isSent = new AtomicBoolean(false);
        private final AtomicBoolean _isCancelled = new AtomicBoolean(false);
        private final long _initTime = System.currentTimeMillis();

        public SingleRequestHandler(KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> keyedPool, ServerInstance serverInstance, ScatterGatherRequest scatterGatherRequest, List<String> list, long j, CountDownLatch countDownLatch, BrokerMetrics brokerMetrics) {
            this._connPool = keyedPool;
            this._server = serverInstance;
            this._request = scatterGatherRequest;
            this._segments = list;
            this._requestDispatchLatch = countDownLatch;
            this._timeoutMS = j;
            this._brokerMetrics = brokerMetrics;
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            try {
                this._startTime = System.currentTimeMillis();
                runInternal();
            } finally {
                this._endTime = System.currentTimeMillis();
            }
        }

        public long getConnStartTimeMillis() {
            return this._startTime - this._initTime;
        }

        public long getSendCompletionTimeMillis() {
            if (this._endTime > this._initTime) {
                return this._endTime - this._initTime;
            }
            return 0L;
        }

        public long getStartDelayMillis() {
            return this._startTime - this._initTime;
        }

        private void runInternal() {
            if (this._isCancelled.get()) {
                ScatterGatherImpl.LOGGER.error("Request {} to server {} cancelled even before request is sent !! Not sending request", Long.valueOf(this._request.getRequestId()), this._server);
                this._requestDispatchLatch.countDown();
                return;
            }
            ServerResponseFuture serverResponseFuture = null;
            long currentTimeMillis = this._timeoutMS - (System.currentTimeMillis() - this._startTime);
            long nanoTime = System.nanoTime();
            try {
                try {
                    ServerResponseFuture<PooledNettyClientResourceManager.PooledClientConnection> checkoutObject = this._connPool.checkoutObject(this._server, String.valueOf(this._request.getRequestId()));
                    byte[] requestForService = this._request.getRequestForService(this._segments);
                    int i = 0;
                    while (currentTimeMillis > 0) {
                        PooledNettyClientResourceManager.PooledClientConnection one = checkoutObject.getOne(currentTimeMillis, TimeUnit.MILLISECONDS);
                        if (one != null && one.validate()) {
                            long nanoTime2 = System.nanoTime() - nanoTime;
                            this._responseFuture = one.sendRequest(Unpooled.wrappedBuffer(requestForService), this._request.getRequestId(), currentTimeMillis);
                            this._isSent.set(true);
                            ScatterGatherImpl.LOGGER.debug("Response Future is : {}", this._responseFuture);
                            this._requestDispatchLatch.countDown();
                            BrokerRequest brokerRequest = this._request.getBrokerRequest();
                            this._brokerMetrics.addPhaseTiming(brokerRequest, BrokerQueryPhase.REQUEST_CONNECTION_WAIT, nanoTime2);
                            if (currentTimeMillis < 0) {
                                this._brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.REQUEST_CONNECTION_TIMEOUTS, 1L);
                            }
                            if (0 != 0) {
                                if (1 != 0) {
                                    this._brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.REQUEST_DROPPED_DUE_TO_SEND_ERROR, 1L);
                                    return;
                                }
                                if (checkoutObject != null) {
                                    checkoutObject.cancel(true);
                                }
                                this._brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR, 1L);
                                return;
                            }
                            return;
                        }
                        Map<ServerInstance, Throwable> error = checkoutObject.getError();
                        String str = "";
                        if (error != null && error.containsKey(this._server)) {
                            str = error.get(this._server).getMessage();
                        }
                        if (one != null) {
                            ScatterGatherImpl.LOGGER.warn("Destroying invalid conn {}:{}", one, str);
                            this._connPool.destroyObject(this._server, one);
                        }
                        i++;
                        if (i == 2) {
                            throw new ConnectionLimitReachedException("Could not connect to " + this._server + " after " + i + " attempts(timeRemaining=" + currentTimeMillis + "ms)");
                        }
                        checkoutObject = this._connPool.checkoutObject(this._server, "none");
                        currentTimeMillis = this._timeoutMS - (System.currentTimeMillis() - this._startTime);
                    }
                    throw new TimeoutException("Timed out trying to connect to " + this._server + "(timeout=" + this._timeoutMS + "ms,ntries=" + i + ")");
                } catch (TimeoutException e) {
                    ScatterGatherImpl.LOGGER.warn("Timed out waiting for connection for server ({})({})(gotConnection={}):{}. See metric {}", new Object[]{this._server, Long.valueOf(this._request.getRequestId()), false, e.getMessage(), BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR.getMeterName()});
                    this._responseFuture = new NettyClientConnection.ResponseFuture(this._server, e, "Error Future for request " + this._request.getRequestId());
                    this._requestDispatchLatch.countDown();
                    BrokerRequest brokerRequest2 = this._request.getBrokerRequest();
                    this._brokerMetrics.addPhaseTiming(brokerRequest2, BrokerQueryPhase.REQUEST_CONNECTION_WAIT, 0L);
                    if (currentTimeMillis < 0) {
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest2, BrokerMeter.REQUEST_CONNECTION_TIMEOUTS, 1L);
                    }
                    if (1 != 0) {
                        if (0 != 0) {
                            this._brokerMetrics.addMeteredQueryValue(brokerRequest2, BrokerMeter.REQUEST_DROPPED_DUE_TO_SEND_ERROR, 1L);
                            return;
                        }
                        if (0 != 0) {
                            serverResponseFuture.cancel(true);
                        }
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest2, BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR, 1L);
                    }
                } catch (ConnectionLimitReachedException e2) {
                    ScatterGatherImpl.LOGGER.warn("Request {} not sent (gotConnection={}):{}. See metric {}", new Object[]{Long.valueOf(this._request.getRequestId()), false, e2.getMessage(), BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR.getMeterName()});
                    this._responseFuture = new NettyClientConnection.ResponseFuture(this._server, e2, "Error Future for request " + this._request.getRequestId());
                    this._requestDispatchLatch.countDown();
                    BrokerRequest brokerRequest3 = this._request.getBrokerRequest();
                    this._brokerMetrics.addPhaseTiming(brokerRequest3, BrokerQueryPhase.REQUEST_CONNECTION_WAIT, 0L);
                    if (currentTimeMillis < 0) {
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest3, BrokerMeter.REQUEST_CONNECTION_TIMEOUTS, 1L);
                    }
                    if (1 != 0) {
                        if (0 != 0) {
                            this._brokerMetrics.addMeteredQueryValue(brokerRequest3, BrokerMeter.REQUEST_DROPPED_DUE_TO_SEND_ERROR, 1L);
                            return;
                        }
                        if (0 != 0) {
                            serverResponseFuture.cancel(true);
                        }
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest3, BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR, 1L);
                    }
                } catch (Exception e3) {
                    ScatterGatherImpl.LOGGER.error("Got exception sending request ({})(gotConnection={}). Setting error future", new Object[]{Long.valueOf(this._request.getRequestId()), false, e3});
                    this._responseFuture = new NettyClientConnection.ResponseFuture(this._server, e3, "Error Future for request " + this._request.getRequestId());
                    this._requestDispatchLatch.countDown();
                    BrokerRequest brokerRequest4 = this._request.getBrokerRequest();
                    this._brokerMetrics.addPhaseTiming(brokerRequest4, BrokerQueryPhase.REQUEST_CONNECTION_WAIT, 0L);
                    if (currentTimeMillis < 0) {
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest4, BrokerMeter.REQUEST_CONNECTION_TIMEOUTS, 1L);
                    }
                    if (1 != 0) {
                        if (0 != 0) {
                            this._brokerMetrics.addMeteredQueryValue(brokerRequest4, BrokerMeter.REQUEST_DROPPED_DUE_TO_SEND_ERROR, 1L);
                            return;
                        }
                        if (0 != 0) {
                            serverResponseFuture.cancel(true);
                        }
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest4, BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR, 1L);
                    }
                }
            } catch (Throwable th) {
                this._requestDispatchLatch.countDown();
                BrokerRequest brokerRequest5 = this._request.getBrokerRequest();
                this._brokerMetrics.addPhaseTiming(brokerRequest5, BrokerQueryPhase.REQUEST_CONNECTION_WAIT, 0L);
                if (currentTimeMillis < 0) {
                    this._brokerMetrics.addMeteredQueryValue(brokerRequest5, BrokerMeter.REQUEST_CONNECTION_TIMEOUTS, 1L);
                }
                if (1 != 0) {
                    if (0 != 0) {
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest5, BrokerMeter.REQUEST_DROPPED_DUE_TO_SEND_ERROR, 1L);
                    } else {
                        if (0 != 0) {
                            serverResponseFuture.cancel(true);
                        }
                        this._brokerMetrics.addMeteredQueryValue(brokerRequest5, BrokerMeter.REQUEST_DROPPED_DUE_TO_CONNECTION_ERROR, 1L);
                    }
                }
                throw th;
            }
        }

        public synchronized void cancel() {
            if (this._isCancelled.get()) {
                return;
            }
            this._isCancelled.set(true);
            if (this._isSent.get()) {
                this._responseFuture.cancel(true);
            }
        }

        public ServerInstance getServer() {
            return this._server;
        }

        public NettyClientConnection.ResponseFuture getResponseFuture() {
            return this._responseFuture;
        }

        public boolean isSent() {
            return this._isSent.get();
        }
    }

    /**
     * Creates a scatter-gather dispatcher.
     *
     * @param keyedPool       pool supplying connections to the target servers
     * @param executorService executor on which the per-server request handlers are submitted
     */
    public ScatterGatherImpl(@Nonnull KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> keyedPool, @Nonnull ExecutorService executorService) {
        _executorService = executorService;
        _connPool = keyedPool;
    }

    /**
     * Scatters the request to every server in its routing table and returns a composite future
     * over the per-server responses.
     *
     * @param scatterGatherRequest request to dispatch
     * @param scatterGatherStats   receives per-server timing statistics
     * @param bool                 {@code true} to tag stats keys with the offline-table suffix,
     *                             {@code false} for the realtime-table suffix, {@code null} for none
     * @param brokerMetrics        sink for broker metrics
     * @return composite future over the per-server response futures
     * @throws InterruptedException if interrupted while waiting for the requests to be dispatched
     */
    @Override
    @Nonnull
    public CompositeFuture<byte[]> scatterGather(@Nonnull ScatterGatherRequest scatterGatherRequest, @Nonnull ScatterGatherStats scatterGatherStats, @Nullable Boolean bool, @Nonnull BrokerMetrics brokerMetrics) throws InterruptedException {
        // The context captures the start time, so the timeout budget begins here.
        ScatterGatherRequestContext context = new ScatterGatherRequestContext(scatterGatherRequest);
        return sendRequest(context, scatterGatherStats, bool, brokerMetrics);
    }

    /**
     * Convenience overload that dispatches without a table-type tag on the stats keys; it
     * delegates to the four-argument overload with a {@code null} flag.
     *
     * @param scatterGatherRequest request to dispatch
     * @param scatterGatherStats   receives per-server timing statistics
     * @param brokerMetrics        sink for broker metrics
     * @return composite future over the per-server response futures
     * @throws InterruptedException if interrupted while waiting for the requests to be dispatched
     */
    @Override
    @Nonnull
    public CompositeFuture<byte[]> scatterGather(@Nonnull ScatterGatherRequest scatterGatherRequest, @Nonnull ScatterGatherStats scatterGatherStats, @Nonnull BrokerMetrics brokerMetrics) throws InterruptedException {
        final Boolean noTableTypeTag = null;
        return scatterGather(scatterGatherRequest, scatterGatherStats, noTableTypeTag, brokerMetrics);
    }

    /**
     * Submits one {@link SingleRequestHandler} per routing-table entry, waits (up to the remaining
     * request time) for all of them to finish dispatching, and returns a composite future.
     *
     * <p>If every handler dispatches in time, the composite future is started with the handlers'
     * response futures and per-server timings are recorded in {@code scatterGatherStats}.
     * Otherwise the composite future is started with {@code null} and every handler is cancelled.
     *
     * @param scatterGatherRequestContext context holding the request and its start time
     * @param scatterGatherStats          receives per-server timing statistics
     * @param bool                        {@code true} for the offline-table suffix on stats keys,
     *                                    {@code false} for the realtime-table suffix, {@code null} for none
     * @param brokerMetrics               sink for broker metrics, passed to each handler
     * @return the composite future, already started
     * @throws InterruptedException if interrupted while waiting on the dispatch latch
     */
    private CompositeFuture<byte[]> sendRequest(ScatterGatherRequestContext scatterGatherRequestContext, ScatterGatherStats scatterGatherStats, Boolean bool, BrokerMetrics brokerMetrics) throws InterruptedException {
        ScatterGatherRequest request = scatterGatherRequestContext._request;
        Map<String, List<String>> routingTable = request.getRoutingTable();
        CountDownLatch dispatchLatch = new CountDownLatch(routingTable.size());
        List<SingleRequestHandler> handlers = new ArrayList<>(routingTable.size());
        for (Map.Entry<String, List<String>> entry : routingTable.entrySet()) {
            ServerInstance server = ServerInstance.forInstanceName(entry.getKey());
            scatterGatherStats.initServer(statsServerName(server, bool));
            SingleRequestHandler handler = new SingleRequestHandler(this._connPool, server, request, entry.getValue(), scatterGatherRequestContext.getRemainingTimeMs(), dispatchLatch, brokerMetrics);
            this._executorService.submit(handler);
            handlers.add(handler);
        }

        CompositeFuture<byte[]> compositeFuture = new CompositeFuture<>("scatterRequest " + request.getRequestId(), CompositeFuture.GatherModeOnError.SHORTCIRCUIT_AND);
        long remainingTimeMs = scatterGatherRequestContext.getRemainingTimeMs();
        if (dispatchLatch.await(remainingTimeMs, TimeUnit.MILLISECONDS)) {
            List<ServerResponseFuture<byte[]>> responseFutures = new ArrayList<>(handlers.size());
            for (SingleRequestHandler handler : handlers) {
                responseFutures.add(handler.getResponseFuture());
                String statsName = statsServerName(handler.getServer(), bool);
                scatterGatherStats.setSendStartTimeMillis(statsName, handler.getConnStartTimeMillis());
                scatterGatherStats.setConnStartTimeMillis(statsName, handler.getStartDelayMillis());
                scatterGatherStats.setSendCompletionTimeMillis(statsName, handler.getSendCompletionTimeMillis());
            }
            compositeFuture.start(responseFutures);
        } else {
            LOGGER.error("Request ({}) not sent completely within time ({} ms) !! Cancelling !!. NumSentFailed:{}", new Object[]{Long.valueOf(request.getRequestId()), Long.valueOf(remainingTimeMs), Long.valueOf(dispatchLatch.getCount())});
            compositeFuture.start(null);
            for (SingleRequestHandler handler : handlers) {
                LOGGER.info("Request {} to {} was sent successfully:{}, cancelling.", new Object[]{Long.valueOf(request.getRequestId()), handler.getServer(), Boolean.valueOf(handler.isSent())});
                handler.cancel();
            }
        }
        return compositeFuture;
    }

    /**
     * Builds the key under which a server's statistics are recorded: the server's short host
     * name, followed by the offline or realtime table suffix when {@code isOfflineTable} is non-null.
     */
    private static String statsServerName(ServerInstance server, Boolean isOfflineTable) {
        String shortHostName = server.getShortHostName();
        if (isOfflineTable == null) {
            return shortHostName;
        }
        return shortHostName + (isOfflineTable.booleanValue() ? ScatterGatherStats.OFFLINE_TABLE_SUFFIX : ScatterGatherStats.REALTIME_TABLE_SUFFIX);
    }
}
