package org.apache.pinot.core.query.executor;

import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.query.pruner.SegmentPrunerService;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.core.util.trace.TraceContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.class */
public class ServerQueryExecutorV1Impl implements QueryExecutor {
    public static final String ENABLE_PREFETCH = "enable.prefetch";
    private static final String IN_PARTITIONED_SUBQUERY = "inPartitionedSubquery";
    private InstanceDataManager _instanceDataManager;
    private ServerMetrics _serverMetrics;
    private SegmentPrunerService _segmentPrunerService;
    private PlanMaker _planMaker;
    private long _defaultTimeoutMs = 15000;
    private boolean _enablePrefetch;
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryExecutorV1Impl.class);
    private static final DataTable EXPLAIN_PLAN_RESULTS_NO_MATCHING_SEGMENT = getExplainPlanResultsForNoMatchingSegment();

    @Override // org.apache.pinot.core.query.executor.QueryExecutor
    public synchronized void init(PinotConfiguration pinotConfiguration, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) throws ConfigurationException {
        this._instanceDataManager = instanceDataManager;
        this._serverMetrics = serverMetrics;
        QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(pinotConfiguration);
        LOGGER.info("Trying to build SegmentPrunerService");
        this._segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
        LOGGER.info("Trying to build QueryPlanMaker");
        this._planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
        if (queryExecutorConfig.getTimeOut() > 0) {
            this._defaultTimeoutMs = queryExecutorConfig.getTimeOut();
        }
        this._enablePrefetch = Boolean.parseBoolean(pinotConfiguration.getProperty(ENABLE_PREFETCH));
        LOGGER.info("Initialized query executor with defaultTimeoutMs: {}, enablePrefetch: {}", Long.valueOf(this._defaultTimeoutMs), Boolean.valueOf(this._enablePrefetch));
    }

    @Override // org.apache.pinot.core.query.executor.QueryExecutor
    public synchronized void start() {
        LOGGER.info("Query executor started");
    }

    @Override // org.apache.pinot.core.query.executor.QueryExecutor
    public synchronized void shutDown() {
        LOGGER.info("Query executor shut down");
    }

    @Override // org.apache.pinot.core.query.executor.QueryExecutor
    public DataTable processQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> streamObserver) {
        TimerContext timerContext = serverQueryRequest.getTimerContext();
        TimerContext.Timer phaseTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
        if (phaseTimer != null) {
            phaseTimer.stopAndRecord();
        }
        TimerContext.Timer startNewPhaseTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
        long requestId = serverQueryRequest.getRequestId();
        String tableNameWithType = serverQueryRequest.getTableNameWithType();
        QueryContext queryContext = serverQueryRequest.getQueryContext();
        LOGGER.debug("Incoming request Id: {}, query: {}", Long.valueOf(requestId), queryContext);
        long j = this._defaultTimeoutMs;
        Long timeoutMs = QueryOptionsUtils.getTimeoutMs(queryContext.getQueryOptions());
        if (timeoutMs != null) {
            j = timeoutMs.longValue();
        }
        long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
        queryContext.setEndTimeMs(timerContext.getQueryArrivalTimeMs() + j);
        queryContext.setEnablePrefetch(this._enablePrefetch);
        long currentTimeMillis = System.currentTimeMillis() - queryArrivalTimeMs;
        if (currentTimeMillis >= j) {
            this._serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1L);
            String format = String.format("Query scheduling took %dms (longer than query timeout of %dms) on server: %s", Long.valueOf(currentTimeMillis), Long.valueOf(j), this._instanceDataManager.getInstanceId());
            DataTable emptyDataTable = DataTableBuilder.getEmptyDataTable();
            emptyDataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, format));
            LOGGER.error("{} while processing requestId: {}", format, Long.valueOf(requestId));
            return emptyDataTable;
        }
        TableDataManager tableDataManager = this._instanceDataManager.getTableDataManager(tableNameWithType);
        if (tableDataManager == null) {
            String format2 = String.format("Failed to find table: %s on server: %s", tableNameWithType, this._instanceDataManager.getInstanceId());
            DataTable emptyDataTable2 = DataTableBuilder.getEmptyDataTable();
            emptyDataTable2.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, format2));
            LOGGER.error("{} while processing requestId: {}", format2, Long.valueOf(requestId));
            return emptyDataTable2;
        }
        List<String> segmentsToQuery = serverQueryRequest.getSegmentsToQuery();
        ArrayList arrayList = new ArrayList();
        List acquireSegments = tableDataManager.acquireSegments(segmentsToQuery, arrayList);
        int size = acquireSegments.size();
        ArrayList arrayList2 = new ArrayList(size);
        Iterator it = acquireSegments.iterator();
        while (it.hasNext()) {
            arrayList2.add(((SegmentDataManager) it.next()).getSegment());
        }
        int i = 0;
        long j2 = Long.MAX_VALUE;
        long j3 = Long.MAX_VALUE;
        for (IndexSegment indexSegment : arrayList2) {
            if (indexSegment instanceof MutableSegment) {
                i++;
                SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
                long lastIndexedTimestamp = segmentMetadata.getLastIndexedTimestamp();
                if (lastIndexedTimestamp != Long.MIN_VALUE && lastIndexedTimestamp < j2) {
                    j2 = lastIndexedTimestamp;
                }
                long latestIngestionTimestamp = segmentMetadata.getLatestIngestionTimestamp();
                if (latestIngestionTimestamp != Long.MIN_VALUE && latestIngestionTimestamp < j3) {
                    j3 = latestIngestionTimestamp;
                }
            }
        }
        boolean isEnableTrace = serverQueryRequest.isEnableTrace();
        if (isEnableTrace) {
            TraceContext.register(requestId);
        }
        DataTable dataTable = null;
        try {
            try {
                dataTable = processQuery(arrayList2, queryContext, timerContext, executorService, streamObserver, serverQueryRequest.isEnableStreaming(), serverQueryRequest.isExplain());
                Iterator it2 = acquireSegments.iterator();
                while (it2.hasNext()) {
                    tableDataManager.releaseSegment((SegmentDataManager) it2.next());
                }
                if (isEnableTrace) {
                    if (dataTable != null) {
                        dataTable.getMetadata().put(DataTable.MetadataKey.TRACE_INFO.getName(), TraceContext.getTraceInfo());
                    }
                    TraceContext.unregister();
                }
            } catch (Exception e) {
                this._serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
                if (e instanceof BadQueryRequestException) {
                    LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", Long.valueOf(requestId), e.getMessage());
                } else {
                    LOGGER.error("Exception processing requestId {}", Long.valueOf(requestId), e);
                }
                dataTable = DataTableBuilder.getEmptyDataTable();
                dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
                Iterator it3 = acquireSegments.iterator();
                while (it3.hasNext()) {
                    tableDataManager.releaseSegment((SegmentDataManager) it3.next());
                }
                if (isEnableTrace) {
                    if (dataTable != null) {
                        dataTable.getMetadata().put(DataTable.MetadataKey.TRACE_INFO.getName(), TraceContext.getTraceInfo());
                    }
                    TraceContext.unregister();
                }
            }
            startNewPhaseTimer.stopAndRecord();
            long durationMs = startNewPhaseTimer.getDurationMs();
            Map metadata = dataTable.getMetadata();
            metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName(), Integer.toString(size));
            metadata.put(DataTable.MetadataKey.TIME_USED_MS.getName(), Long.toString(durationMs));
            int size2 = arrayList.size();
            if (size2 != 0) {
                dataTable.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR, String.format("%d segments %s missing on server: %s", Integer.valueOf(size2), arrayList, this._instanceDataManager.getInstanceId())));
                this._serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, size2);
            }
            if (i > 0) {
                long j4 = j3 != Long.MAX_VALUE ? j3 : j2;
                LOGGER.debug("Request {} queried {} consuming segments with minConsumingFreshnessTimeMs: {}", new Object[]{Long.valueOf(requestId), Integer.valueOf(i), Long.valueOf(j4)});
                metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), Integer.toString(i));
                metadata.put(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), Long.toString(j4));
            }
            LOGGER.debug("Query processing time for request Id - {}: {}", Long.valueOf(requestId), Long.valueOf(durationMs));
            LOGGER.debug("InstanceResponse for request Id - {}: {}", Long.valueOf(requestId), dataTable);
            return dataTable;
        } catch (Throwable th) {
            Iterator it4 = acquireSegments.iterator();
            while (it4.hasNext()) {
                tableDataManager.releaseSegment((SegmentDataManager) it4.next());
            }
            if (isEnableTrace) {
                if (dataTable != null) {
                    dataTable.getMetadata().put(DataTable.MetadataKey.TRACE_INFO.getName(), TraceContext.getTraceInfo());
                }
                TraceContext.unregister();
            }
            throw th;
        }
    }

    private DataTable processQuery(List<IndexSegment> list, QueryContext queryContext, TimerContext timerContext, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> streamObserver, boolean z, boolean z2) throws Exception {
        handleSubquery(queryContext, list, timerContext, executorService);
        long j = 0;
        while (list.iterator().hasNext()) {
            j += r0.next().getSegmentMetadata().getTotalDocs();
        }
        TimerContext.Timer startNewPhaseTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
        List<IndexSegment> prune = this._segmentPrunerService.prune(list, queryContext);
        startNewPhaseTimer.stopAndRecord();
        int size = prune.size();
        LOGGER.debug("Matched {} segments after pruning", Integer.valueOf(size));
        if (size != 0) {
            TimerContext.Timer startNewPhaseTimer2 = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
            Plan makeStreamingInstancePlan = z ? this._planMaker.makeStreamingInstancePlan(prune, queryContext, executorService, streamObserver) : this._planMaker.makeInstancePlan(prune, queryContext, executorService);
            startNewPhaseTimer2.stopAndRecord();
            TimerContext.Timer startNewPhaseTimer3 = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
            DataTable processExplainPlanQueries = z2 ? processExplainPlanQueries(makeStreamingInstancePlan) : makeStreamingInstancePlan.execute();
            startNewPhaseTimer3.stopAndRecord();
            processExplainPlanQueries.getMetadata().put(DataTable.MetadataKey.TOTAL_DOCS.getName(), Long.toString(j));
            return processExplainPlanQueries;
        }
        if (z2) {
            return EXPLAIN_PLAN_RESULTS_NO_MATCHING_SEGMENT;
        }
        DataTable buildEmptyDataTable = DataTableUtils.buildEmptyDataTable(queryContext);
        Map metadata = buildEmptyDataTable.getMetadata();
        metadata.put(DataTable.MetadataKey.TOTAL_DOCS.getName(), String.valueOf(j));
        metadata.put(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName(), TimeHandler.DEFAULT_PARTITION);
        metadata.put(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), TimeHandler.DEFAULT_PARTITION);
        metadata.put(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), TimeHandler.DEFAULT_PARTITION);
        metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), TimeHandler.DEFAULT_PARTITION);
        metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName(), TimeHandler.DEFAULT_PARTITION);
        return buildEmptyDataTable;
    }

    private static DataTable getExplainPlanResultsForNoMatchingSegment() {
        DataTableBuilder dataTableBuilder = new DataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
        try {
            dataTableBuilder.startRow();
            dataTableBuilder.setColumn(0, "NO_MATCHING_SEGMENT");
            dataTableBuilder.setColumn(1, 1);
            dataTableBuilder.setColumn(2, 0);
            dataTableBuilder.finishRow();
        } catch (IOException e) {
            LOGGER.error("Unable to create EXPLAIN PLAN result table.", e);
        }
        return dataTableBuilder.build();
    }

    public static DataTable processExplainPlanQueries(Plan plan) {
        DataTableBuilder dataTableBuilder = new DataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
        try {
            addOperatorToTable(dataTableBuilder, plan.getPlanNode().run().getChildOperators().get(0), new int[]{1}, 0);
        } catch (IOException e) {
            LOGGER.error("Unable to create EXPLAIN PLAN result table.", e);
        }
        return dataTableBuilder.build();
    }

    public static void addOperatorToTable(DataTableBuilder dataTableBuilder, Operator operator, int[] iArr, int i) throws IOException {
        if (operator == null) {
            return;
        }
        String explainString = operator.toExplainString();
        if (explainString != null) {
            dataTableBuilder.startRow();
            dataTableBuilder.setColumn(0, explainString);
            dataTableBuilder.setColumn(1, iArr[0]);
            dataTableBuilder.setColumn(2, i);
            dataTableBuilder.finishRow();
            int i2 = iArr[0];
            iArr[0] = i2 + 1;
            i = i2;
        }
        Iterator<Operator> it = operator.getChildOperators().iterator();
        while (it.hasNext()) {
            addOperatorToTable(dataTableBuilder, it.next(), iArr, i);
        }
    }

    private void handleSubquery(QueryContext queryContext, List<IndexSegment> list, TimerContext timerContext, ExecutorService executorService) throws Exception {
        FilterContext filter = queryContext.getFilter();
        if (filter != null) {
            handleSubquery(filter, list, timerContext, executorService, queryContext.getEndTimeMs());
        }
    }

    private void handleSubquery(FilterContext filterContext, List<IndexSegment> list, TimerContext timerContext, ExecutorService executorService, long j) throws Exception {
        List children = filterContext.getChildren();
        if (children == null) {
            handleSubquery(filterContext.getPredicate().getLhs(), list, timerContext, executorService, j);
            return;
        }
        Iterator it = children.iterator();
        while (it.hasNext()) {
            handleSubquery((FilterContext) it.next(), list, timerContext, executorService, j);
        }
    }

    private void handleSubquery(ExpressionContext expressionContext, List<IndexSegment> list, TimerContext timerContext, ExecutorService executorService, long j) throws Exception {
        FunctionContext function = expressionContext.getFunction();
        if (function == null) {
            return;
        }
        List arguments = function.getArguments();
        if (!StringUtils.remove(function.getFunctionName(), '_').equalsIgnoreCase(IN_PARTITIONED_SUBQUERY)) {
            Iterator it = arguments.iterator();
            while (it.hasNext()) {
                handleSubquery((ExpressionContext) it.next(), list, timerContext, executorService, j);
            }
            return;
        }
        Preconditions.checkState(arguments.size() == 2, "IN_PARTITIONED_SUBQUERY requires 2 arguments: expression, subquery");
        ExpressionContext expressionContext2 = (ExpressionContext) arguments.get(1);
        Preconditions.checkState(expressionContext2.getType() == ExpressionContext.Type.LITERAL, "Second argument of IN_PARTITIONED_SUBQUERY must be a literal (subquery)");
        QueryContext queryContextFromSQL = QueryContextConverterUtils.getQueryContextFromSQL(expressionContext2.getLiteral());
        AggregationFunction[] aggregationFunctions = queryContextFromSQL.getAggregationFunctions();
        Preconditions.checkState(aggregationFunctions != null && aggregationFunctions.length == 1 && aggregationFunctions[0].getType() == AggregationFunctionType.IDSET && queryContextFromSQL.getGroupByExpressions() == null, "Subquery in IN_PARTITIONED_SUBQUERY should be an ID_SET aggregation only query, found: %s", expressionContext2.getLiteral());
        queryContextFromSQL.setEndTimeMs(j);
        String base64String = ((IdSet) processQuery(list, queryContextFromSQL, timerContext, executorService, null, false, false).getObject(0, 0)).toBase64String();
        function.setFunctionName(TransformFunctionType.INIDSET.name());
        arguments.set(1, ExpressionContext.forLiteral(base64String));
    }
}
