package org.apache.druid.query.scan;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.derby.impl.store.raw.log.LogCounter;
import org.apache.druid.collections.StableLimitingSorter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/query/scan/ScanQueryRunnerFactory.class */
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery> {
    private final ScanQueryQueryToolChest toolChest;
    private final ScanQueryEngine engine;
    private final ScanQueryConfig scanQueryConfig;

    /* loaded from: input_file:org/apache/druid/query/scan/ScanQueryRunnerFactory$ScanQueryRunner.class */
    /**
     * Query runner bound to a single {@link Segment}; it hands every scan query it
     * receives to the shared {@link ScanQueryEngine}.
     */
    private static class ScanQueryRunner implements QueryRunner<ScanResultValue> {
        private final ScanQueryEngine engine;
        private final Segment segment;

        /**
         * @param scanQueryEngine engine that performs the actual scan
         * @param segment         segment this runner reads from
         */
        public ScanQueryRunner(ScanQueryEngine scanQueryEngine, Segment segment) {
            this.segment = segment;
            this.engine = scanQueryEngine;
        }

        /**
         * Processes the wrapped query against this runner's segment.
         *
         * @param queryPlus       wrapper holding the query; the query must be a {@link ScanQuery}
         * @param responseContext context that carries (and may receive) the TIMEOUT_AT entry
         * @return the sequence produced by the engine for this segment
         * @throws ISE if the wrapped query is not a {@link ScanQuery}
         */
        @Override // org.apache.druid.query.QueryRunner
        public Sequence<ScanResultValue> run(QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext) {
            final Query<ScanResultValue> submitted = queryPlus.getQuery();
            if (submitted instanceof ScanQuery) {
                final Number deadline = (Number) responseContext.get(ResponseContext.Key.TIMEOUT_AT);
                final boolean deadlineMissing = deadline == null || deadline.longValue() == 0;
                if (deadlineMissing) {
                    // No usable deadline was supplied by the caller: fall back to the largest instant.
                    responseContext.put(ResponseContext.Key.TIMEOUT_AT, Long.valueOf(JodaUtils.MAX_INSTANT));
                }
                return engine.process((ScanQuery) submitted, segment, responseContext);
            }
            throw new ISE("Got a [%s] which isn't a %s", submitted.getClass(), ScanQuery.class);
        }
    }

    /**
     * Creates the factory from its injected collaborators.
     *
     * @param scanQueryQueryToolChest tool chest returned by {@link #getToolchest()}
     * @param scanQueryEngine         engine handed to every runner created by this factory
     * @param scanQueryConfig         source of default limits used when merging time-ordered results
     */
    @Inject
    public ScanQueryRunnerFactory(ScanQueryQueryToolChest scanQueryQueryToolChest, ScanQueryEngine scanQueryEngine, ScanQueryConfig scanQueryConfig) {
        this.scanQueryConfig = scanQueryConfig;
        this.engine = scanQueryEngine;
        this.toolChest = scanQueryQueryToolChest;
    }

    /**
     * Builds a runner that scans the given segment with this factory's engine.
     *
     * @param segment segment the returned runner will read
     * @return a new single-segment scan runner
     */
    @Override // org.apache.druid.query.QueryRunnerFactory
    public QueryRunner<ScanResultValue> createRunner(Segment segment) {
        final QueryRunner<ScanResultValue> segmentRunner = new ScanQueryRunner(engine, segment);
        return segmentRunner;
    }

    /**
     * Returns a runner that combines the results of the given per-segment runners.
     *
     * <p>On every run the returned runner first stores an absolute deadline
     * (current time plus the query's timeout) under {@code TIMEOUT_AT} in the response
     * context, then picks one of three strategies:
     * <ol>
     *   <li>Order {@code NONE}: concatenate the runners' results and, when the row limit
     *       fits in an {@code int}, apply it.</li>
     *   <li>Ordered and the row limit is at most {@code maxRowsQueuedForOrdering}:
     *       delegate to {@link #stableLimitingSort}.</li>
     *   <li>Ordered otherwise: group runners by interval and, when no interval has more
     *       than {@code maxSegmentPartitionsOrderedInMemory} runners, delegate to
     *       {@link #nWayMergeAndLimit}.</li>
     * </ol>
     * For both limits the per-query value wins; the injected config is only used when the
     * query returns {@code null} for it.
     *
     * @param executorService not used by this implementation
     * @param iterable        the runners to merge; for ordered queries their count must match
     *                        the number of intervals in the query's segment spec unless the
     *                        iterable is a {@link SinkQueryRunners}, which supplies its own
     *                        interval-to-runner mapping
     * @return a runner that evaluates the merge when run
     * @throws ISE (when run) if the runner and interval counts differ and {@code iterable}
     *         is not a {@link SinkQueryRunners}
     * @throws ResourceLimitExceededException (when run) if neither ordered strategy is
     *         allowed by the configured limits
     */
    @Override // org.apache.druid.query.QueryRunnerFactory
    public QueryRunner<ScanResultValue> mergeRunners(ExecutorService executorService, Iterable<QueryRunner<ScanResultValue>> iterable) {
        return (queryPlus, responseContext) -> {
            ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
            responseContext.put(ResponseContext.Key.TIMEOUT_AT, Long.valueOf(System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery())));

            if (scanQuery.getOrder().equals(ScanQuery.Order.NONE)) {
                Sequence<ScanResultValue> unordered = Sequences.concat(
                    Sequences.map(Sequences.simple(iterable), runner -> runner.run(queryPlus, responseContext)));
                // Math.toIntExact throws on values beyond int range, so guard with Integer.MAX_VALUE itself
                // rather than an unrelated third-party constant.
                if (scanQuery.getScanRowsLimit() <= Integer.MAX_VALUE) {
                    return unordered.limit(Math.toIntExact(scanQuery.getScanRowsLimit()));
                }
                return unordered;
            }

            List<Interval> specIntervals = getIntervalsFromSpecificQuerySpec(scanQuery.getQuerySegmentSpec());
            List<QueryRunner<ScanResultValue>> givenRunners = Lists.newArrayList(iterable);
            // Descending queries walk both lists back to front; Lists.reverse returns a List view,
            // so the variables are typed as List rather than ArrayList.
            final boolean descending = scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING);
            final List<Interval> intervalsOrdered = descending ? Lists.reverse(specIntervals) : specIntervals;
            final List<QueryRunner<ScanResultValue>> runnersOrdered = descending ? Lists.reverse(givenRunners) : givenRunners;

            int maxRowsQueuedForOrdering = scanQuery.getMaxRowsQueuedForOrdering() == null
                ? this.scanQueryConfig.getMaxRowsQueuedForOrdering()
                : scanQuery.getMaxRowsQueuedForOrdering().intValue();

            if (scanQuery.getScanRowsLimit() <= maxRowsQueuedForOrdering) {
                try {
                    return stableLimitingSort(
                        Sequences.concat(
                            Sequences.map(Sequences.simple(runnersOrdered), runner -> runner.run(queryPlus, responseContext))),
                        scanQuery,
                        intervalsOrdered);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            // Pair every runner with the interval it covers.
            List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
            if (intervalsOrdered.size() == runnersOrdered.size()) {
                for (int i = 0; i < runnersOrdered.size(); i++) {
                    intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), runnersOrdered.get(i)));
                }
            } else if (iterable instanceof SinkQueryRunners) {
                ((SinkQueryRunners<ScanResultValue>) iterable)
                    .runnerIntervalMappingIterator()
                    .forEachRemaining(intervalsAndRunnersOrdered::add);
            } else {
                throw new ISE("Number of segment descriptors does not equal number of query runners...something went wrong!");
            }

            // LinkedHashMap keeps the intervals in the order in which they were paired above.
            LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
                intervalsAndRunnersOrdered.stream()
                    .collect(Collectors.groupingBy(pair -> pair.lhs, LinkedHashMap::new, Collectors.toList()));

            // With no runners at all the maximum is 0, so the merge below yields an empty sequence,
            // matching what the other two strategies return for empty input.
            int maxNumPartitionsInSegment = partitionsGroupedByInterval.values().stream()
                .mapToInt(List::size)
                .max()
                .orElse(0);

            int maxSegmentPartitionsOrderedInMemory = scanQuery.getMaxSegmentPartitionsOrderedInMemory() == null
                ? this.scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
                : scanQuery.getMaxSegmentPartitionsOrderedInMemory().intValue();

            if (maxNumPartitionsInSegment <= maxSegmentPartitionsOrderedInMemory) {
                List<List<QueryRunner<ScanResultValue>>> groupedRunners = partitionsGroupedByInterval.values().stream()
                    .map(partitions -> partitions.stream().map(pair -> pair.rhs).collect(Collectors.toList()))
                    .collect(Collectors.toList());
                return nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
            }
            throw ResourceLimitExceededException.withMessage("Time ordering is not supported for a Scan query with %,d segments per time chunk and a row limit of %,d. Try reducing your query limit below maxRowsQueuedForOrdering (currently %,d), or using compaction to reduce the number of segments per time chunk, or raising maxSegmentPartitionsOrderedInMemory (currently %,d) above the number of segments you have per time chunk.", Integer.valueOf(maxNumPartitionsInSegment), Long.valueOf(scanQuery.getScanRowsLimit()), Integer.valueOf(maxRowsQueuedForOrdering), Integer.valueOf(maxSegmentPartitionsOrderedInMemory));
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sorts the rows of {@code sequence} with the query's result ordering while retaining
     * at most {@code scanQuery.getScanRowsLimit()} rows, and returns them as a new sequence.
     *
     * <p>Each incoming {@link ScanResultValue} is split into single-event values before being
     * offered to the sorter. Once more rows than the limit have been seen, the interval from
     * {@code list} that contains the current row's timestamp is remembered; scanning then
     * stops after the first batch whose first event timestamp falls outside that interval
     * (or when the input is exhausted).
     *
     * @param sequence  rows to sort
     * @param scanQuery query supplying the ordering, the row limit and the result format
     * @param list      intervals covered by the scan, used to decide when scanning may stop
     * @return the retained rows, in the order produced by draining the sorter
     * @throws UOE         if the row limit does not fit in an {@code int}
     * @throws ISE         if the row that crosses the limit lies in none of the given intervals
     * @throws IOException if closing the underlying yielder fails
     */
    @VisibleForTesting
    public Sequence<ScanResultValue> stableLimitingSort(Sequence<ScanResultValue> sequence, ScanQuery scanQuery, List<Interval> list) throws IOException {
        Ordering<ScanResultValue> resultOrdering = scanQuery.getResultOrdering();
        // The sorter takes an int limit; compare against Integer.MAX_VALUE directly so that the
        // Math.toIntExact call below can never overflow.
        if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
            throw new UOE("Limit of %,d rows not supported for priority queue strategy of time-ordering scan results", Long.valueOf(scanQuery.getScanRowsLimit()));
        }
        int limit = Math.toIntExact(scanQuery.getScanRowsLimit());
        StableLimitingSorter<ScanResultValue> sorter = new StableLimitingSorter<>(resultOrdering, limit);
        Yielder<ScanResultValue> yielder = Yielders.each(sequence);
        try {
            boolean doneScanning = yielder.isDone();
            int numRowsScanned = 0;
            Interval finalInterval = null;
            while (!doneScanning) {
                ScanResultValue batch = yielder.get();
                for (ScanResultValue row : batch.toSingleEventScanResultValues()) {
                    numRowsScanned++;
                    sorter.add(row);
                    // First row beyond the limit: remember which interval it belongs to.
                    if (numRowsScanned > limit && finalInterval == null) {
                        long timestampOfLimitRow = row.getFirstEventTimestamp(scanQuery.getResultFormat());
                        for (Interval candidate : list) {
                            if (candidate.contains(timestampOfLimitRow)) {
                                finalInterval = candidate;
                            }
                        }
                        if (finalInterval == null) {
                            throw new ISE("Row came from an unscanned interval");
                        }
                    }
                }
                yielder = yielder.next(null);
                doneScanning = yielder.isDone()
                    || (finalInterval != null
                        && !finalInterval.contains(batch.getFirstEventTimestamp(scanQuery.getResultFormat())));
            }
            List<ScanResultValue> sortedRows = new ArrayList<>(sorter.size());
            Iterators.addAll(sortedRows, sorter.drain());
            return Sequences.simple(sortedRows);
        } finally {
            // The yielder variable is reassigned in the loop, so close whichever one is current.
            yielder.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Extracts the intervals named by a segment-specific query spec.
     *
     * @param querySegmentSpec a {@link MultipleSpecificSegmentSpec} or {@link SpecificSegmentSpec}
     * @return one interval per descriptor for a multi-segment spec, or a single-element list
     *         for a single-segment spec
     * @throws UOE if the spec is of any other type
     */
    @VisibleForTesting
    public List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec querySegmentSpec) {
        if (querySegmentSpec instanceof MultipleSpecificSegmentSpec) {
            final MultipleSpecificSegmentSpec multiSpec = (MultipleSpecificSegmentSpec) querySegmentSpec;
            return multiSpec.getDescriptors()
                .stream()
                .map(descriptor -> descriptor.getInterval())
                .collect(Collectors.toList());
        }
        if (querySegmentSpec instanceof SpecificSegmentSpec) {
            final SpecificSegmentSpec singleSpec = (SpecificSegmentSpec) querySegmentSpec;
            return Collections.singletonList(singleSpec.getDescriptor().getInterval());
        }
        throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.", querySegmentSpec.getClass().getSimpleName());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Merges grouped runners into one sequence and applies the query's row limit.
     *
     * <p>For each inner list, every runner's results are split into single-event values and
     * the runners of that list are merged using the query's result ordering; the merged
     * groups are then concatenated in the order of the outer list.
     *
     * @param list            runners grouped into inner lists; groups are concatenated in list order
     * @param queryPlus       wrapper holding the {@link ScanQuery} to run
     * @param responseContext context passed to every runner
     * @return the merged sequence, limited to the query's row limit unless that limit is
     *         {@code Long.MAX_VALUE}
     */
    @VisibleForTesting
    public Sequence<ScanResultValue> nWayMergeAndLimit(List<List<QueryRunner<ScanResultValue>>> list, QueryPlus<ScanResultValue> queryPlus, ResponseContext responseContext) {
        final Sequence<ScanResultValue> merged = Sequences.concat(
            Sequences.map(
                Sequences.simple(list),
                runnersInGroup -> {
                    final Sequence<Sequence<ScanResultValue>> rowsPerRunner = Sequences.map(
                        Sequences.simple(runnersInGroup),
                        runner -> {
                            final Sequence<ScanResultValue> batches = runner.run(queryPlus, responseContext);
                            return Sequences.concat(
                                Sequences.map(batches, batch -> Sequences.simple(batch.toSingleEventScanResultValues())));
                        });
                    return rowsPerRunner.flatMerge(rows -> rows, queryPlus.getQuery().getResultOrdering());
                }));

        final long rowLimit = ((ScanQuery) queryPlus.getQuery()).getScanRowsLimit();
        if (rowLimit != Long.MAX_VALUE) {
            return merged.limit(rowLimit);
        }
        return merged;
    }

    /**
     * @return the tool chest this factory was constructed with
     */
    @Override // org.apache.druid.query.QueryRunnerFactory
    public QueryToolChest<ScanResultValue, ScanQuery> getToolchest() {
        return toolChest;
    }
}
