package org.apache.pinot.broker.broker;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.broker.routing.TimeBoundaryService;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/broker/broker/HelixBrokerStarterTest.class */
/**
 * Tests {@link HelixBrokerStarter} against an in-process ZooKeeper and controller.
 *
 * <p>The fixture starts one real broker, joins additional fake brokers and fake servers to the
 * cluster, creates an OFFLINE and a REALTIME table named {@code testTable}, and uploads
 * {@link #NUM_OFFLINE_SEGMENTS} mock segments to the OFFLINE table. The tests then check broker
 * resource assignment, routing table contents and time boundary updates.
 */
public class HelixBrokerStarterTest extends ControllerTest {
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
    private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
    private static final int NUM_BROKERS = 3;
    private static final int NUM_SERVERS = 1;
    private static final int NUM_OFFLINE_SEGMENTS = 5;

    /** Name of the Helix resource whose ideal state / external view maps tables to brokers. */
    private static final String BROKER_RESOURCE_NAME = "brokerResource";

    /** Upper bound, in milliseconds, for every asynchronous condition awaited by this test. */
    private static final long WAIT_TIMEOUT_MS = 30000L;

    private HelixBrokerStarter _brokerStarter;

    /**
     * Brings up ZooKeeper, the controller, the broker under test, the fake instances, the schema,
     * both tables and the initial OFFLINE segments, then waits until every segment is visible in
     * the OFFLINE table's ExternalView.
     *
     * @throws Exception if any component fails to start or the segments never show up
     */
    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();

        BaseConfiguration brokerConf = new BaseConfiguration();
        brokerConf.addProperty("pinot.broker.client.queryPort", 18099);
        brokerConf.addProperty("pinot.broker.refresh.timeBoundaryInfo.sleepInterval", 100L);
        _brokerStarter = new HelixBrokerStarter(brokerConf, getHelixClusterName(), "localhost:2191");
        _brokerStarter.start();

        // The real broker above counts as one; fake instances make up the rest of NUM_BROKERS.
        addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS - 1, true);
        addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);

        Schema schema = new Schema.SchemaBuilder()
                .setSchemaName(RAW_TABLE_NAME)
                .addTime(TIME_COLUMN_NAME, TimeUnit.DAYS, FieldSpec.DataType.INT)
                .build();
        _helixResourceManager.addOrUpdateSchema(schema);

        TableConfig offlineTableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setTimeType(TimeUnit.DAYS.name())
                .build();
        _helixResourceManager.addTable(offlineTableConfig);

        TableConfig realtimeTableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setTimeType(TimeUnit.DAYS.name())
                .setStreamConfigs(getStreamConfigs())
                .build();
        _helixResourceManager.addTable(realtimeTableConfig);

        // One mock segment per iteration; the step must not depend on the number of servers.
        for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
            _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
                    SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
        }

        TestUtils.waitForCondition(aVoid -> {
            ExternalView offlineTableExternalView =
                    _helixAdmin.getResourceExternalView(getHelixClusterName(), OFFLINE_TABLE_NAME);
            return offlineTableExternalView != null
                    && offlineTableExternalView.getPartitionSet().size() == NUM_OFFLINE_SEGMENTS;
        }, WAIT_TIMEOUT_MS, "Failed to find all OFFLINE segments in the ExternalView");
    }

    /**
     * Builds the stream configuration attached to the REALTIME table.
     *
     * @return a mutable map of stream config keys to values
     */
    private Map<String, String> getStreamConfigs() {
        Map<String, String> streamConfigs = new HashMap<>();
        streamConfigs.put("streamType", "kafka");
        streamConfigs.put("stream.kafka.consumer.type", "highLevel");
        streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
        streamConfigs.put("stream.kafka.decoder.class.name",
                "org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder");
        return streamConfigs;
    }

    /**
     * Checks that all brokers are tagged and assigned to both tables, that routing tables exist and
     * contain every segment, that a newly added segment reaches the routing table, and that a newly
     * added table gets brokers assigned and a routing table built.
     *
     * @throws Exception if the resource manager calls fail or an awaited condition times out
     */
    @Test
    public void testResourceAndTagAssignment() throws Exception {
        String clusterName = getHelixClusterName();

        // NOTE(review): a null tenant presumably resolves to the default broker tenant tag - confirm.
        Assert.assertEquals(
                _helixAdmin.getInstancesInClusterWithTag(clusterName, TagNameUtils.getBrokerTagForTenant((String) null))
                        .size(),
                NUM_BROKERS);

        // Every broker should be assigned to both tables in the ideal state and the external view.
        IdealState brokerResourceIdealState = _helixAdmin.getResourceIdealState(clusterName, BROKER_RESOURCE_NAME);
        Assert.assertEquals(brokerResourceIdealState.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
        Assert.assertEquals(brokerResourceIdealState.getInstanceSet(REALTIME_TABLE_NAME).size(), NUM_BROKERS);

        ExternalView brokerResourceExternalView =
                _helixAdmin.getResourceExternalView(clusterName, BROKER_RESOURCE_NAME);
        Assert.assertEquals(brokerResourceExternalView.getStateMap(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
        Assert.assertEquals(brokerResourceExternalView.getStateMap(REALTIME_TABLE_NAME).size(), NUM_BROKERS);

        HelixExternalViewBasedRouting routing = _brokerStarter.getHelixExternalViewBasedRouting();
        Assert.assertTrue(routing.routingTableExists(OFFLINE_TABLE_NAME));
        Assert.assertTrue(routing.routingTableExists(REALTIME_TABLE_NAME));

        // The routing table has one entry per server, each listing the segments routed to it.
        RoutingTableLookupRequest offlineLookupRequest = new RoutingTableLookupRequest(OFFLINE_TABLE_NAME);
        Map<String, List<String>> routingTable = routing.getRoutingTable(offlineLookupRequest);
        Assert.assertEquals(routingTable.size(), NUM_SERVERS);
        Assert.assertEquals(routingTable.values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);

        // Add one more segment and wait for the broker to pick it up.
        _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
                SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
        TestUtils.waitForCondition(
                aVoid -> routing.getRoutingTable(offlineLookupRequest).values().iterator().next().size()
                        == NUM_OFFLINE_SEGMENTS + 1,
                WAIT_TIMEOUT_MS, "Failed to add the new segment into the routing table");

        // Add a new table that requests a broker tenant other than the default one.
        String newRawTableName = "newTable";
        String newOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newRawTableName);
        TableConfig newTableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
                .setTableName(newRawTableName)
                .setBrokerTenant("testBroker")
                .build();
        _helixResourceManager.addTable(newTableConfig);

        // Although "testBroker" was requested, the stored config is expected to report "DefaultTenant".
        TableConfig storedTableConfig = _helixResourceManager.getTableConfig(newOfflineTableName);
        Assert.assertNotNull(storedTableConfig);
        Assert.assertEquals(storedTableConfig.getTenantConfig().getBroker(), "DefaultTenant");

        Assert.assertEquals(
                _helixAdmin.getResourceIdealState(clusterName, BROKER_RESOURCE_NAME)
                        .getInstanceSet(newOfflineTableName)
                        .size(),
                NUM_BROKERS);
        TestUtils.waitForCondition(aVoid -> {
            Map<String, String> brokerStates = _helixAdmin
                    .getResourceExternalView(getHelixClusterName(), BROKER_RESOURCE_NAME)
                    .getStateMap(newOfflineTableName);
            return brokerStates != null && brokerStates.size() == NUM_BROKERS;
        }, WAIT_TIMEOUT_MS, "Failed to find all brokers for the new table in the brokerResource ExternalView");

        Assert.assertTrue(routing.routingTableExists(newOfflineTableName));
    }

    /**
     * Checks the initial time boundary of the OFFLINE table, then refreshes one segment with a later
     * end time and waits for the boundary to follow. In both cases the asserted boundary value is
     * the end time minus one.
     */
    @Test
    public void testTimeBoundaryUpdate() {
        TimeBoundaryService timeBoundaryService =
                _brokerStarter.getHelixExternalViewBasedRouting().getTimeBoundaryService();

        // NOTE(review): 10 is presumably the end time SegmentMetadataMockUtils gives mock segments - confirm.
        int initialEndTime = 10;
        Assert.assertEquals(timeBoundaryService.getTimeBoundaryInfoFor(OFFLINE_TABLE_NAME).getTimeValue(),
                Integer.toString(initialEndTime - 1));

        // Refresh the first segment with a later end time.
        String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).get(0);
        int newEndTime = initialEndTime + 10;
        _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
                SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime),
                _helixResourceManager.getOfflineSegmentZKMetadata(RAW_TABLE_NAME, segmentToRefresh));

        String expectedTimeValue = Integer.toString(newEndTime - 1);
        TestUtils.waitForCondition(
                aVoid -> timeBoundaryService.getTimeBoundaryInfoFor(OFFLINE_TABLE_NAME)
                        .getTimeValue()
                        .equals(expectedTimeValue),
                WAIT_TIMEOUT_MS, "Failed to update the time boundary for refreshed segment");
    }

    /**
     * Stops the fake instances, the broker under test, the controller and ZooKeeper, in that order.
     */
    @AfterClass
    public void tearDown() {
        stopFakeInstances();
        _brokerStarter.shutdown();
        stopController();
        stopZk();
    }
}
