package org.apache.pinot.query.runtime.executor;

import com.google.common.collect.ImmutableList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.class */
/**
 * Unit tests for {@link OpChainSchedulerService}, driven by a mocked {@link OpChainScheduler} and mocked
 * operators.
 *
 * <p>The mocks are created once per class and shared by every test method. To keep the tests independent
 * of their execution order, all stubbings and recorded invocations are cleared after each test.
 */
public class OpChainSchedulerServiceTest {
    /** Maximum time, in seconds, a test waits for a latch or for the worker pool to terminate. */
    private static final long AWAIT_TIMEOUT_SECONDS = 10L;

    /** Worker pool handed to the service under test; created per test by {@link #initExecutor(int)}. */
    private ExecutorService _executor;

    /** Handle returned by {@code MockitoAnnotations.openMocks}, closed in {@link #afterClass()}. */
    private AutoCloseable _mocks;

    @Mock
    private Operator<TransferableBlock> _operatorA;

    @Mock
    private Operator<TransferableBlock> _operatorB;

    @Mock
    private OpChainScheduler _scheduler;

    /** Initializes the {@code @Mock}-annotated fields once for the whole class. */
    @BeforeClass
    public void beforeClass() {
        _mocks = MockitoAnnotations.openMocks(this);
    }

    /**
     * Releases the resources acquired by {@code openMocks}.
     *
     * @throws Exception if closing the mock handle fails
     */
    @AfterClass
    public void afterClass() throws Exception {
        _mocks.close();
    }

    /**
     * Stops the worker pool used by the finished test and wipes all mock state.
     *
     * <p>The executor may be {@code null} when a test failed before calling {@link #initExecutor(int)};
     * that case is tolerated so the original failure is not masked. The mocks are reset in a
     * {@code finally} block so that stubbings and invocation counts never leak into the next test
     * (several tests verify exact invocation counts on the shared scheduler mock).
     *
     * @throws InterruptedException if interrupted while waiting for the worker pool to terminate
     */
    @AfterMethod
    public void afterMethod() throws InterruptedException {
        try {
            if (_executor != null) {
                _executor.shutdownNow();
                // Give interrupted workers a chance to finish so none of them is still touching
                // the mocks while they are being reset below.
                _executor.awaitTermination(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                _executor = null;
            }
        } finally {
            Mockito.reset(_operatorA, _operatorB, _scheduler);
        }
    }

    /**
     * Creates the fixed-size worker pool for the current test.
     *
     * @param i number of threads in the pool
     */
    private void initExecutor(int i) {
        _executor = Executors.newFixedThreadPool(i);
    }

    /**
     * Wraps an operator into an {@link OpChain} with an empty mailbox list and fixed ids.
     *
     * @param operator root operator of the chain
     * @return a new chain around {@code operator}
     */
    private OpChain getChain(Operator<TransferableBlock> operator) {
        return new OpChain(operator, ImmutableList.of(), 123L, 1);
    }

    /** A chain registered after the service is running must have its operator invoked. */
    @Test
    public void shouldScheduleSingleOpChainRegisteredAfterStart() throws InterruptedException {
        // Given:
        initExecutor(1);
        Mockito.when(_scheduler.hasNext()).thenReturn(true);
        Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
        OpChainSchedulerService service = new OpChainSchedulerService(_scheduler, _executor);

        CountDownLatch operatorInvoked = new CountDownLatch(1);
        Mockito.when(_operatorA.nextBlock()).thenAnswer(invocation -> {
            operatorInvoked.countDown();
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        });

        // When:
        service.startAsync().awaitRunning();
        service.register(getChain(_operatorA));

        // Then:
        Assert.assertTrue(operatorInvoked.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected await to be called in less than 10 seconds");
        service.stopAsync().awaitTerminated();
    }

    /** A chain registered before the service starts must have its operator invoked once it runs. */
    @Test
    public void shouldScheduleSingleOpChainRegisteredBeforeStart() throws InterruptedException {
        // Given:
        initExecutor(1);
        Mockito.when(_scheduler.hasNext()).thenReturn(true);
        Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
        OpChainSchedulerService service = new OpChainSchedulerService(_scheduler, _executor);

        CountDownLatch operatorInvoked = new CountDownLatch(1);
        Mockito.when(_operatorA.nextBlock()).thenAnswer(invocation -> {
            operatorInvoked.countDown();
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        });

        // When:
        service.register(getChain(_operatorA));
        service.startAsync().awaitRunning();

        // Then:
        Assert.assertTrue(operatorInvoked.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected await to be called in less than 10 seconds");
        service.stopAsync().awaitTerminated();
    }

    /**
     * When the operator first yields a no-op block, the scheduler is expected to see two
     * {@code register} calls by the time the operator is invoked a second time.
     */
    @Test
    public void shouldReRegisterOpChainOnNoOpBlock() throws InterruptedException {
        // Given:
        initExecutor(1);
        Mockito.when(_scheduler.hasNext()).thenReturn(true);
        Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
        OpChainSchedulerService service = new OpChainSchedulerService(_scheduler, _executor);

        CountDownLatch secondInvocation = new CountDownLatch(1);
        Mockito.when(_operatorA.nextBlock())
                .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
                .thenAnswer(invocation -> {
                    secondInvocation.countDown();
                    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
                });

        // When:
        service.startAsync().awaitRunning();
        service.register(getChain(_operatorA));

        // Then:
        Assert.assertTrue(secondInvocation.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected await to be called in less than 10 seconds");
        Mockito.verify(_scheduler, Mockito.times(2)).register(Mockito.any(OpChain.class), Mockito.anyBoolean());
        service.stopAsync().awaitTerminated();
    }

    /**
     * Operator A returns no-op blocks until operator B has produced its end-of-stream block; the test
     * passes once A is invoked again after B and confirms A had been invoked before B finished.
     */
    @Test
    public void shouldYieldOpChainsWhenNoWorkCanBeDone() throws InterruptedException {
        // Given:
        initExecutor(1);
        Mockito.when(_scheduler.hasNext()).thenReturn(true);
        Mockito.when(_scheduler.next())
                .thenReturn(getChain(_operatorA))
                .thenReturn(getChain(_operatorB))
                .thenReturn(getChain(_operatorA));
        OpChainSchedulerService service = new OpChainSchedulerService(_scheduler, _executor);

        AtomicBoolean opAYielded = new AtomicBoolean(false);
        AtomicBoolean opBFinished = new AtomicBoolean(false);
        CountDownLatch opAFinished = new CountDownLatch(1);

        Mockito.when(_operatorA.nextBlock()).thenAnswer(invocation -> {
            if (opBFinished.get()) {
                opAFinished.countDown();
                return TransferableBlockUtils.getEndOfStreamTransferableBlock();
            }
            // B has not run yet: record that A was tried and had nothing to do.
            opAYielded.set(true);
            return TransferableBlockUtils.getNoOpTransferableBlock();
        });
        Mockito.when(_operatorB.nextBlock()).thenAnswer(invocation -> {
            opBFinished.set(true);
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        });

        // When:
        service.startAsync().awaitRunning();
        service.register(getChain(_operatorA));
        service.register(getChain(_operatorB));

        // Then:
        Assert.assertTrue(opAFinished.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected await to be called in less than 10 seconds");
        Assert.assertTrue(opAYielded.get(), "expected opA to be scheduled first");
        service.stopAsync().awaitTerminated();
    }

    /** While {@code hasNext()} reports {@code false}, {@code next()} must never be called. */
    @Test
    public void shouldNotCallSchedulerNextWhenHasNextReturnsFalse() throws InterruptedException {
        // Given:
        initExecutor(1);
        CountDownLatch hasNextCalled = new CountDownLatch(1);
        Mockito.when(_scheduler.hasNext()).thenAnswer(invocation -> {
            hasNextCalled.countDown();
            return false;
        });
        OpChainSchedulerService service = new OpChainSchedulerService(_scheduler, _executor);

        // When:
        service.startAsync().awaitRunning();

        // Then:
        Assert.assertTrue(hasNextCalled.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected hasNext to be called");
        service.stopAsync().awaitTerminated();
        Mockito.verify(_scheduler, Mockito.never()).next();
    }

    /** Calling {@code onDataAvailable} must cause {@code hasNext()} to be consulted again. */
    @Test
    public void shouldReevaluateHasNextWhenOnDataAvailableIsCalled() throws InterruptedException {
        // Given:
        initExecutor(1);
        CountDownLatch firstHasNext = new CountDownLatch(1);
        CountDownLatch secondHasNext = new CountDownLatch(1);
        Mockito.when(_scheduler.hasNext()).thenAnswer(invocation -> {
            firstHasNext.countDown();
            return false;
        }).thenAnswer(invocation -> {
            secondHasNext.countDown();
            return false;
        });
        OpChainSchedulerService service = new OpChainSchedulerService(_scheduler, _executor);

        // When:
        service.startAsync().awaitRunning();
        Assert.assertTrue(firstHasNext.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected hasNext to be called");
        service.onDataAvailable((MailboxIdentifier) null);

        // Then:
        Assert.assertTrue(secondHasNext.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "expected hasNext to be called again");
        service.stopAsync().awaitTerminated();
    }
}
