package org.apache.pinot.query.runtime.operator;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.class */
public class MailboxReceiveOperatorTest {
    private AutoCloseable _mocks;

    @Mock
    private ReceivingMailbox<TransferableBlock> _mailbox;

    @Mock
    private ReceivingMailbox<TransferableBlock> _mailbox2;

    @Mock
    private MailboxService<TransferableBlock> _mailboxService;

    @Mock
    private ServerInstance _server1;

    @Mock
    private ServerInstance _server2;

    @BeforeMethod
    public void setUp() {
        this._mocks = MockitoAnnotations.openMocks(this);
    }

    @AfterMethod
    public void tearDown() throws Exception {
        this._mocks.close();
    }

    @Test
    public void shouldTimeoutOnExtraLongSleep() throws InterruptedException {
        MailboxReceiveOperator mailboxReceiveOperator = new MailboxReceiveOperator(this._mailboxService, new ArrayList(), RelDistribution.Type.SINGLETON, "test", 123, 456L, 789, 10L);
        Thread.sleep(200L);
        TransferableBlock nextBlock = mailboxReceiveOperator.nextBlock();
        Assert.assertTrue(nextBlock.isErrorBlock());
        Assert.assertTrue(nextBlock.getDataBlock().getExceptions().containsKey(250));
        MailboxReceiveOperator mailboxReceiveOperator2 = new MailboxReceiveOperator(this._mailboxService, new ArrayList(), RelDistribution.Type.SINGLETON, "test", 123, 456L, 789, 2000L);
        Thread.sleep(200L);
        Assert.assertFalse(mailboxReceiveOperator2.nextBlock().isErrorBlock());
        MailboxReceiveOperator mailboxReceiveOperator3 = new MailboxReceiveOperator(this._mailboxService, new ArrayList(), RelDistribution.Type.SINGLETON, "test", 123, 456L, 789, (Long) null);
        Thread.sleep(200L);
        Assert.assertFalse(mailboxReceiveOperator3.nextBlock().isErrorBlock());
    }

    @Test(expectedExceptions = {IllegalStateException.class}, expectedExceptionsMessageRegExp = ".*multiple instance found.*")
    public void shouldThrowReceiveSingletonFromMultiMatchMailboxServer() {
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(123);
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(123);
        new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "test", 123, 456L, 789, (Long) null);
    }

    @Test(expectedExceptions = {IllegalStateException.class}, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
    public void shouldThrowRangeDistributionNotSupported() {
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(123);
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(123);
        new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456L, 789, (Long) null);
    }

    @Test
    public void shouldReceiveSingletonNoMatchMailboxServer() {
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(789);
        Assert.assertTrue(new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "toHost", 8888, 456, 0, (Long) null).nextBlock().isEndOfStreamBlock());
    }

    @Test
    public void shouldReceiveSingletonCloseMailbox() {
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "singleton", 456, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(true);
        Assert.assertTrue(new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "toHost", 8888, 456, 0, (Long) null).nextBlock().isEndOfStreamBlock());
    }

    @Test
    public void shouldReceiveSingletonNullMailbox() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "singleton", 456, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn((Object) null);
        Assert.assertTrue(new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "toHost", 8888, 456, 0, (Long) null).nextBlock().isNoOpBlock());
    }

    @Test
    public void shouldReceiveEosDirectlyFromSender() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "singleton", 456, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
        Assert.assertTrue(new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "toHost", 8888, 456, 0, (Long) null).nextBlock().isEndOfStreamBlock());
    }

    /* JADX WARN: Type inference failed for: r2v7, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveSingletonMailbox() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "singleton", 456, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Object[] objArr = {1, 1};
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn(OperatorTestUtil.block(new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}), new Object[]{objArr}));
        List container = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "toHost", 8888, 456, 0, (Long) null).nextBlock().getContainer();
        Assert.assertEquals(container.size(), 1);
        Assert.assertEquals((Object[]) container.get(0), objArr);
    }

    @Test
    public void shouldReceiveSingletonErrorMailbox() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getHostname()).thenReturn("singleton");
        Mockito.when(Integer.valueOf(this._mailboxService.getMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "singleton", 456, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("errorBlock")));
        TransferableBlock nextBlock = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.SINGLETON, "toHost", 8888, 456, 0, (Long) null).nextBlock();
        Assert.assertTrue(nextBlock.isErrorBlock());
        Assert.assertTrue(((String) nextBlock.getDataBlock().getExceptions().get(1000)).contains("errorBlock"));
    }

    /* JADX WARN: Type inference failed for: r2v9, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveMailboxFromTwoServersOneClose() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("hash1");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("hash2");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash1", 123, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(true);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash2", 456, "toHost", 8888))).thenReturn(this._mailbox2);
        Mockito.when(Boolean.valueOf(this._mailbox2.isClosed())).thenReturn(false);
        Object[] objArr = {1, 1};
        Mockito.when((TransferableBlock) this._mailbox2.receive()).thenReturn(OperatorTestUtil.block(new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}), new Object[]{objArr}));
        List container = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.HASH_DISTRIBUTED, "toHost", 8888, 456, 0, (Long) null).nextBlock().getContainer();
        Assert.assertEquals(container.size(), 1);
        Assert.assertEquals((Object[]) container.get(0), objArr);
    }

    /* JADX WARN: Type inference failed for: r2v9, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveMailboxFromTwoServersOneNull() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("hash1");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("hash2");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash1", 123, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn((Object) null);
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash2", 456, "toHost", 8888))).thenReturn(this._mailbox2);
        Mockito.when(Boolean.valueOf(this._mailbox2.isClosed())).thenReturn(false);
        Object[] objArr = {1, 1};
        Mockito.when((TransferableBlock) this._mailbox2.receive()).thenReturn(OperatorTestUtil.block(new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT}), new Object[]{objArr}));
        List container = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.HASH_DISTRIBUTED, "toHost", 8888, 456, 0, (Long) null).nextBlock().getContainer();
        Assert.assertEquals(container.size(), 1);
        Assert.assertEquals((Object[]) container.get(0), objArr);
    }

    /* JADX WARN: Type inference failed for: r2v17, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r2v9, types: [java.lang.Object[], java.lang.Object[][]] */
    /* JADX WARN: Type inference failed for: r6v8, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldReceiveMailboxFromTwoServers() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("hash1");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("hash2");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        DataSchema dataSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash1", 123, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Object[] objArr = {1, 1};
        Object[] objArr2 = {2, 2};
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn(OperatorTestUtil.block(dataSchema, new Object[]{objArr}), new TransferableBlock[]{OperatorTestUtil.block(dataSchema, new Object[]{objArr2}), TransferableBlockUtils.getEndOfStreamTransferableBlock()});
        Object[] objArr3 = {3, 3};
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash2", 456, "toHost", 8888))).thenReturn(this._mailbox2);
        Mockito.when(Boolean.valueOf(this._mailbox2.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox2.receive()).thenReturn(OperatorTestUtil.block(dataSchema, new Object[]{objArr3}));
        MailboxReceiveOperator mailboxReceiveOperator = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.HASH_DISTRIBUTED, "toHost", 8888, 456, 0, (Long) null);
        List container = mailboxReceiveOperator.nextBlock().getContainer();
        Assert.assertEquals(container.size(), 1);
        Assert.assertEquals((Object[]) container.get(0), objArr);
        List container2 = mailboxReceiveOperator.nextBlock().getContainer();
        Assert.assertEquals(container2.size(), 1);
        Assert.assertEquals((Object[]) container2.get(0), objArr2);
        List container3 = mailboxReceiveOperator.nextBlock().getContainer();
        Assert.assertEquals(container3.size(), 1);
        Assert.assertEquals((Object[]) container3.get(0), objArr3);
    }

    /* JADX WARN: Type inference failed for: r2v10, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldGetReceptionReceiveErrorMailbox() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("hash1");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("hash2");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        DataSchema dataSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash1", 123, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("mailboxError")));
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash2", 456, "toHost", 8888))).thenReturn(this._mailbox2);
        Mockito.when(Boolean.valueOf(this._mailbox2.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox2.receive()).thenReturn(OperatorTestUtil.block(dataSchema, new Object[]{new Object[]{3, 3}}));
        TransferableBlock nextBlock = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.HASH_DISTRIBUTED, "toHost", 8888, 456, 0, (Long) null).nextBlock();
        Assert.assertTrue(nextBlock.isErrorBlock());
        Assert.assertTrue(((String) nextBlock.getDataBlock().getExceptions().get(1000)).contains("mailboxError"));
    }

    /* JADX WARN: Type inference failed for: r2v10, types: [java.lang.Object[], java.lang.Object[][]] */
    @Test
    public void shouldThrowReceiveWhenOneServerReceiveThrowException() throws Exception {
        Mockito.when(this._server1.getHostname()).thenReturn("hash1");
        Mockito.when(Integer.valueOf(this._server1.getQueryMailboxPort())).thenReturn(123);
        Mockito.when(this._server2.getHostname()).thenReturn("hash2");
        Mockito.when(Integer.valueOf(this._server2.getQueryMailboxPort())).thenReturn(456);
        DataSchema dataSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash1", 123, "toHost", 8888))).thenReturn(this._mailbox);
        Mockito.when(Boolean.valueOf(this._mailbox.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox.receive()).thenThrow(new Throwable[]{new Exception("mailboxError")});
        Object[] objArr = {3, 3};
        Mockito.when(this._mailboxService.getReceivingMailbox(new StringMailboxIdentifier(String.format("%s_%s", 456, 0), "hash2", 456, "toHost", 8888))).thenReturn(this._mailbox2);
        Mockito.when(Boolean.valueOf(this._mailbox2.isClosed())).thenReturn(false);
        Mockito.when((TransferableBlock) this._mailbox2.receive()).thenReturn(OperatorTestUtil.block(dataSchema, new Object[]{objArr}));
        List container = new MailboxReceiveOperator(this._mailboxService, ImmutableList.of(this._server1, this._server2), RelDistribution.Type.HASH_DISTRIBUTED, "toHost", 8888, 456, 0, (Long) null).nextBlock().getContainer();
        Assert.assertEquals(container.size(), 1);
        Assert.assertEquals((Object[]) container.get(0), objArr);
    }
}
