package org.apache.pinot.core.transport;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/transport/QueryRouterTest.class */
public class QueryRouterTest {
    private static final int TEST_PORT = 12345;
    private static final String SERVER_INSTANCE_NAME = "Server_localhost_12345";
    private static final Server OFFLINE_SERVER = new Server(SERVER_INSTANCE_NAME, CommonConstants.Helix.TableType.OFFLINE);
    private static final Server REALTIME_SERVER = new Server(SERVER_INSTANCE_NAME, CommonConstants.Helix.TableType.REALTIME);
    private static final BrokerRequest BROKER_REQUEST = new BrokerRequest();
    private static final Map<String, List<String>> ROUTING_TABLE = Collections.singletonMap(SERVER_INSTANCE_NAME, Collections.emptyList());
    private QueryRouter _queryRouter;

    @BeforeClass
    public void setUp() {
        this._queryRouter = new QueryRouter("testBroker", (BrokerMetrics) Mockito.mock(BrokerMetrics.class));
    }

    @Test
    public void testValidResponse() throws Exception {
        DataTableImplV2 dataTableImplV2 = new DataTableImplV2();
        dataTableImplV2.getMetadata().put("requestId", Long.toString(123L));
        DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, dataTableImplV2.toBytes());
        Thread thread = new Thread(dummyServer);
        thread.start();
        while (!dummyServer.isReady()) {
            Thread.sleep(100L);
        }
        Map response = this._queryRouter.submitQuery(123L, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER);
        Assert.assertNotNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseSize(), r0.length);
        Map response2 = this._queryRouter.submitQuery(123L, "testTable", (BrokerRequest) null, (Map) null, BROKER_REQUEST, ROUTING_TABLE, 1000L).getResponse();
        Assert.assertEquals(response2.size(), 1);
        Assert.assertTrue(response2.containsKey(REALTIME_SERVER));
        ServerResponse serverResponse2 = (ServerResponse) response2.get(REALTIME_SERVER);
        Assert.assertNotNull(serverResponse2.getDataTable());
        Assert.assertEquals(serverResponse2.getResponseSize(), r0.length);
        Map response3 = this._queryRouter.submitQuery(123L, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, 1000L).getResponse();
        Assert.assertEquals(response3.size(), 2);
        Assert.assertTrue(response3.containsKey(OFFLINE_SERVER));
        ServerResponse serverResponse3 = (ServerResponse) response3.get(OFFLINE_SERVER);
        Assert.assertNotNull(serverResponse3.getDataTable());
        Assert.assertEquals(serverResponse3.getResponseSize(), r0.length);
        Assert.assertTrue(response3.containsKey(REALTIME_SERVER));
        ServerResponse serverResponse4 = (ServerResponse) response3.get(REALTIME_SERVER);
        Assert.assertNotNull(serverResponse4.getDataTable());
        Assert.assertEquals(serverResponse4.getResponseSize(), r0.length);
        dummyServer.shutDown();
        thread.join();
    }

    @Test
    public void testInvalidResponse() throws Exception {
        DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, new byte[0]);
        Thread thread = new Thread(dummyServer);
        thread.start();
        while (!dummyServer.isReady()) {
            Thread.sleep(100L);
        }
        long currentTimeMillis = System.currentTimeMillis();
        Map response = this._queryRouter.submitQuery(123L, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1L);
        Assert.assertEquals(serverResponse.getResponseSize(), 0L);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0L);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 1000);
        dummyServer.shutDown();
        thread.join();
    }

    @Test
    public void testNonMatchingRequestId() throws Exception {
        DataTableImplV2 dataTableImplV2 = new DataTableImplV2();
        dataTableImplV2.getMetadata().put("requestId", Long.toString(123L));
        DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, dataTableImplV2.toBytes());
        Thread thread = new Thread(dummyServer);
        thread.start();
        while (!dummyServer.isReady()) {
            Thread.sleep(100L);
        }
        long currentTimeMillis = System.currentTimeMillis();
        Map response = this._queryRouter.submitQuery(123 + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1L);
        Assert.assertEquals(serverResponse.getResponseSize(), 0L);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0L);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 1000);
        dummyServer.shutDown();
        thread.join();
    }

    @Test
    public void testServerDown() throws Exception {
        DataTableImplV2 dataTableImplV2 = new DataTableImplV2();
        dataTableImplV2.getMetadata().put("requestId", Long.toString(123L));
        DummyServer dummyServer = new DummyServer(TEST_PORT, 500L, dataTableImplV2.toBytes());
        Thread thread = new Thread(dummyServer);
        thread.start();
        while (!dummyServer.isReady()) {
            Thread.sleep(100L);
        }
        long currentTimeMillis = System.currentTimeMillis();
        AsyncQueryResponse submitQuery = this._queryRouter.submitQuery(123 + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L);
        dummyServer.shutDown();
        thread.join();
        Map response = submitQuery.getResponse();
        Assert.assertEquals(response.size(), 1);
        Assert.assertTrue(response.containsKey(OFFLINE_SERVER));
        ServerResponse serverResponse = (ServerResponse) response.get(OFFLINE_SERVER);
        Assert.assertNull(serverResponse.getDataTable());
        Assert.assertEquals(serverResponse.getResponseDelayMs(), -1L);
        Assert.assertEquals(serverResponse.getResponseSize(), 0L);
        Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0L);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis < 1000);
        long currentTimeMillis2 = System.currentTimeMillis();
        Map response2 = this._queryRouter.submitQuery(123 + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, (BrokerRequest) null, (Map) null, 1000L).getResponse();
        Assert.assertEquals(response2.size(), 1);
        Assert.assertTrue(response2.containsKey(OFFLINE_SERVER));
        ServerResponse serverResponse2 = (ServerResponse) response2.get(OFFLINE_SERVER);
        Assert.assertNull(serverResponse2.getDataTable());
        Assert.assertEquals(serverResponse2.getSubmitDelayMs(), -1L);
        Assert.assertEquals(serverResponse2.getResponseDelayMs(), -1L);
        Assert.assertEquals(serverResponse2.getResponseSize(), 0L);
        Assert.assertEquals(serverResponse2.getDeserializationTimeMs(), 0L);
        Assert.assertTrue(System.currentTimeMillis() - currentTimeMillis2 < 1000);
    }

    @AfterClass
    public void tearDown() {
        this._queryRouter.shutDown();
    }
}
