package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.class */
public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30;
    private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5;
    private static final String PERIODIC_TASK_FREQUENCY = "5s";
    private static final int NUM_REPLICAS = 2;
    private static final String TENANT_NAME = "TestTenant";
    private static final int NUM_BROKERS = 1;
    private static final int NUM_OFFLINE_SERVERS = 2;
    private static final int NUM_REALTIME_SERVERS = 2;
    private static final int NUM_OFFLINE_AVRO_FILES = 8;
    private static final int NUM_REALTIME_AVRO_FILES = 6;
    private String _currentTable = "mytable";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getTableName() {
        return this._currentTable;
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    protected boolean useLlc() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public int getNumReplicas() {
        return 2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getBrokerTenant() {
        return TENANT_NAME;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public String getServerTenant() {
        return TENANT_NAME;
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startKafka();
        Map defaultControllerConfiguration = getDefaultControllerConfiguration();
        defaultControllerConfiguration.put("cluster.tenant.isolation.enable", false);
        defaultControllerConfiguration.put("controller.statusChecker.initialDelayInSeconds", Integer.valueOf(PERIODIC_TASK_INITIAL_DELAY_SECONDS));
        defaultControllerConfiguration.put("controller.statuschecker.frequencyInSeconds", 5);
        defaultControllerConfiguration.put("controller.realtimeSegmentRelocation.initialDelayInSeconds", Integer.valueOf(PERIODIC_TASK_INITIAL_DELAY_SECONDS));
        defaultControllerConfiguration.put("controller.realtime.segment.relocator.frequency", PERIODIC_TASK_FREQUENCY);
        defaultControllerConfiguration.put("controller.broker.resource.validation.initialDelayInSeconds", Integer.valueOf(PERIODIC_TASK_INITIAL_DELAY_SECONDS));
        defaultControllerConfiguration.put("controller.broker.resource.validation.frequencyInSeconds", 5);
        defaultControllerConfiguration.put("controller.offlineSegmentIntervalChecker.initialDelayInSeconds", Integer.valueOf(PERIODIC_TASK_INITIAL_DELAY_SECONDS));
        defaultControllerConfiguration.put("controller.offline.segment.interval.checker.frequencyInSeconds", 5);
        startController(defaultControllerConfiguration);
        startBrokers(NUM_BROKERS);
        startServers(4);
        createBrokerTenant(TENANT_NAME, NUM_BROKERS);
        createServerTenant(TENANT_NAME, 2, 2);
        int size = unpackAvroData(this._tempDir).size();
        ArrayList arrayList = new ArrayList(size);
        for (int i = NUM_BROKERS; i <= size; i += NUM_BROKERS) {
            arrayList.add(new File(this._tempDir, "On_Time_On_Time_Performance_2014_" + i + ".avro"));
        }
        List subList = arrayList.subList(0, NUM_OFFLINE_AVRO_FILES);
        List<File> subList2 = arrayList.subList(size - NUM_REALTIME_AVRO_FILES, size);
        Schema createSchema = createSchema();
        addSchema(createSchema);
        TableConfig createOfflineTableConfig = createOfflineTableConfig();
        addTableConfig(createOfflineTableConfig);
        addTableConfig(createRealtimeTableConfig(subList2.get(0)));
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(subList, createOfflineTableConfig, createSchema, 0, this._segmentDir, this._tarDir);
        uploadSegments(getTableName(), this._tarDir);
        pushAvroIntoKafka(subList2);
        waitForAllDocsLoaded(600000L);
    }

    @AfterClass
    public void tearDown() throws Exception {
        String tableName = getTableName();
        dropOfflineTable(tableName);
        dropRealtimeTable(tableName);
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    @Test
    public void testSegmentStatusChecker() throws Exception {
        String str = "emptyTable";
        String str2 = "disabledTable";
        String str3 = "tableWithOfflineSegment";
        this._currentTable = "emptyTable";
        addTableConfig(createOfflineTableConfig());
        this._currentTable = "disabledTable";
        addTableConfig(createOfflineTableConfig());
        this._helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType("disabledTable"), false);
        this._currentTable = "tableWithOfflineSegment";
        addTableConfig(createOfflineTableConfig());
        uploadSegments(this._currentTable, this._tarDir);
        HelixHelper.updateIdealState(this._helixManager, TableNameBuilder.OFFLINE.tableNameWithType("tableWithOfflineSegment"), idealState -> {
            Assert.assertNotNull(idealState);
            ((Map.Entry) ((Map) idealState.getRecord().getMapFields().values().iterator().next()).entrySet().iterator().next()).setValue("OFFLINE");
            return idealState;
        }, RetryPolicies.fixedDelayRetryPolicy(2, 10L));
        this._currentTable = "mytable";
        int i = 5;
        ControllerMetrics controllerMetrics = this._controllerStarter.getControllerMetrics();
        TestUtils.waitForCondition(r19 -> {
            if (controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker") == i && checkSegmentStatusCheckerMetrics(controllerMetrics, TableNameBuilder.OFFLINE.tableNameWithType(str), null, 2L, 100L, 0L, 100L) && checkSegmentStatusCheckerMetrics(controllerMetrics, TableNameBuilder.OFFLINE.tableNameWithType(str2), null, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE)) {
                String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
                if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, this._helixResourceManager.getTableIdealState(tableNameWithType), 2L, 100L, 0L, 100L)) {
                    return false;
                }
                String tableNameWithType2 = TableNameBuilder.OFFLINE.tableNameWithType(str3);
                if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType2, this._helixResourceManager.getTableIdealState(tableNameWithType2), 1L, 50L, 0L, 100L)) {
                    return false;
                }
                String tableNameWithType3 = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
                if (checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType3, this._helixResourceManager.getTableIdealState(tableNameWithType3), 2L, 100L, 0L, 100L)) {
                    return Boolean.valueOf(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT) == 4 && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT) == 1 && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT) == 1);
                }
                return false;
            }
            return false;
        }, 60000L, "Timed out waiting for SegmentStatusChecker");
        dropOfflineTable("emptyTable");
        dropOfflineTable("disabledTable");
        dropOfflineTable("tableWithOfflineSegment");
    }

    private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String str, IdealState idealState, long j, long j2, long j3, long j4) {
        return (idealState == null || (controllerMetrics.getValueOfTableGauge(str, ControllerGauge.IDEALSTATE_ZNODE_SIZE) == ((long) idealState.toString().length()) && controllerMetrics.getValueOfTableGauge(str, ControllerGauge.SEGMENT_COUNT) == ((long) idealState.getPartitionSet().size()))) && controllerMetrics.getValueOfTableGauge(str, ControllerGauge.NUMBER_OF_REPLICAS) == j && controllerMetrics.getValueOfTableGauge(str, ControllerGauge.PERCENT_OF_REPLICAS) == j2 && controllerMetrics.getValueOfTableGauge(str, ControllerGauge.SEGMENTS_IN_ERROR_STATE) == j3 && controllerMetrics.getValueOfTableGauge(str, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE) == j4;
    }

    @Test
    public void testRealtimeSegmentRelocator() throws Exception {
        TableConfig realtimeTableConfig = getRealtimeTableConfig();
        realtimeTableConfig.setTenantConfig(new TenantConfig(TENANT_NAME, TENANT_NAME, new TagOverrideConfig(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME), TagNameUtils.getOfflineTagForTenant(TENANT_NAME))));
        updateTableConfig(realtimeTableConfig);
        TestUtils.waitForCondition(r5 -> {
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            IdealState tableIdealState = this._helixResourceManager.getTableIdealState(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
            Assert.assertNotNull(tableIdealState);
            Iterator it = tableIdealState.getRecord().getMapFields().values().iterator();
            while (it.hasNext()) {
                for (Map.Entry entry : ((Map) it.next()).entrySet()) {
                    String str = (String) entry.getValue();
                    if (str.equals("CONSUMING")) {
                        hashSet.add((String) entry.getKey());
                    } else if (str.equals("ONLINE")) {
                        hashSet2.add((String) entry.getKey());
                    }
                }
            }
            return Boolean.valueOf(Collections.disjoint(hashSet, hashSet2));
        }, 60000L, "Timed out waiting for RealtimeSegmentRelocation");
    }

    @Test
    public void testBrokerResourceValidationManager() {
        InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig("Broker_localhost_1234");
        instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
        String helixClusterName = getHelixClusterName();
        this._helixAdmin.addInstance(helixClusterName, instanceConfig);
        Set allInstancesForBrokerTenant = this._helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
        Assert.assertTrue(allInstancesForBrokerTenant.contains("Broker_localhost_1234"));
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
        TestUtils.waitForCondition(r7 -> {
            IdealState brokerIdealStates = HelixHelper.getBrokerIdealStates(this._helixAdmin, helixClusterName);
            Assert.assertNotNull(brokerIdealStates);
            return Boolean.valueOf(brokerIdealStates.getInstanceSet(tableNameWithType).equals(allInstancesForBrokerTenant));
        }, 60000L, "Timeout when waiting for BrokerResourceValidationManager");
        this._helixAdmin.dropInstance(helixClusterName, instanceConfig);
        Set allInstancesForBrokerTenant2 = this._helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
        Assert.assertFalse(allInstancesForBrokerTenant2.contains("Broker_localhost_1234"));
        TestUtils.waitForCondition(r72 -> {
            IdealState brokerIdealStates = HelixHelper.getBrokerIdealStates(this._helixAdmin, helixClusterName);
            Assert.assertNotNull(brokerIdealStates);
            return Boolean.valueOf(brokerIdealStates.getInstanceSet(tableNameWithType).equals(allInstancesForBrokerTenant2));
        }, 60000L, "Timeout when waiting for BrokerResourceValidationManager");
    }

    @Test
    public void testOfflineSegmentIntervalChecker() {
        ValidationMetrics validationMetrics = this._controllerStarter.getOfflineSegmentIntervalChecker().getValidationMetrics();
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
        TestUtils.waitForCondition(r7 -> {
            return Boolean.valueOf(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "SegmentCount")) == 8 && validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "missingSegmentCount")) == 0 && validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "TotalDocumentCount")) == 79003);
        }, 60000L, "Timed out waiting for OfflineSegmentIntervalChecker");
    }
}
