package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.class */
/**
 * Integration test that submits queries to the server's gRPC query endpoint and validates the
 * responses in both non-streaming and streaming mode.
 *
 * <p>{@link #setUp()} starts ZooKeeper, a controller, a broker and a server, builds segments from the
 * unpacked Avro data and uploads them. The tests then open a {@link GrpcQueryClient} (see
 * {@link #getGrpcQueryClient()}) and check that:
 * <ul>
 *   <li>a non-streaming request yields a single {@code "nonStreaming"} response, and</li>
 *   <li>a streaming request yields {@code "data"} responses terminated by exactly one
 *       {@code "metadata"} response.</li>
 * </ul>
 */
public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest {
    /** Table name (with type suffix) whose segments are queried through gRPC. */
    private static final String OFFLINE_TABLE_NAME = "mytable_OFFLINE";

    /** Select-all query used by {@link #testGrpcQueryServer()}. */
    private static final String SELECT_ALL_SQL = "SELECT * FROM mytable_OFFLINE LIMIT 1000000 OPTION(timeoutMs=30000)";

    /** Key in the gRPC response metadata map that carries the response type. */
    private static final String RESPONSE_TYPE_KEY = "responseType";
    private static final String RESPONSE_TYPE_NON_STREAMING = "nonStreaming";
    private static final String RESPONSE_TYPE_DATA = "data";
    private static final String RESPONSE_TYPE_METADATA = "metadata";

    /**
     * Starts the cluster components, creates the schema and offline table, builds and uploads the
     * segments, prepares the H2 connection and query generator, and waits for all documents to load.
     *
     * @throws Exception if any part of the cluster or data set-up fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});

        startZk();
        startController();
        startBroker();
        startServer();

        Schema schema = createSchema();
        addSchema(schema);
        TableConfig tableConfig = createOfflineTableConfig();
        addTableConfig(tableConfig);

        List<File> avroFiles = unpackAvroData(this._tempDir);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, this._segmentDir, this._tarDir);
        uploadSegments(getTableName(), this._tarDir);

        setUpH2Connection(avroFiles);
        setUpQueryGenerator(avroFiles);

        waitForAllDocsLoaded(600000L);
    }

    /**
     * Creates the client used to talk to the server's gRPC endpoint.
     *
     * @return a new client connected to {@code localhost:8090}; the caller is responsible for closing it
     */
    public GrpcQueryClient getGrpcQueryClient() {
        return new GrpcQueryClient("localhost", 8090);
    }

    /**
     * Runs the select-all query four ways (SQL and pre-compiled broker request, each in non-streaming
     * and streaming mode) and checks that every variant returns as many rows as the count-star result.
     *
     * @throws Exception if a request or response decoding fails
     */
    @Test
    public void testGrpcQueryServer() throws Exception {
        GrpcQueryClient queryClient = getGrpcQueryClient();
        try {
            BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(SELECT_ALL_SQL);
            GrpcRequestBuilder requestBuilder =
                new GrpcRequestBuilder().setSegments(this._helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false));

            // Non-streaming: once via SQL text, once via the compiled broker request.
            testNonStreamingRequest(queryClient.submit(requestBuilder.setSql(SELECT_ALL_SQL).build()));
            testNonStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build()));

            // Streaming: same two request flavours with streaming enabled on the shared builder.
            requestBuilder.setEnableStreaming(true);
            testStreamingRequest(queryClient.submit(requestBuilder.setSql(SELECT_ALL_SQL).build()));
            testStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build()));
        } finally {
            // Close even when an assertion fails so the client is not leaked.
            queryClient.close();
        }
    }

    /**
     * Runs one SQL query in non-streaming mode, then again in streaming mode, and checks that the
     * streamed result agrees with the non-streaming one on row count and number of docs scanned.
     *
     * @param str the SQL query supplied by the {@code provideSqlTestCases} data provider
     * @throws Exception if a request or response decoding fails
     */
    @Test(dataProvider = "provideSqlTestCases")
    public void testQueryingGrpcServer(String str) throws Exception {
        GrpcQueryClient queryClient = getGrpcQueryClient();
        try {
            GrpcRequestBuilder requestBuilder =
                new GrpcRequestBuilder().setSegments(this._helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false));
            DataTable nonStreamingResult =
                collectNonStreamingRequestResult(queryClient.submit(requestBuilder.setSql(str).build()));

            requestBuilder.setEnableStreaming(true);
            collectAndCompareResult(queryClient.submit(requestBuilder.setSql(str).build()), nonStreamingResult);
        } finally {
            // Close even when an assertion fails so the client is not leaked.
            queryClient.close();
        }
    }

    /**
     * Supplies the SQL queries for {@link #testQueryingGrpcServer(String)}.
     *
     * @return one row per test case, each row holding a single SQL string
     */
    @DataProvider(name = "provideSqlTestCases")
    public Object[][] provideSqlAndResultRowsAndNumDocScanTestCases() {
        List<Object[]> testCases = new ArrayList<>();
        testCases.add(new Object[]{"SELECT * FROM mytable_OFFLINE LIMIT 10000000"});
        testCases.add(new Object[]{"SELECT * FROM mytable_OFFLINE WHERE DaysSinceEpoch > 16312 LIMIT 10000000"});
        testCases.add(new Object[]{"SELECT timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable_OFFLINE LIMIT 10000000"});
        testCases.add(new Object[]{"SELECT count(*) FROM mytable_OFFLINE"});
        testCases.add(new Object[]{"SELECT count(*) FROM mytable_OFFLINE GROUP BY arrayLength(DivAirports)"});
        testCases.add(new Object[]{"SELECT DISTINCTCOUNT(AirlineID) FROM mytable_OFFLINE GROUP BY Carrier"});
        testCases.add(new Object[]{"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable_OFFLINE ORDER BY DaysSinceEpoch limit 10000"});
        // The target array must be Object[][]; a plain Object[] cannot be cast to Object[][] at runtime.
        return testCases.toArray(new Object[0][]);
    }

    /**
     * Reads the first response of a non-streaming request and decodes its payload.
     *
     * @param it the response iterator returned by the client
     * @return the decoded data table, guaranteed to have a non-null data schema
     * @throws Exception if the payload cannot be decoded
     */
    private DataTable collectNonStreamingRequestResult(Iterator<Server.ServerResponse> it) throws Exception {
        Assert.assertTrue(it.hasNext());
        Server.ServerResponse response = it.next();
        Assert.assertEquals(response.getMetadataMap().get(RESPONSE_TYPE_KEY), RESPONSE_TYPE_NON_STREAMING);
        DataTable dataTable = DataTableFactory.getDataTable(response.getPayload().asReadOnlyByteBuffer());
        Assert.assertNotNull(dataTable.getDataSchema());
        return dataTable;
    }

    /**
     * Consumes a streaming response and compares it against a previously collected non-streaming result.
     *
     * @param it the streaming response iterator
     * @param dataTable the non-streaming result providing the expected row count and docs-scanned value
     * @throws Exception if a payload cannot be decoded
     */
    private void collectAndCompareResult(Iterator<Server.ServerResponse> it, DataTable dataTable) throws Exception {
        String expectedNumDocsScanned = dataTable.getMetadata().get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName());
        verifyStreamingResponses(it, dataTable.getNumberOfRows(), expectedNumDocsScanned, false);
    }

    /**
     * Checks that a non-streaming response holds exactly the count-star number of rows and reports the
     * same number of docs scanned.
     *
     * @param it the response iterator returned by the client
     * @throws Exception if the payload cannot be decoded
     */
    private void testNonStreamingRequest(Iterator<Server.ServerResponse> it) throws Exception {
        int expectedNumDocs = (int) getCountStarResult();
        DataTable dataTable = collectNonStreamingRequestResult(it);
        Assert.assertEquals(dataTable.getNumberOfRows(), expectedNumDocs);
        Assert.assertEquals(dataTable.getMetadata().get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName()),
            Integer.toString(expectedNumDocs));
    }

    /**
     * Checks that a streaming response delivers the count-star number of rows in total and that its
     * metadata block reports the same number of docs scanned.
     *
     * @param it the streaming response iterator
     * @throws Exception if a payload cannot be decoded
     */
    private void testStreamingRequest(Iterator<Server.ServerResponse> it) throws Exception {
        int expectedNumDocs = (int) getCountStarResult();
        verifyStreamingResponses(it, expectedNumDocs, Integer.toString(expectedNumDocs), true);
    }

    /**
     * Walks a streaming response, validating every data block and the terminating metadata block.
     *
     * @param it the streaming response iterator
     * @param expectedNumRows total number of rows the data blocks must add up to
     * @param expectedNumDocsScanned expected docs-scanned value in the metadata block
     * @param serTimeIsOnlyDataMetadata when {@code true}, each data block's metadata must contain the
     *        response serialization CPU time entry and nothing else; when {@code false}, the entry must
     *        merely be present
     * @throws Exception if a payload cannot be decoded
     */
    private void verifyStreamingResponses(Iterator<Server.ServerResponse> it, int expectedNumRows,
            String expectedNumDocsScanned, boolean serTimeIsOnlyDataMetadata) throws Exception {
        String serTimeKey = DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName();
        int numRowsReceived = 0;
        boolean metadataReceived = false;

        while (it.hasNext()) {
            Server.ServerResponse response = it.next();
            DataTable block = DataTableFactory.getDataTable(response.getPayload().asReadOnlyByteBuffer());
            String responseType = response.getMetadataMap().get(RESPONSE_TYPE_KEY);

            // Constant-first equals: a missing response type falls through to the assertEquals below
            // and fails with a readable message instead of a NullPointerException.
            if (RESPONSE_TYPE_DATA.equals(responseType)) {
                Map<String, String> blockMetadata = block.getMetadata();
                Assert.assertTrue(blockMetadata.containsKey(serTimeKey));
                if (serTimeIsOnlyDataMetadata) {
                    Assert.assertEquals(blockMetadata.size(), 1);
                }
                Assert.assertNotNull(block.getDataSchema());
                numRowsReceived += block.getNumberOfRows();
            } else {
                Assert.assertEquals(responseType, RESPONSE_TYPE_METADATA);
                // The metadata block must be the last response and must carry no rows or schema.
                Assert.assertFalse(it.hasNext());
                Assert.assertEquals(numRowsReceived, expectedNumRows);
                Assert.assertNull(block.getDataSchema());
                Assert.assertEquals(block.getNumberOfRows(), 0);
                Assert.assertEquals(block.getMetadata().get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName()),
                    expectedNumDocsScanned);
                metadataReceived = true;
            }
        }

        // Without this check an empty or truncated stream would skip every assertion above.
        Assert.assertTrue(metadataReceived, "Streaming response ended without a metadata block");
    }

    /**
     * Drops the offline table, stops the cluster components in reverse start order and deletes the
     * temporary directory.
     *
     * @throws Exception if any shutdown or clean-up step fails
     */
    @AfterClass
    public void tearDown() throws Exception {
        dropOfflineTable(getTableName());
        stopServer();
        stopBroker();
        stopController();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
