package org.apache.pinot.integration.tests;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.config.TagOverrideConfig;
import org.apache.pinot.common.config.TenantConfig;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterGroups;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeGroups;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.class */
public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
    private static final String TENANT_NAME = "TestTenant";
    private static final String DEFAULT_TABLE_NAME = "mytable";
    private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 60;
    private static final int PERIODIC_TASK_FREQ_SECONDS = 5;
    private static final String PERIODIC_TASK_FREQ = "5s";
    private String _currentTableName;
    private List<File> _avroFiles;

    /* loaded from: input_file:org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests$TestControllerConf.class */
    private class TestControllerConf extends ControllerConf {
        private TestControllerConf(ControllerConf controllerConf) {
            copy(controllerConf);
        }

        public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
            return 60L;
        }

        public long getStatusCheckerInitialDelayInSeconds() {
            return 60L;
        }

        public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
            return 60L;
        }

        public long getBrokerResourceValidationInitialDelayInSeconds() {
            return 60L;
        }

        public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
            return 60L;
        }
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startKafka();
        TestControllerConf testControllerConf = new TestControllerConf(getDefaultControllerConfiguration());
        testControllerConf.setTenantIsolationEnabled(false);
        testControllerConf.setStatusCheckerFrequencyInSeconds(5);
        testControllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
        testControllerConf.setBrokerResourceValidationFrequencyInSeconds(5);
        testControllerConf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(5);
        testControllerConf.setRealtimeSegmentValidationFrequencyInSeconds(5);
        startController(testControllerConf);
        startBroker();
        startServers(6);
        createBrokerTenant(TENANT_NAME, 1);
        createServerTenant(TENANT_NAME, 3, 3);
        this._avroFiles = unpackAvroData(this._tempDir);
        setupOfflineTableAndSegments(DEFAULT_TABLE_NAME, this._avroFiles);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        pushAvroIntoKafka(this._avroFiles, getKafkaTopic(), newCachedThreadPool);
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(10L, TimeUnit.MINUTES);
    }

    private void setupOfflineTable(String str) throws Exception {
        addOfflineTable(str, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
    }

    private void setupOfflineTableAndSegments(String str, List<File> list) throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._segmentDir, this._tarDir});
        setTableName(str);
        File schemaFile = getSchemaFile();
        Schema fromFile = Schema.fromFile(schemaFile);
        addSchema(schemaFile, fromFile.getSchemaName());
        String timeColumnName = fromFile.getTimeColumnName();
        Assert.assertNotNull(timeColumnName);
        TimeUnit outgoingTimeUnit = fromFile.getOutgoingTimeUnit();
        Assert.assertNotNull(outgoingTimeUnit);
        addOfflineTable(str, timeColumnName, outgoingTimeUnit.toString(), TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(list, 0, this._segmentDir, this._tarDir, str, null, null, null, newCachedThreadPool);
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(10L, TimeUnit.MINUTES);
        uploadSegments(getTableName(), this._tarDir);
        waitForAllDocsLoaded(600000L);
    }

    private void setupRealtimeTable(String str, String str2, File file) throws Exception {
        File schemaFile = getSchemaFile();
        Schema fromFile = Schema.fromFile(schemaFile);
        String schemaName = fromFile.getSchemaName();
        addSchema(schemaFile, schemaName);
        String timeColumnName = fromFile.getTimeColumnName();
        Assert.assertNotNull(timeColumnName);
        TimeUnit outgoingTimeUnit = fromFile.getOutgoingTimeUnit();
        Assert.assertNotNull(outgoingTimeUnit);
        addRealtimeTable(str, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, "localhost:2191/kafka", str2, getRealtimeSegmentFlushSize(), file, timeColumnName, outgoingTimeUnit.toString(), schemaName, TENANT_NAME, TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getTableName() {
        return this._currentTableName;
    }

    private void setTableName(String str) {
        this._currentTableName = str;
    }

    @BeforeGroups(groups = {"segmentStatusChecker"})
    public void beforeTestSegmentStatusCheckerTest(ITestContext iTestContext) throws Exception {
        String defaultOfflineTableName = getDefaultOfflineTableName();
        String defaultRealtimeTableName = getDefaultRealtimeTableName();
        iTestContext.setAttribute("emptyTable", "table1_OFFLINE");
        iTestContext.setAttribute("disabledOfflineTable", "table2_OFFLINE");
        iTestContext.setAttribute("basicOfflineTable", defaultOfflineTableName);
        iTestContext.setAttribute("errorOfflineTable", "table4_OFFLINE");
        iTestContext.setAttribute("basicRealtimeTable", defaultRealtimeTableName);
        iTestContext.setAttribute("numTables", 5);
        setupOfflineTable("table1_OFFLINE");
        setupOfflineTable("table2_OFFLINE");
        this._helixAdmin.enableResource(getHelixClusterName(), "table2_OFFLINE", false);
        setupOfflineTableAndSegments("table4_OFFLINE", this._avroFiles);
        HelixHelper.updateIdealState(this._helixManager, "table4_OFFLINE", new Function<IdealState, IdealState>() { // from class: org.apache.pinot.integration.tests.ControllerPeriodicTasksIntegrationTests.1
            @Nullable
            public IdealState apply(@Nullable IdealState idealState) {
                ArrayList newArrayList = Lists.newArrayList(idealState.getPartitionSet());
                Collections.sort(newArrayList);
                Map instanceStateMap = idealState.getInstanceStateMap((String) newArrayList.get(0));
                Iterator it = instanceStateMap.keySet().iterator();
                if (it.hasNext()) {
                    instanceStateMap.put((String) it.next(), "OFFLINE");
                }
                return idealState;
            }
        }, RetryPolicies.fixedDelayRetryPolicy(2, 10L));
        setupRealtimeTable(defaultRealtimeTableName, getKafkaTopic(), this._avroFiles.get(0));
    }

    @Test(groups = {"segmentStatusChecker"})
    public void testSegmentStatusChecker(ITestContext iTestContext) throws Exception {
        String str = (String) iTestContext.getAttribute("emptyTable");
        String str2 = (String) iTestContext.getAttribute("disabledOfflineTable");
        String str3 = (String) iTestContext.getAttribute("basicOfflineTable");
        String str4 = (String) iTestContext.getAttribute("errorOfflineTable");
        String str5 = (String) iTestContext.getAttribute("basicRealtimeTable");
        int intValue = ((Integer) iTestContext.getAttribute("numTables")).intValue();
        ControllerMetrics controllerMetrics = this._controllerStarter.getControllerMetrics();
        TestUtils.waitForCondition(r7 -> {
            return Boolean.valueOf(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker") >= ((long) intValue));
        }, 240000L, "Timed out waiting for SegmentStatusChecker");
        checkSegmentStatusCheckerMetrics(controllerMetrics, str, null, 3L, 100L, 0L, 100L);
        checkSegmentStatusCheckerMetrics(controllerMetrics, str2, null, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE);
        checkSegmentStatusCheckerMetrics(controllerMetrics, str3, this._helixResourceManager.getTableIdealState(str3), 3L, 100L, 0L, 100L);
        checkSegmentStatusCheckerMetrics(controllerMetrics, str4, this._helixResourceManager.getTableIdealState(str4), 2L, 66L, 0L, 100L);
        checkSegmentStatusCheckerMetrics(controllerMetrics, str5, this._helixResourceManager.getTableIdealState(str5), 1L, 100L, 0L, 100L);
        Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT), 4L);
        Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT), 1L);
        Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1L);
    }

    private void checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String str, IdealState idealState, long j, long j2, long j3, long j4) {
        if (idealState != null) {
            Assert.assertEquals(controllerMetrics.getValueOfTableGauge(str, ControllerGauge.IDEALSTATE_ZNODE_SIZE), idealState.toString().length());
            Assert.assertEquals(controllerMetrics.getValueOfTableGauge(str, ControllerGauge.SEGMENT_COUNT), idealState.getPartitionSet().size());
        }
        Assert.assertEquals(controllerMetrics.getValueOfTableGauge(str, ControllerGauge.NUMBER_OF_REPLICAS), j);
        Assert.assertEquals(controllerMetrics.getValueOfTableGauge(str, ControllerGauge.PERCENT_OF_REPLICAS), j2);
        Assert.assertEquals(controllerMetrics.getValueOfTableGauge(str, ControllerGauge.SEGMENTS_IN_ERROR_STATE), j3);
        Assert.assertEquals(controllerMetrics.getValueOfTableGauge(str, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), j4);
    }

    @AfterGroups(groups = {"segmentStatusChecker"})
    public void afterTestSegmentStatusChecker(ITestContext iTestContext) throws Exception {
        String str = (String) iTestContext.getAttribute("emptyTable");
        String str2 = (String) iTestContext.getAttribute("disabledOfflineTable");
        String str3 = (String) iTestContext.getAttribute("errorOfflineTable");
        dropOfflineTable(str);
        dropOfflineTable(str2);
        dropOfflineTable(str3);
    }

    @BeforeGroups(groups = {"realtimeSegmentRelocator"}, dependsOnGroups = {"segmentStatusChecker"})
    public void beforeRealtimeSegmentRelocatorTest(ITestContext iTestContext) throws Exception {
        String defaultRealtimeTableName = getDefaultRealtimeTableName();
        iTestContext.setAttribute("relocationTable", defaultRealtimeTableName);
        updateRealtimeTableTenant(TableNameBuilder.extractRawTableName(defaultRealtimeTableName), new TenantConfig(TENANT_NAME, TENANT_NAME, new TagOverrideConfig("TestTenant_REALTIME", "TestTenant_OFFLINE")));
    }

    @Test(groups = {"realtimeSegmentRelocator"}, dependsOnGroups = {"segmentStatusChecker"})
    public void testRealtimeSegmentRelocator(ITestContext iTestContext) throws Exception {
        String str = (String) iTestContext.getAttribute("relocationTable");
        ControllerMetrics controllerMetrics = this._controllerStarter.getControllerMetrics();
        long count = controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
        TestUtils.waitForCondition(r8 -> {
            return Boolean.valueOf(controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count() > count);
        }, 60000L, "Timed out waiting for RealtimeSegmentRelocation to run");
        Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "RealtimeSegmentRelocator") > 0);
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        IdealState tableIdealState = this._helixResourceManager.getTableIdealState(str);
        Iterator it = tableIdealState.getPartitionSet().iterator();
        while (it.hasNext()) {
            Map instanceStateMap = tableIdealState.getInstanceStateMap((String) it.next());
            if (instanceStateMap.containsValue("CONSUMING")) {
                hashSet.addAll(instanceStateMap.keySet());
            }
            if (instanceStateMap.containsValue("ONLINE")) {
                hashSet2.addAll(instanceStateMap.keySet());
            }
        }
        Assert.assertTrue(Collections.disjoint(hashSet, hashSet2));
    }

    @BeforeGroups(groups = {"brokerResourceValidationManager"}, dependsOnGroups = {"realtimeSegmentRelocator"})
    public void beforeBrokerResourceValidationManagerTest(ITestContext iTestContext) throws Exception {
        iTestContext.setAttribute("testTableOne", "testTable");
        iTestContext.setAttribute("testTableTwo", "testTable2");
        setupOfflineTable("testTable");
    }

    @Test(groups = {"brokerResourceValidationManager"}, dependsOnGroups = {"realtimeSegmentRelocator"})
    public void testBrokerResourceValidationManager(ITestContext iTestContext) throws Exception {
        String str = (String) iTestContext.getAttribute("testTableOne");
        String str2 = (String) iTestContext.getAttribute("testTableTwo");
        String tableName = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(str).build().getTableName();
        TestUtils.waitForCondition(r6 -> {
            return Boolean.valueOf(HelixHelper.getBrokerIdealStates(this._helixAdmin, getHelixClusterName()).getInstanceSet(tableName).equals(this._helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME)));
        }, 60000L, "Timeout when waiting for broker resource to be rebuilt");
        TableConfig build = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(str2).setBrokerTenant(TENANT_NAME).setServerTenant(TENANT_NAME).build();
        this._helixResourceManager.addTable(build);
        String tableName2 = build.getTableName();
        InstanceConfig instanceConfig = new InstanceConfig("Broker_localhost_2");
        instanceConfig.setInstanceEnabled(true);
        instanceConfig.setHostName("Broker_localhost");
        instanceConfig.setPort("2");
        this._helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
        this._helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(), TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
        AtomicInteger atomicInteger = new AtomicInteger();
        TestUtils.waitForCondition(r7 -> {
            atomicInteger.getAndIncrement();
            return Boolean.valueOf(HelixHelper.getBrokerIdealStates(this._helixAdmin, getHelixClusterName()).getInstanceSet(tableName2).equals(this._helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME)));
        }, 60000L, "Timeout when waiting for broker resource to be rebuilt");
        Assert.assertTrue(atomicInteger.get() > 1);
        this._helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
        atomicInteger.set(0);
        TestUtils.waitForCondition(r72 -> {
            atomicInteger.getAndIncrement();
            return Boolean.valueOf(HelixHelper.getBrokerIdealStates(this._helixAdmin, getHelixClusterName()).getInstanceSet(tableName2).equals(this._helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME)));
        }, 60000L, "Timeout when waiting for broker resource to be rebuilt");
        Assert.assertTrue(atomicInteger.get() > 1);
    }

    @AfterGroups(groups = {"brokerResourceValidationManager"}, dependsOnGroups = {"realtimeSegmentRelocator"})
    public void afterBrokerResourceValidationManagerTest(ITestContext iTestContext) throws Exception {
        String str = (String) iTestContext.getAttribute("testTableOne");
        String str2 = (String) iTestContext.getAttribute("testTableTwo");
        dropOfflineTable(str);
        dropOfflineTable(str2);
    }

    @Test(groups = {"offlineSegmentIntervalChecker"}, dependsOnGroups = {"brokerResourceValidationManager"})
    public void testOfflineSegmentIntervalChecker() throws Exception {
        ValidationMetrics validationMetrics = this._controllerStarter.getOfflineSegmentIntervalChecker().getValidationMetrics();
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
        TestUtils.waitForCondition(r7 -> {
            return Boolean.valueOf(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "SegmentCount")) > 0);
        }, 60000L, "Timed out waiting for OfflineSegmentIntervalChecker");
        Assert.assertEquals(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "SegmentCount")), 12L);
        Assert.assertEquals(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "missingSegmentCount")), 0L);
        Assert.assertEquals(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "TotalDocumentCount")), 115545L);
    }

    @Test(groups = {"realtimeSegmentValidationManager"}, dependsOnGroups = {"offlineSegmentIntervalChecker"})
    public void testRealtimeSegmentValidationManager(ITestContext iTestContext) throws Exception {
        ControllerMetrics controllerMetrics = this._controllerStarter.getControllerMetrics();
        long count = controllerMetrics.getMeteredTableValue("RealtimeSegmentValidationManager", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
        TestUtils.waitForCondition(r8 -> {
            return Boolean.valueOf(controllerMetrics.getMeteredTableValue("RealtimeSegmentValidationManager", ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count() > count);
        }, 60000L, "Timed out waiting for RealtimeSegmentValidationManager to run");
        Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "RealtimeSegmentValidationManager") > 0);
        Assert.assertTrue(this._controllerStarter.getRealtimeSegmentValidationManager().getValidationMetrics().getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(), "TotalDocumentCount")) > 0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public boolean useLlc() {
        return true;
    }

    private String getDefaultOfflineTableName() {
        return "mytable_OFFLINE";
    }

    private String getDefaultRealtimeTableName() {
        return "mytable_REALTIME";
    }

    @AfterClass
    public void tearDown() throws Exception {
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
