package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/HybridClusterIntegrationTest.class */
/**
 * Integration test that runs the query/test set inherited from {@link BaseClusterIntegrationTestSet}
 * against a table created via {@code addHybridTable}: the first Avro files are built into segments and
 * uploaded, while the last Avro files are pushed into Kafka.
 *
 * <p>The number of files used for each side is controlled by {@link #getNumOfflineSegments()} and
 * {@link #getNumRealtimeSegments()}, which subclasses may override.
 */
public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final String TENANT_NAME = "TestTenant";
    private static final int NUM_OFFLINE_SEGMENTS = 8;
    private static final int NUM_REALTIME_SEGMENTS = 6;

    /** Schema loaded from {@code getSchemaFile()} during {@link #setUp()}; read by {@link #setUpTable(File)}. */
    private Schema _schema;

    /**
     * @return how many Avro files (taken from the start of the full list) are built into uploaded segments
     */
    protected int getNumOfflineSegments() {
        return NUM_OFFLINE_SEGMENTS;
    }

    /**
     * @return how many Avro files (taken from the end of the full list) are pushed into Kafka
     */
    protected int getNumRealtimeSegments() {
        return NUM_REALTIME_SEGMENTS;
    }

    /**
     * Starts the cluster, prepares the data (segments, Kafka messages, H2 reference data, query generator),
     * creates the table, uploads the segments and waits until all documents are loaded.
     *
     * @throws Exception if any step of the fixture set-up fails, including the background work not
     *     finishing within ten minutes
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startHybridCluster();

        List<File> allAvroFiles = getAllAvroFiles();
        List<File> offlineAvroFiles = getOfflineAvroFiles(allAvroFiles);
        List<File> realtimeAvroFiles = getRealtimeAvroFiles(allAvroFiles);
        this._schema = Schema.fromFile(getSchemaFile());

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, 0, this._segmentDir, this._tarDir,
                    getTableName(), null, getRawIndexColumns(), this._schema, executor);
            pushAvroIntoKafka(realtimeAvroFiles, getKafkaTopic(), executor);
            setUpH2Connection(allAvroFiles, executor);
            setUpQueryGenerator(allAvroFiles, executor);

            executor.shutdown();
            // Fail fast instead of continuing with a half-prepared fixture when the work does not finish in time.
            Assert.assertTrue(executor.awaitTermination(10L, TimeUnit.MINUTES),
                    "Timed out waiting for the test data preparation tasks to finish");
        } finally {
            // No-op after a clean termination; otherwise stops leftover tasks so the pool threads are not leaked.
            executor.shutdownNow();
        }

        setUpTable(allAvroFiles.get(0));
        uploadSegments(getTableName(), this._tarDir);
        waitForAllDocsLoaded(600000L);
    }

    /**
     * Starts ZooKeeper, Kafka, a controller with tenant isolation disabled, a broker and two servers, then
     * creates the broker and server tenants named {@code TestTenant}.
     *
     * @throws Exception if any component fails to start or a tenant cannot be created
     */
    protected void startHybridCluster() throws Exception {
        startZk();
        startKafka();

        ControllerConf controllerConf = getDefaultControllerConfiguration();
        controllerConf.setTenantIsolationEnabled(false);
        startController(controllerConf);

        startBroker();
        startServers(2);

        createBrokerTenant(TENANT_NAME, 1);
        // NOTE(review): the two counts are presumably the offline and realtime instance numbers - confirm
        // against createServerTenant.
        createServerTenant(TENANT_NAME, 1, 1);
    }

    /**
     * Registers the schema and creates the hybrid table.
     *
     * <p>Requires {@link #_schema} to be loaded already; its time column name and outgoing time unit must
     * both be non-null.
     *
     * @param file Avro file handed to {@code addHybridTable} ({@link #setUp()} passes the first Avro file)
     * @throws Exception if the schema or the table cannot be added
     */
    protected void setUpTable(File file) throws Exception {
        String schemaName = this._schema.getSchemaName();
        addSchema(getSchemaFile(), schemaName);

        String timeColumnName = this._schema.getTimeColumnName();
        Assert.assertNotNull(timeColumnName);
        TimeUnit outgoingTimeUnit = this._schema.getOutgoingTimeUnit();
        Assert.assertNotNull(outgoingTimeUnit);

        addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, "localhost:2191/kafka",
                getKafkaTopic(), getRealtimeSegmentFlushSize(), file, timeColumnName, outgoingTimeUnit.toString(),
                schemaName, TENANT_NAME, TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(),
                getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(),
                getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
    }

    /**
     * Sets the {@code pinot.server.instance.reload.consumingSegment} server property to {@code true}.
     *
     * @param configuration server configuration to adjust in place
     */
    @Override // org.apache.pinot.integration.tests.ClusterTest
    protected void overrideServerConf(Configuration configuration) {
        configuration.setProperty("pinot.server.instance.reload.consumingSegment", true);
    }

    /**
     * Unpacks the Avro data into the temp directory and returns one {@link File} per unpacked entry, named
     * {@code On_Time_On_Time_Performance_2014_<i>.avro} for {@code i} from 1 to the number of entries.
     *
     * @return the Avro files in ascending index order
     * @throws Exception if unpacking fails
     */
    protected List<File> getAllAvroFiles() throws Exception {
        int numAvroFiles = unpackAvroData(this._tempDir).size();
        List<File> avroFiles = new ArrayList<>(numAvroFiles);
        for (int i = 1; i <= numAvroFiles; i++) {
            avroFiles.add(new File(this._tempDir, "On_Time_On_Time_Performance_2014_" + i + ".avro"));
        }
        return avroFiles;
    }

    /**
     * Selects the leading {@link #getNumOfflineSegments()} files.
     *
     * @param list all Avro files
     * @return a new mutable list holding the first files of {@code list}
     * @throws IndexOutOfBoundsException if {@code list} holds fewer files than requested
     */
    protected List<File> getOfflineAvroFiles(List<File> list) {
        return new ArrayList<>(list.subList(0, getNumOfflineSegments()));
    }

    /**
     * Selects the trailing {@link #getNumRealtimeSegments()} files.
     *
     * @param list all Avro files
     * @return a new mutable list holding the last files of {@code list}
     * @throws IndexOutOfBoundsException if {@code list} holds fewer files than requested
     */
    protected List<File> getRealtimeAvroFiles(List<File> list) {
        int numFiles = list.size();
        return new ArrayList<>(list.subList(numFiles - getNumRealtimeSegments(), numFiles));
    }

    /**
     * Checks the segment counts reported by the controller's segment list API, both per table type and for
     * the combined listing.
     *
     * @throws Exception if a request fails or a response cannot be parsed
     */
    @Test
    public void testSegmentListApi() throws Exception {
        // NOTE(review): the expected realtime count is hard-coded; it presumably follows from the configured
        // flush size rather than from NUM_REALTIME_SEGMENTS - confirm before changing either.
        final int expectedRealtimeSegments = 3;

        JsonNode offlineListing = JsonUtils.stringToJsonNode(sendGetRequest(
                this._controllerRequestURLBuilder.forSegmentListAPIWithTableType(getTableName(),
                        CommonConstants.Helix.TableType.OFFLINE.toString())));
        Assert.assertEquals(offlineListing.get(0).get("OFFLINE").size(), NUM_OFFLINE_SEGMENTS);

        JsonNode realtimeListing = JsonUtils.stringToJsonNode(sendGetRequest(
                this._controllerRequestURLBuilder.forSegmentListAPIWithTableType(getTableName(),
                        CommonConstants.Helix.TableType.REALTIME.toString())));
        Assert.assertEquals(realtimeListing.get(0).get("REALTIME").size(), expectedRealtimeSegments);

        // The combined listing has one entry per table type; the order of the two entries is not relied upon.
        JsonNode combinedListing = JsonUtils.stringToJsonNode(
                sendGetRequest(this._controllerRequestURLBuilder.forSegmentListAPI(getTableName())));
        boolean realtimeFirst = combinedListing.get(0).has("REALTIME");
        JsonNode realtimeEntry = combinedListing.get(realtimeFirst ? 0 : 1);
        JsonNode offlineEntry = combinedListing.get(realtimeFirst ? 1 : 0);
        Assert.assertEquals(realtimeEntry.get("REALTIME").size(), expectedRealtimeSegments);
        Assert.assertEquals(offlineEntry.get("OFFLINE").size(), NUM_OFFLINE_SEGMENTS);
    }

    /**
     * Runs the inherited reload test with its flag set to {@code true}.
     *
     * @throws Exception if the inherited test fails
     */
    @Test
    public void testReload() throws Exception {
        super.testReload(true);
    }

    /**
     * Verifies that the time boundary and routing table debug resources return a non-null result for the raw
     * table name as well as for its OFFLINE and REALTIME typed names.
     *
     * @throws Exception if a debug request fails
     */
    @Test
    public void testBrokerDebugOutput() throws Exception {
        String tableName = getTableName();
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);

        Assert.assertNotNull(getDebugInfo("debug/timeBoundary/" + tableName));
        Assert.assertNotNull(getDebugInfo("debug/timeBoundary/" + offlineTableName));
        Assert.assertNotNull(getDebugInfo("debug/timeBoundary/" + realtimeTableName));
        Assert.assertNotNull(getDebugInfo("debug/routingTable/" + tableName));
        Assert.assertNotNull(getDebugInfo("debug/routingTable/" + offlineTableName));
        Assert.assertNotNull(getDebugInfo("debug/routingTable/" + realtimeTableName));
    }

    /** Re-declares the inherited test with {@code @Test} on this class. */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testQueriesFromQueryFile() throws Exception {
        super.testQueriesFromQueryFile();
    }

    /** Re-declares the inherited test with {@code @Test} on this class. */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testGeneratedQueriesWithMultiValues() throws Exception {
        super.testGeneratedQueriesWithMultiValues();
    }

    /** Re-declares the inherited test with {@code @Test} on this class. */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testQueryExceptions() throws Exception {
        super.testQueryExceptions();
    }

    /** Re-declares the inherited test with {@code @Test} on this class. */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testInstanceShutdown() throws Exception {
        super.testInstanceShutdown();
    }

    /** Re-declares the inherited test with {@code @Test} on this class. */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testBrokerResponseMetadata() throws Exception {
        super.testBrokerResponseMetadata();
    }

    /** Re-declares the inherited test with {@code @Test} on this class. */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testVirtualColumnQueries() {
        super.testVirtualColumnQueries();
    }

    /**
     * Drops both tables, waits for the routing table debug resource of the table to disappear, stops all
     * cluster components and finally runs {@link #cleanup()}.
     *
     * @throws Exception if dropping the tables, stopping a component or the clean-up fails
     */
    @AfterClass
    public void tearDown() throws Exception {
        String tableName = getTableName();
        dropOfflineTable(tableName);
        dropRealtimeTable(tableName);

        TestUtils.waitForCondition(ignored -> {
            try {
                getDebugInfo("debug/routingTable/" + tableName);
                // The resource still exists, so the routing table has not been removed yet.
                return false;
            } catch (FileNotFoundException e) {
                // The resource is gone: the condition is met.
                return true;
            } catch (Exception e) {
                // NOTE(review): any other failure yields null; how waitForCondition treats a null result is
                // not visible from this file - confirm this is the intended signal.
                return null;
            }
        }, 60000L, "Routing table is not empty after dropping all tables");

        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        cleanup();
    }

    /**
     * Deletes the temp directory used by this test.
     *
     * @throws Exception if the directory cannot be deleted
     */
    protected void cleanup() throws Exception {
        FileUtils.deleteDirectory(this._tempDir);
    }
}
