package org.apache.pinot.query.runtime.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/OpChainTest.class */
public class OpChainTest {
    private static int _numOperatorsInitialized = 0;
    private final List<TransferableBlock> _blockList = new ArrayList();
    private final ExecutorService _executorService = Executors.newCachedThreadPool();
    private final AtomicReference<LeafStageTransferableBlockOperator> _leafOpRef = new AtomicReference<>();
    private AutoCloseable _mocks;

    @Mock
    private MultiStageOperator _sourceOperator;

    @Mock
    private MailboxService _mailboxService1;

    @Mock
    private ReceivingMailbox _mailbox1;

    @Mock
    private MailboxService _mailboxService2;

    @Mock
    private ReceivingMailbox _mailbox2;

    @Mock
    private BlockExchange _exchange;
    private VirtualServerAddress _serverAddress;
    private StageMetadata _receivingStageMetadata;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/OpChainTest$DummyMultiStageCallableOperator.class */
    public static class DummyMultiStageCallableOperator extends MultiStageOperator {
        private final MultiStageOperator _upstream;
        private final long _sleepTimeInMillis;

        public DummyMultiStageCallableOperator(OpChainExecutionContext opChainExecutionContext, MultiStageOperator multiStageOperator, long j) {
            super(opChainExecutionContext);
            this._upstream = multiStageOperator;
            this._sleepTimeInMillis = j;
        }

        protected TransferableBlock getNextBlock() {
            try {
                Thread.sleep(this._sleepTimeInMillis);
                this._upstream.nextBlock();
            } catch (InterruptedException e) {
            }
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        }

        public String toExplainString() {
            int i = OpChainTest._numOperatorsInitialized;
            OpChainTest._numOperatorsInitialized = i + 1;
            return "DUMMY_" + i;
        }
    }

    /* loaded from: input_file:org/apache/pinot/query/runtime/operator/OpChainTest$DummyMultiStageOperator.class */
    static class DummyMultiStageOperator extends MultiStageOperator {
        public DummyMultiStageOperator(OpChainExecutionContext opChainExecutionContext) {
            super(opChainExecutionContext);
        }

        protected TransferableBlock getNextBlock() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        }

        @Nullable
        public String toExplainString() {
            return "DUMMY";
        }
    }

    @BeforeMethod
    public void setUpMethod() {
        this._mocks = MockitoAnnotations.openMocks(this);
        this._serverAddress = new VirtualServerAddress("localhost", 123, 0);
        this._receivingStageMetadata = new StageMetadata.Builder().setWorkerMetadataList((List) Stream.of(this._serverAddress).map(virtualServerAddress -> {
            return new WorkerMetadata.Builder().setVirtualServerAddress(virtualServerAddress).addMailBoxInfoMap(0, new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(virtualServerAddress), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(virtualServerAddress), ImmutableMap.of())).addMailBoxInfoMap(2, new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(virtualServerAddress), ImmutableMap.of())).build();
        }).collect(Collectors.toList())).build();
        Mockito.when(this._mailboxService1.getReceivingMailbox((String) ArgumentMatchers.any())).thenReturn(this._mailbox1);
        Mockito.when(this._mailboxService2.getReceivingMailbox((String) ArgumentMatchers.any())).thenReturn(this._mailbox2);
        try {
            ((BlockExchange) Mockito.doAnswer(invocationOnMock -> {
                this._blockList.add((TransferableBlock) invocationOnMock.getArgument(0));
                return true;
            }).when(this._exchange)).send((TransferableBlock) ArgumentMatchers.any(TransferableBlock.class));
            Mockito.when(this._mailbox2.poll()).then(invocationOnMock2 -> {
                return this._blockList.isEmpty() ? TransferableBlockUtils.getEndOfStreamTransferableBlock() : this._blockList.remove(0);
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @AfterMethod
    public void tearDownMethod() throws Exception {
        this._mocks.close();
        this._exchange.close();
    }

    @AfterClass
    public void tearDown() {
        this._executorService.shutdown();
    }

    @Test
    public void testExecutionTimerStats() {
        Mockito.when(this._sourceOperator.nextBlock()).then(invocationOnMock -> {
            Thread.sleep(100L);
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        });
        OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), this._sourceOperator);
        opChain.getStats().executing();
        opChain.getRoot().nextBlock();
        opChain.getStats().queued();
        Assert.assertTrue(opChain.getStats().getExecutionTime() >= 100);
        Mockito.when(this._sourceOperator.nextBlock()).then(invocationOnMock2 -> {
            Thread.sleep(20L);
            return TransferableBlockUtils.getEndOfStreamTransferableBlock();
        });
        OpChain opChain2 = new OpChain(OperatorTestUtil.getDefaultContext(), this._sourceOperator);
        opChain2.getStats().executing();
        opChain2.getRoot().nextBlock();
        opChain2.getStats().queued();
        Assert.assertTrue(opChain2.getStats().getExecutionTime() >= 20);
        Assert.assertTrue(opChain2.getStats().getExecutionTime() < 100);
    }

    @Test
    public void testStatsCollectionTracingEnabled() {
        OpChainExecutionContext defaultContext = OperatorTestUtil.getDefaultContext();
        DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(defaultContext);
        OpChain opChain = new OpChain(defaultContext, dummyMultiStageOperator);
        opChain.getStats().executing();
        opChain.getRoot().nextBlock();
        opChain.getStats().queued();
        Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
        Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 1);
        Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(dummyMultiStageOperator.getOperatorId()));
        long parseLong = Long.parseLong((String) ((OperatorStats) opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId())).getExecutionStats().get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName()));
        Assert.assertTrue(parseLong >= 1000 && parseLong <= 2000, "Expected " + DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS + " to be in [1000, 2000] but found " + parseLong);
    }

    @Test
    public void testStatsCollectionTracingDisabled() {
        OpChainExecutionContext defaultContextWithTracingDisabled = OperatorTestUtil.getDefaultContextWithTracingDisabled();
        OpChain opChain = new OpChain(defaultContextWithTracingDisabled, new DummyMultiStageOperator(defaultContextWithTracingDisabled));
        opChain.getStats().executing();
        opChain.getRoot().nextBlock();
        opChain.getStats().queued();
        Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
        Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 0);
    }

    @Test
    public void testStatsCollectionTracingEnabledMultipleOperators() {
        OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(this._mailboxService1, 1L, 1, this._serverAddress, Long.MAX_VALUE, Collections.singletonMap("trace", "true"), this._receivingStageMetadata, (PipelineBreakerResult) null);
        Stack<MultiStageOperator> fullOpchain = getFullOpchain(2, 1, opChainExecutionContext, 1000L);
        OpChain opChain = new OpChain(opChainExecutionContext, fullOpchain.peek());
        opChain.getStats().executing();
        do {
        } while (!opChain.getRoot().nextBlock().isEndOfStreamBlock());
        opChain.getStats().queued();
        OpChainExecutionContext opChainExecutionContext2 = new OpChainExecutionContext(this._mailboxService2, 1L, 1 + 1, this._serverAddress, Long.MAX_VALUE, Collections.singletonMap("trace", "true"), this._receivingStageMetadata, (PipelineBreakerResult) null);
        MailboxReceiveOperator mailboxReceiveOperator = new MailboxReceiveOperator(opChainExecutionContext2, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1 + 1);
        Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
        int size = fullOpchain.size();
        Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), size);
        while (!fullOpchain.isEmpty()) {
            Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(fullOpchain.pop().getOperatorId()));
        }
        do {
        } while (!mailboxReceiveOperator.nextBlock().isEndOfStreamBlock());
        Assert.assertEquals(opChainExecutionContext2.getStats().getOperatorStatsMap().size(), size + 1);
    }

    @Test
    public void testStatsCollectionTracingDisableMultipleOperators() {
        OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(this._mailboxService1, 1L, 1, this._serverAddress, Long.MAX_VALUE, Collections.emptyMap(), this._receivingStageMetadata, (PipelineBreakerResult) null);
        Stack<MultiStageOperator> fullOpchain = getFullOpchain(2, 1, opChainExecutionContext, 1000L);
        OpChain opChain = new OpChain(opChainExecutionContext, fullOpchain.peek());
        opChain.getStats().executing();
        opChain.getRoot().nextBlock();
        opChain.getStats().queued();
        OpChainExecutionContext opChainExecutionContext2 = new OpChainExecutionContext(this._mailboxService2, 1L, 1 + 1, this._serverAddress, Long.MAX_VALUE, Collections.emptyMap(), this._receivingStageMetadata, (PipelineBreakerResult) null);
        MailboxReceiveOperator mailboxReceiveOperator = new MailboxReceiveOperator(opChainExecutionContext2, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1);
        Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
        Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 2);
        Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(fullOpchain.pop().getOperatorId()));
        do {
        } while (!mailboxReceiveOperator.nextBlock().isEndOfStreamBlock());
        while (!fullOpchain.isEmpty()) {
            MultiStageOperator pop = fullOpchain.pop();
            if (pop.toExplainString().contains("SEND") || pop.toExplainString().contains("LEAF")) {
                Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(pop.getOperatorId()));
            }
        }
        Assert.assertEquals(opChainExecutionContext2.getStats().getOperatorStatsMap().size(), 2);
    }

    /* JADX WARN: Type inference failed for: r2v13, types: [java.lang.Object[], java.lang.Object[][]] */
    private Stack<MultiStageOperator> getFullOpchain(int i, int i2, OpChainExecutionContext opChainExecutionContext, long j) {
        Stack<MultiStageOperator> stack = new Stack<>();
        DataSchema dataSchema = new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
        try {
            Mockito.when(this._mailbox1.poll()).thenReturn(OperatorTestUtil.block(dataSchema, new Object[]{new Object[]{1}}), new TransferableBlock[]{TransferableBlockUtils.getEndOfStreamTransferableBlock()});
        } catch (Exception e) {
            Assert.fail("Exception while mocking mailbox receive: " + e.getMessage());
        }
        LeafStageTransferableBlockOperator leafStageTransferableBlockOperator = new LeafStageTransferableBlockOperator(opChainExecutionContext, Collections.singletonList((ServerQueryRequest) Mockito.mock(ServerQueryRequest.class)), dataSchema, mockQueryExecutor(Collections.singletonList(new SelectionResultsBlock(dataSchema, Arrays.asList(new Object[]{1}, new Object[]{2}), QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl"))), new InstanceResponseBlock(new MetadataResultsBlock())), this._executorService);
        this._leafOpRef.set(leafStageTransferableBlockOperator);
        TransformOperator transformOperator = new TransformOperator(opChainExecutionContext, leafStageTransferableBlockOperator, dataSchema, Collections.singletonList(new RexExpression.InputRef(0)), dataSchema);
        FilterOperator filterOperator = new FilterOperator(opChainExecutionContext, transformOperator, dataSchema, new RexExpression.Literal(DataSchema.ColumnDataType.BOOLEAN, 1));
        DummyMultiStageCallableOperator dummyMultiStageCallableOperator = new DummyMultiStageCallableOperator(opChainExecutionContext, filterOperator, j);
        MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, dummyMultiStageCallableOperator, this._exchange, (List) null, (List) null, false);
        stack.push(leafStageTransferableBlockOperator);
        stack.push(transformOperator);
        stack.push(filterOperator);
        stack.push(dummyMultiStageCallableOperator);
        stack.push(mailboxSendOperator);
        return stack;
    }

    private QueryExecutor mockQueryExecutor(List<BaseResultsBlock> list, InstanceResponseBlock instanceResponseBlock) {
        QueryExecutor queryExecutor = (QueryExecutor) Mockito.mock(QueryExecutor.class);
        Mockito.when(queryExecutor.execute((ServerQueryRequest) ArgumentMatchers.any(), (ExecutorService) ArgumentMatchers.any(), (ResultsBlockStreamer) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            LeafStageTransferableBlockOperator leafStageTransferableBlockOperator = this._leafOpRef.get();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                leafStageTransferableBlockOperator.addResultsBlock((BaseResultsBlock) it.next());
            }
            return instanceResponseBlock;
        });
        return queryExecutor;
    }
}
