package org.apache.pinot.core.transport;

import com.google.common.util.concurrent.Futures;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.config.table.TableType;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/transport/QueryRoutingTest.class */
public class QueryRoutingTest {
    private static final int TEST_PORT = 12345;
    private static final ServerInstance SERVER_INSTANCE = new ServerInstance("localhost", TEST_PORT);
    private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE = SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE);
    private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE = SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME);
    private static final BrokerRequest BROKER_REQUEST = new Pql2Compiler().compileToBrokerRequest("SELECT * FROM testTable");
    private static final Map<ServerInstance, List<String>> ROUTING_TABLE = Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
    private QueryRouter _queryRouter;

    @BeforeClass
    public void setUp() {
        this._queryRouter = new QueryRouter("testBroker", (BrokerMetrics) Mockito.mock(BrokerMetrics.class));
    }

    private QueryServer getQueryServer(int i, byte[] bArr) {
        return new QueryServer(TEST_PORT, mockQueryScheduler(i, bArr), (ServerMetrics) Mockito.mock(ServerMetrics.class));
    }

    private QueryScheduler mockQueryScheduler(int i, byte[] bArr) {
        QueryScheduler queryScheduler = (QueryScheduler) Mockito.mock(QueryScheduler.class);
        Mockito.when(queryScheduler.submit((ServerQueryRequest) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            Thread.sleep(i);
            return Futures.immediateFuture(bArr);
        });
        return queryScheduler;
    }

    @Test
    public void testValidResponse() throws Exception {
        DataTableImplV2 dataTableImplV2 = new DataTableImplV2();
        dataTableImplV2.getMetadata().put("requestId", Long.toString(123L));
        byte[] bytes = dataTableImplV2.toBytes();
        QueryServer queryServer = getQueryServer(0, bytes);
        queryServer.start();
        Map response = this._queryRouter.submitQuery(123L, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseSize(), bytes.length);
        Map response2 = this._queryRouter.submitQuery(123L, "testTable", (BrokerRequest) null, (Map) null, BROKER_REQUEST, ROUTING_TABLE, 1000L).getResponse();
        Assert.assertEquals(response2.size(), 1);
        Assert.assertTrue(response2.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse2 = (ServerResponse) response2.get(REALTIME_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(serverResponse2.getDataTable());
        Assert.assertEquals(serverResponse2.getResponseSize(), bytes.length);
        Map response3 = this._queryRouter.submitQuery(123L, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, 1000L).getResponse();
        Assert.assertEquals(response3.size(), 2);
        Assert.assertTrue(response3.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse3 = (ServerResponse) response3.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(serverResponse3.getDataTable());
        Assert.assertEquals(serverResponse3.getResponseSize(), bytes.length);
        Assert.assertTrue(response3.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse4 = (ServerResponse) response3.get(REALTIME_SERVER_ROUTING_INSTANCE);
        Assert.assertNotNull(serverResponse4.getDataTable());
        Assert.assertEquals(serverResponse4.getResponseSize(), bytes.length);
        queryServer.shutDown();
    }

    @Test
    public void testInvalidResponse() throws Exception {
        QueryServer queryServer = getQueryServer(0, new byte[0]);
        queryServer.start();
        long currentTimeMillis = System.currentTimeMillis();
        Map response = this._queryRouter.submitQuery(123L, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
        Assert.assertEquals(serverResponse.getResponseSize(), 0);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 1000);
        queryServer.shutDown();
    }

    @Test
    public void testNonMatchingRequestId() throws Exception {
        DataTableImplV2 dataTableImplV2 = new DataTableImplV2();
        dataTableImplV2.getMetadata().put("requestId", Long.toString(123L));
        QueryServer queryServer = getQueryServer(0, dataTableImplV2.toBytes());
        queryServer.start();
        long currentTimeMillis = System.currentTimeMillis();
        Map response = this._queryRouter.submitQuery(123 + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
        Assert.assertEquals(serverResponse.getResponseSize(), 0);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 1000);
        queryServer.shutDown();
    }

    @Test
    public void testServerDown() throws Exception {
        DataTableImplV2 dataTableImplV2 = new DataTableImplV2();
        dataTableImplV2.getMetadata().put("requestId", Long.toString(123L));
        QueryServer queryServer = getQueryServer(500, dataTableImplV2.toBytes());
        queryServer.start();
        long currentTimeMillis = System.currentTimeMillis();
        AsyncQueryResponse submitQuery = this._queryRouter.submitQuery(123 + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L);
        queryServer.shutDown();
        Map response = submitQuery.getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
        Assert.assertEquals(serverResponse.getResponseSize(), 0);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis < 1000);
        long currentTimeMillis2 = System.currentTimeMillis();
        Map response2 = this._queryRouter.submitQuery(123 + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response2.size(), 1);
        Assert.assertTrue(response2.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
        ServerResponse serverResponse2 = (ServerResponse) response2.get(OFFLINE_SERVER_ROUTING_INSTANCE);
        Assert.assertNull(serverResponse2.getDataTable());
        Assert.assertEquals(serverResponse2.getSubmitDelayMs(), -1);
        Assert.assertEquals(serverResponse2.getResponseDelayMs(), -1);
        Assert.assertEquals(serverResponse2.getResponseSize(), 0);
        Assert.assertEquals(serverResponse2.getDeserializationTimeMs(), 0);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis2 < 1000);
    }

    @AfterClass
    public void tearDown() {
        this._queryRouter.shutDown();
    }
}
