package org.apache.pinot.core.query.scheduler;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.class */
/**
 * Tests for {@link MultiLevelPriorityQueue}: basic put/take ordering, rejection of puts once a
 * group is out of capacity, wake-up of a blocked reader, and how reserved threads on a scheduler
 * group influence which group's query {@code take()} hands out.
 */
public class MultiLevelPriorityQueueTest {
    private static final ServerMetrics METRICS = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
    private static final SchedulerGroupMapper GROUP_MAPPER = new TableBasedGroupMapper();
    private static final TestSchedulerGroupFactory GROUP_FACTORY = new TestSchedulerGroupFactory();
    private static final String GROUP_ONE = "1";
    private static final String GROUP_TWO = "2";

    /** Slack, in milliseconds, added on top of the queue's wakeup interval whenever a test waits for it. */
    private static final long WAKEUP_SLACK_MS = 10L;

    /**
     * Helper that performs exactly one {@code take()} on the queue from a separate thread and
     * records the result in {@link #_readQueries}.
     *
     * <p>The reader thread and the test thread rendezvous on {@link #_startBarrier} before the
     * {@code take()} is issued; {@link #_readDoneSignal} is released once the reader is finished.
     */
    class QueueReader {
        private final MultiLevelPriorityQueue _queue;
        final CyclicBarrier _startBarrier = new CyclicBarrier(2);
        final CountDownLatch _readDoneSignal = new CountDownLatch(1);
        final ConcurrentLinkedQueue<SchedulerQueryContext> _readQueries = new ConcurrentLinkedQueue<>();
        final Thread _reader;

        /**
         * Creates the reader thread without starting it.
         *
         * @param multiLevelPriorityQueue queue to read from; must not be null
         */
        QueueReader(final MultiLevelPriorityQueue multiLevelPriorityQueue) {
            Preconditions.checkNotNull(multiLevelPriorityQueue);
            this._queue = multiLevelPriorityQueue;
            this._reader = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueueReader.this._startBarrier.await();
                        QueueReader.this._readQueries.add(multiLevelPriorityQueue.take());
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag before surfacing the failure.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    } finally {
                        // Always release the waiter: if the read failed, startAndWaitForRead() must
                        // return so the test fails on its assertions instead of hanging forever.
                        QueueReader.this._readDoneSignal.countDown();
                    }
                }
            });
            // A reader that is still parked in take() when a test fails must not keep the JVM alive.
            this._reader.setDaemon(true);
        }

        /**
         * Starts the reader thread and blocks until it has finished its single read attempt.
         *
         * @throws BrokenBarrierException if the start barrier is broken
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        void startAndWaitForRead() throws BrokenBarrierException, InterruptedException {
            this._reader.start();
            this._startBarrier.await();
            this._readDoneSignal.await();
        }

        /**
         * Starts the reader thread and waits at most one queue wakeup interval (plus slack) for the
         * read to finish. Whether the read completed is deliberately not reported; callers inspect
         * {@link #_readQueries} afterwards.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws BrokenBarrierException if the start barrier is broken
         */
        void startAndWaitForQueueWakeup() throws InterruptedException, BrokenBarrierException {
            this._reader.start();
            this._startBarrier.await();
            long timeoutMicros = this._queue.getWakeupTimeMicros() + TimeUnit.MILLISECONDS.toMicros(WAKEUP_SLACK_MS);
            this._readDoneSignal.await(timeoutMicros, TimeUnit.MICROSECONDS);
        }
    }

    /** Resets the shared group factory so every test starts without previously created groups. */
    @BeforeMethod
    public void beforeMethod() {
        GROUP_FACTORY.reset();
    }

    /** Queries from two groups are put and then taken back; only two groups must have been created. */
    @Test
    public void testSimplePutTake() throws OutOfCapacityException {
        MultiLevelPriorityQueue queue = createQueue();
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS));
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));

        // Three queries, but only two distinct groups.
        Assert.assertEquals(GROUP_FACTORY._numCalls.get(), 2);

        // Expected hand-out order: both GROUP_ONE queries first, then GROUP_TWO.
        Assert.assertEquals(queue.take().getSchedulerGroup().name(), GROUP_ONE);
        Assert.assertEquals(queue.take().getSchedulerGroup().name(), GROUP_ONE);
        Assert.assertEquals(queue.take().getSchedulerGroup().name(), GROUP_TWO);
    }

    /** With {@code max_pending_per_group} set to 2, the third put for a saturated group must be rejected. */
    @Test
    public void testPutOutOfCapacity() throws OutOfCapacityException {
        Map<String, Object> properties = new HashMap<>();
        properties.put("max_pending_per_group", 2);
        PinotConfiguration configuration = new PinotConfiguration(properties);
        UnboundedResourceManager resourceManager = new UnboundedResourceManager(configuration);
        MultiLevelPriorityQueue queue = createQueue(configuration, resourceManager);

        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        // Reserve the table's hard limit worth of threads for the group before the second put.
        GROUP_FACTORY._groupMap.get(GROUP_ONE).addReservedThreads(resourceManager.getTableThreadsHardLimit());
        // The second put must still be accepted (any exception here fails the test).
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));

        try {
            queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
            Assert.fail("Expected OutOfCapacityException on the third put for group " + GROUP_ONE);
        } catch (OutOfCapacityException expected) {
            // Expected: the group is out of capacity.
        }
    }

    /** A reader blocked on an empty queue must receive a query that is put afterwards. */
    @Test
    public void testPutForBlockedReader() throws Exception {
        MultiLevelPriorityQueue queue = createQueue();
        QueueReader reader = new QueueReader(queue);

        // Nothing is queued, so after one wakeup interval the reader is still waiting.
        reader.startAndWaitForQueueWakeup();
        Assert.assertTrue(reader._reader.isAlive());
        Assert.assertEquals(reader._readQueries.size(), 0);

        sleepForQueueWakeup(queue);
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        sleepForQueueWakeup(queue);
        Assert.assertEquals(reader._readQueries.size(), 1);
    }

    /**
     * Exercises {@code take()} while the groups hold varying numbers of reserved threads relative
     * to the resource manager's soft and hard table-thread limits, checking which group's query
     * is handed out at each step.
     */
    @Test
    public void testTakeWithLimits() throws OutOfCapacityException, BrokenBarrierException, InterruptedException {
        Map<String, Object> properties = new HashMap<>();
        properties.put("query_worker_threads", 40);
        properties.put("query_runner_threads", 10);
        properties.put("table_threads_soft_limit_pct", 20);
        properties.put("table_threads_hard_limit_pct", 80);
        PinotConfiguration configuration = new PinotConfiguration(properties);
        PolicyBasedResourceManager resourceManager = new PolicyBasedResourceManager(configuration);
        MultiLevelPriorityQueue queue = createQueue(configuration, resourceManager);

        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS));
        TestSchedulerGroup groupOne = GROUP_FACTORY._groupMap.get(GROUP_ONE);
        TestSchedulerGroup groupTwo = GROUP_FACTORY._groupMap.get(GROUP_TWO);

        // Step 1: group one holds more reserved threads than the soft limit; group two is expected.
        groupOne.addReservedThreads(resourceManager.getTableThreadsSoftLimit() + 1);
        QueueReader firstReader = new QueueReader(queue);
        firstReader.startAndWaitForRead();
        Assert.assertEquals(firstReader._readQueries.size(), 1);
        Assert.assertEquals(firstReader._readQueries.poll().getSchedulerGroup().name(), GROUP_TWO);

        // Step 2: another group-two query arrives; group two is expected again.
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS));
        QueueReader secondReader = new QueueReader(queue);
        secondReader.startAndWaitForRead();
        Assert.assertEquals(secondReader._readQueries.size(), 1);
        Assert.assertEquals(secondReader._readQueries.poll().getSchedulerGroup().name(), GROUP_TWO);

        // Step 3: group two now reserves more threads than group one; group one is expected.
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS));
        groupTwo.addReservedThreads(groupOne.totalReservedThreads() + 1);
        QueueReader thirdReader = new QueueReader(queue);
        thirdReader.startAndWaitForRead();
        Assert.assertEquals(thirdReader._readQueries.size(), 1);
        Assert.assertEquals(thirdReader._readQueries.poll().getSchedulerGroup().name(), GROUP_ONE);

        // Step 4: group one additionally reserves the hard limit; group two is expected.
        groupOne.addReservedThreads(resourceManager.getTableThreadsHardLimit());
        QueueReader fourthReader = new QueueReader(queue);
        fourthReader.startAndWaitForRead();
        Assert.assertEquals(fourthReader._readQueries.size(), 1);
        Assert.assertEquals(fourthReader._readQueries.poll().getSchedulerGroup().name(), GROUP_TWO);

        // Step 5: both groups have pending queries and both have reserved at least the hard limit;
        // no query is expected to be handed out, even after an extra wakeup interval.
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS));
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS));
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        groupTwo.addReservedThreads(resourceManager.getTableThreadsHardLimit());
        QueueReader fifthReader = new QueueReader(queue);
        fifthReader.startAndWaitForQueueWakeup();
        Assert.assertEquals(fifthReader._readQueries.size(), 0);
        sleepForQueueWakeup(queue);
        Assert.assertEquals(fifthReader._readQueries.size(), 0);

        // Step 6: once group two releases all of its reserved threads, the blocked reader gets a query.
        groupTwo.releasedReservedThreads(groupTwo.totalReservedThreads());
        sleepForQueueWakeup(queue);
        Assert.assertEquals(fifthReader._readQueries.size(), 1);
    }

    /** Sleeps for one queue wakeup interval (converted from microseconds) plus a small slack. */
    private void sleepForQueueWakeup(MultiLevelPriorityQueue queue) throws InterruptedException {
        Thread.sleep(TimeUnit.MICROSECONDS.toMillis(queue.getWakeupTimeMicros()) + WAKEUP_SLACK_MS);
    }

    /**
     * Queries whose arrival time lies 100 seconds in the past must not be handed to a reader, and
     * both groups must end up empty.
     */
    @Test
    public void testNoPendingAfterTrim() throws OutOfCapacityException, BrokenBarrierException, InterruptedException {
        MultiLevelPriorityQueue queue = createQueue();
        // NOTE(review): 100s is presumably older than the queue's query deadline - confirm.
        long staleArrivalTimeMs = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(100L);
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS, staleArrivalTimeMs));
        queue.put(TestHelper.createQueryRequest(GROUP_TWO, METRICS, staleArrivalTimeMs));
        TestSchedulerGroup groupOne = GROUP_FACTORY._groupMap.get(GROUP_ONE);
        TestSchedulerGroup groupTwo = GROUP_FACTORY._groupMap.get(GROUP_TWO);

        QueueReader reader = new QueueReader(queue);
        reader.startAndWaitForQueueWakeup();
        Assert.assertTrue(reader._readQueries.isEmpty());
        Assert.assertTrue(groupOne.isEmpty());
        Assert.assertTrue(groupTwo.isEmpty());

        // Put a fresh query and wait one interval; presumably this lets the still-blocked reader
        // thread complete its take() and exit - nothing is asserted about it here.
        queue.put(TestHelper.createQueryRequest(GROUP_ONE, METRICS));
        sleepForQueueWakeup(queue);
    }

    /** Builds a queue with default configuration backed by an {@link UnboundedResourceManager}. */
    private MultiLevelPriorityQueue createQueue() {
        PinotConfiguration configuration = new PinotConfiguration();
        return createQueue(configuration, new UnboundedResourceManager(configuration));
    }

    /** Builds a queue from the given configuration and resource manager using the shared test group factory and mapper. */
    private MultiLevelPriorityQueue createQueue(PinotConfiguration pinotConfiguration, ResourceManager resourceManager) {
        return new MultiLevelPriorityQueue(pinotConfiguration, resourceManager, GROUP_FACTORY, GROUP_MAPPER);
    }
}
