package org.apache.pinot.connector.presto;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.$internal.com.google.common.collect.ImmutableMap;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

/* loaded from: input_file:org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.class */
public class PinotScatterGatherQueryClient {
    private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
    private static final String PRESTO_HOST_PREFIX = "presto-pinot-";
    private final BrokerMetrics _brokerMetrics;
    private final Config _config;
    private final Queue<QueryRouter> _queryRouters = new ConcurrentLinkedQueue();
    private final Map<String, AtomicInteger> _concurrentQueriesCountMap = new ConcurrentHashMap();
    private final String _prestoHostId = getDefaultPrestoId();

    /* loaded from: input_file:org/apache/pinot/connector/presto/PinotScatterGatherQueryClient$Config.class */
    public static class Config {
        private final int _threadPoolSize;
        private final int _maxBacklogPerServer;
        private TlsConfig _tlsConfig = new TlsConfig();

        @Deprecated
        private final long _idleTimeoutMillis;

        @Deprecated
        private final int _minConnectionsPerServer;

        @Deprecated
        private final int _maxConnectionsPerServer;

        public Config(Map<String, Object> map) {
            this._idleTimeoutMillis = Long.parseLong(map.get("idleTimeoutMillis").toString());
            this._threadPoolSize = Integer.parseInt(map.get("threadPoolSize").toString());
            this._minConnectionsPerServer = Integer.parseInt(map.get("minConnectionsPerServer").toString());
            this._maxBacklogPerServer = Integer.parseInt(map.get("maxBacklogPerServer").toString());
            this._maxConnectionsPerServer = Integer.parseInt(map.get("maxConnectionsPerServer").toString());
            this._tlsConfig.setClientAuthEnabled(Boolean.parseBoolean(map.get("isClientAuthEnabled").toString()));
            this._tlsConfig.setTrustStorePath(map.get("trustStorePath").toString());
            this._tlsConfig.setTrustStorePassword(map.get("trustStorePassword").toString());
            this._tlsConfig.setTrustStoreType(map.get("trustStoreType").toString());
            this._tlsConfig.setKeyStorePath(map.get("keyStorePath").toString());
            this._tlsConfig.setKeyStorePassword(map.get("keyStorePassword").toString());
            this._tlsConfig.setKeyStoreType(map.get("keyStoreType").toString());
            this._tlsConfig.setSslProvider(map.get("sslProvider").toString());
        }

        public Config(long j, int i, int i2, int i3, int i4) {
            this._idleTimeoutMillis = j;
            this._threadPoolSize = i;
            this._minConnectionsPerServer = i2;
            this._maxBacklogPerServer = i3;
            this._maxConnectionsPerServer = i4;
            this._tlsConfig.setClientAuthEnabled(false);
        }

        public int getThreadPoolSize() {
            return this._threadPoolSize;
        }

        public int getMaxBacklogPerServer() {
            return this._maxBacklogPerServer;
        }

        @Deprecated
        public long getIdleTimeoutMillis() {
            return this._idleTimeoutMillis;
        }

        @Deprecated
        public int getMinConnectionsPerServer() {
            return this._minConnectionsPerServer;
        }

        @Deprecated
        public int getMaxConnectionsPerServer() {
            return this._maxConnectionsPerServer;
        }

        public boolean isClientAuthEnabled() {
            return this._tlsConfig.isClientAuthEnabled();
        }

        public String getTrustStoreType() {
            return this._tlsConfig.getTrustStoreType();
        }

        public String getTrustStorePath() {
            return this._tlsConfig.getTrustStorePath();
        }

        public String getTrustStorePassword() {
            return this._tlsConfig.getTrustStorePassword();
        }

        public String getKeyStoreType() {
            return this._tlsConfig.getKeyStoreType();
        }

        public String getKeyStorePath() {
            return this._tlsConfig.getKeyStorePath();
        }

        public String getKeyStorePassword() {
            return this._tlsConfig.getKeyStorePassword();
        }

        public String getSslProvider() {
            return this._tlsConfig.getSslProvider();
        }
    }

    /* loaded from: input_file:org/apache/pinot/connector/presto/PinotScatterGatherQueryClient$ErrorCode.class */
    public enum ErrorCode {
        PINOT_INSUFFICIENT_SERVER_RESPONSE(true),
        PINOT_INVALID_SQL_GENERATED(false),
        PINOT_UNCLASSIFIED_ERROR(false),
        PINOT_QUERY_BACKLOG_FULL(false);

        private final boolean _retriable;

        ErrorCode(boolean z) {
            this._retriable = z;
        }

        public boolean isRetriable() {
            return this._retriable;
        }
    }

    /* loaded from: input_file:org/apache/pinot/connector/presto/PinotScatterGatherQueryClient$PinotException.class */
    public static class PinotException extends RuntimeException {
        private final ErrorCode _errorCode;

        public PinotException(ErrorCode errorCode, String str, Throwable th) {
            super(str, th);
            this._errorCode = errorCode;
        }

        public PinotException(ErrorCode errorCode, String str) {
            this(errorCode, str, null);
        }

        public ErrorCode getErrorCode() {
            return this._errorCode;
        }
    }

    public PinotScatterGatherQueryClient(Config config) {
        PinotMetricUtils.init(new PinotConfiguration(ImmutableMap.of(CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME, NoopPinotMetricFactory.class.getName())));
        this._brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
        this._brokerMetrics.initializeGlobalMeters();
        TlsConfig tlsConfig = getTlsConfig(config);
        for (int i = 0; i < config.getThreadPoolSize(); i++) {
            this._queryRouters.add(new QueryRouter(String.format("%s-%d", this._prestoHostId, Integer.valueOf(i)), this._brokerMetrics, tlsConfig));
        }
        this._config = config;
    }

    private TlsConfig getTlsConfig(Config config) {
        TlsConfig tlsConfig = new TlsConfig();
        tlsConfig.setClientAuthEnabled(config.isClientAuthEnabled());
        tlsConfig.setTrustStoreType(config.getTrustStoreType());
        tlsConfig.setTrustStorePath(config.getTrustStorePath());
        tlsConfig.setTrustStorePassword(config.getTrustStorePassword());
        tlsConfig.setKeyStoreType(config.getKeyStoreType());
        tlsConfig.setKeyStorePath(config.getKeyStorePath());
        tlsConfig.setKeyStorePassword(config.getKeyStorePassword());
        tlsConfig.setSslProvider(config.getSslProvider());
        return tlsConfig;
    }

    private static <T> T doWithRetries(int i, Function<Integer, T> function) {
        PinotException pinotException = null;
        for (int i2 = 0; i2 < i; i2++) {
            try {
                return function.apply(Integer.valueOf(i2));
            } catch (PinotException e) {
                if (pinotException == null) {
                    pinotException = e;
                }
                if (!e.getErrorCode().isRetriable()) {
                    throw e;
                }
            }
        }
        throw pinotException;
    }

    private String getDefaultPrestoId() {
        String str;
        try {
            str = PRESTO_HOST_PREFIX + InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            str = PRESTO_HOST_PREFIX;
        }
        return str;
    }

    public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String str, String str2, List<String> list, long j, boolean z, int i) {
        try {
            BrokerRequest compileToBrokerRequest = REQUEST_COMPILER.compileToBrokerRequest(str);
            HashMap hashMap = new HashMap();
            hashMap.put(new ServerInstance(new InstanceConfig(str2)), new ArrayList(list));
            return (Map) doWithRetries(i, num -> {
                String extractRawTableName = TableNameBuilder.extractRawTableName(compileToBrokerRequest.getQuerySource().getTableName());
                if (!this._concurrentQueriesCountMap.containsKey(str2)) {
                    this._concurrentQueriesCountMap.put(str2, new AtomicInteger(0));
                }
                if (this._concurrentQueriesCountMap.get(str2).get() > this._config.getMaxBacklogPerServer()) {
                    throw new PinotException(ErrorCode.PINOT_QUERY_BACKLOG_FULL, "Reaching server query max backlog size is - " + this._config.getMaxBacklogPerServer());
                }
                this._concurrentQueriesCountMap.get(str2).incrementAndGet();
                QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter();
                Map<ServerInstance, DataTable> gatherServerResponses = gatherServerResponses(z, hashMap, TableNameBuilder.getTableTypeFromTableName(compileToBrokerRequest.getQuerySource().getTableName()) == TableType.REALTIME ? nextAvailableQueryRouter.submitQuery(num.intValue(), extractRawTableName, null, null, compileToBrokerRequest, hashMap, j) : nextAvailableQueryRouter.submitQuery(num.intValue(), extractRawTableName, compileToBrokerRequest, hashMap, null, null, j), compileToBrokerRequest.getQuerySource().getTableName());
                this._queryRouters.offer(nextAvailableQueryRouter);
                this._concurrentQueriesCountMap.get(str2).decrementAndGet();
                return gatherServerResponses;
            });
        } catch (Exception e) {
            throw new PinotException(ErrorCode.PINOT_INVALID_SQL_GENERATED, String.format("Parsing error with on %s, Error = %s", str2, e.getMessage()), e);
        }
    }

    private QueryRouter getNextAvailableQueryRouter() {
        QueryRouter poll = this._queryRouters.poll();
        while (true) {
            QueryRouter queryRouter = poll;
            if (queryRouter != null) {
                return queryRouter;
            }
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
            }
            poll = this._queryRouters.poll();
        }
    }

    private Map<ServerInstance, DataTable> gatherServerResponses(boolean z, Map<ServerInstance, List<String>> map, AsyncQueryResponse asyncQueryResponse, String str) {
        try {
            Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
            if (z || response.size() == map.size()) {
                HashMap hashMap = new HashMap();
                response.entrySet().forEach(entry -> {
                    hashMap.put(new ServerInstance(new InstanceConfig(String.format("Server_%s_%d", ((ServerRoutingInstance) entry.getKey()).getHostname(), Integer.valueOf(((ServerRoutingInstance) entry.getKey()).getPort())))), ((ServerResponse) entry.getValue()).getDataTable());
                });
                return hashMap;
            }
            HashMap hashMap2 = new HashMap();
            map.entrySet().forEach(entry2 -> {
                hashMap2.put(((ServerInstance) entry2.getKey()).toString(), ((List) entry2.getValue()).size() > 10 ? String.format("%d segments", Integer.valueOf(((List) entry2.getValue()).size())) : ((List) entry2.getValue()).toString());
            });
            throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String.format("%d of %d servers responded with routing table servers: %s, query stats: %s", Integer.valueOf(response.size()), Integer.valueOf(map.size()), hashMap2, asyncQueryResponse.getStats()));
        } catch (InterruptedException e) {
            throw new PinotException(ErrorCode.PINOT_UNCLASSIFIED_ERROR, String.format("Caught exception while fetching responses for table: %s", str), e);
        }
    }
}
