package org.apache.pinot.broker.broker;

import com.google.common.util.concurrent.Uninterruptibles;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.broker.routing.builder.RoutingTableBuilder;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/broker/broker/HelixBrokerStarterTest.class */
/**
 * Exercises {@link HelixBrokerStarter} against a local ZooKeeper server and an in-process
 * {@link PinotHelixResourceManager}.
 *
 * <p>{@link #setUp()} starts ZooKeeper, the resource manager and the broker starter, joins fake
 * broker and server instances to the cluster, creates the OFFLINE and REALTIME "dining" tables
 * and uploads the initial mock segments. The tests then check broker-resource assignment, the
 * broker's routing table builders and the time boundary refresh.
 */
public class HelixBrokerStarterTest {
    private static final String ZK_ADDRESS = "localhost:2191";
    private static final String HELIX_CLUSTER_NAME = "TestHelixBrokerStarter";
    private static final String RAW_DINING_TABLE_NAME = "dining";
    private static final String RAW_COFFEE_TABLE_NAME = "coffee";
    private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
    private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_COFFEE_TABLE_NAME);
    private static final String BROKER_TENANT_TAG = "DefaultTenant_BROKER";
    private static final String SERVER_TENANT_TAG = "DefaultTenant_OFFLINE";
    private static final String BROKER_RESOURCE = "brokerResource";
    private static final String TIME_BOUNDARY_REFRESH_INTERVAL_KEY = "pinot.broker.refresh.timeBoundaryInfo.sleepInterval";

    /** Number of fake broker instances joined to the cluster in {@link #setUp()}. */
    private static final int FAKE_BROKER_COUNT = 5;
    /** Brokers expected under the default tenant; presumably the fake brokers plus the broker started here. */
    private static final int EXPECTED_BROKER_COUNT = 6;
    /** Segments uploaded to the OFFLINE dining table in {@link #setUp()}. */
    private static final int INITIAL_SEGMENT_COUNT = 5;
    /** Segments in the OFFLINE dining table after the resource/tag test uploads one more. */
    private static final int SEGMENT_COUNT = INITIAL_SEGMENT_COUNT + 1;
    /** Upper bound for every cluster-state wait that is not tied to the time boundary refresh. */
    private static final long WAIT_TIMEOUT_MS = 30000L;
    private static final long POLL_INTERVAL_MS = 100L;

    private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf();
    private PinotHelixResourceManager _pinotResourceManager;
    private ZkClient _zkClient;
    private HelixAdmin _helixAdmin;
    private HelixBrokerStarter _helixBrokerStarter;
    private ZkStarter.ZookeeperInstance _zookeeperInstance;

    /**
     * Brings up ZooKeeper, the resource manager and the broker, then creates the dining tables and
     * uploads {@link #INITIAL_SEGMENT_COUNT} segments to the OFFLINE one.
     *
     * @throws Exception if any component fails to start or the cluster does not reach the expected state
     */
    @BeforeTest
    public void setUp() throws Exception {
        this._zookeeperInstance = ZkStarter.startLocalZkServer();
        this._zkClient = new ZkClient(ZK_ADDRESS);
        this._pinotResourceManager = new PinotHelixResourceManager(ZK_ADDRESS, HELIX_CLUSTER_NAME, "localhost_helixController", (String) null, 10000L, true, false, true);
        this._pinotResourceManager.start();
        this._helixAdmin = this._pinotResourceManager.getHelixAdmin();

        this._pinotHelixBrokerProperties.addProperty("pinot.broker.client.queryPort", 8943);
        this._pinotHelixBrokerProperties.addProperty(TIME_BOUNDARY_REFRESH_INTERVAL_KEY, 100L);
        this._helixBrokerStarter = new HelixBrokerStarter(HELIX_CLUSTER_NAME, ZK_ADDRESS, this._pinotHelixBrokerProperties);

        ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_ADDRESS, FAKE_BROKER_COUNT, true);
        ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_ADDRESS, 1, true);

        // Bounded wait: fail the setup instead of hanging forever if the instances never get tagged.
        waitForPredicate(this::tenantInstancesJoined, WAIT_TIMEOUT_MS);
        Assert.assertTrue(tenantInstancesJoined(), "Broker/server instances did not join the default tenant in time");

        this._pinotResourceManager.addTable(new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_DINING_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS").build());
        setupRealtimeTable();
        for (int i = 0; i < INITIAL_SEGMENT_COUNT; i++) {
            this._pinotResourceManager.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME), "downloadUrl");
        }
        // Fixed pause kept on purpose: the tests assert on broker-side state right away.
        Thread.sleep(1000L);
        Assert.assertEquals(this._helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME).getPartitionSet().size(), INITIAL_SEGMENT_COUNT);
    }

    /** Returns whether at least one server and one broker instance carry the default tenant tags. */
    private boolean tenantInstancesJoined() {
        return !this._helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, SERVER_TENANT_TAG).isEmpty()
                && !this._helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, BROKER_TENANT_TAG).isEmpty();
    }

    /**
     * Registers a schema and a high-level-consumer REALTIME "dining" table, then marks the table
     * online in the broker's routing with no external view and an empty instance list.
     *
     * @throws IOException if the table or schema cannot be created
     */
    private void setupRealtimeTable() throws IOException {
        Map<String, String> streamConfigs = new HashMap<>();
        streamConfigs.put("streamType", "kafka");
        streamConfigs.put("stream.kafka.consumer.type", "highLevel");
        streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
        streamConfigs.put("stream.kafka.decoder.class.name", "org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder");
        streamConfigs.put("stream.kafka.hlc.zk.connect.string", "localhost:1111/zkConnect");
        streamConfigs.put("stream.kafka.decoder.prop.schema.registry.rest.url", "http://localhost:2222/schemaRegistry");
        TableConfig realtimeTableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_DINING_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS").setStreamConfigs(streamConfigs).build();

        Schema schema = new Schema();
        schema.setSchemaName(RAW_DINING_TABLE_NAME);
        this._pinotResourceManager.addOrUpdateSchema(schema);

        // The returned manager is unused; NOTE(review): presumably called for its side effects - confirm.
        PinotLLCRealtimeSegmentManager.create(this._pinotResourceManager, new ControllerConf(), new ControllerMetrics(new MetricsRegistry()));
        this._pinotResourceManager.addTable(realtimeTableConfig);
        this._helixBrokerStarter.getHelixExternalViewBasedRouting().markDataResourceOnline(realtimeTableConfig, (ExternalView) null, new ArrayList<>());
    }

    /** Stops the resource manager, closes the ZooKeeper client and shuts down the local ZooKeeper server. */
    @AfterTest
    public void tearDown() {
        this._pinotResourceManager.stop();
        this._zkClient.close();
        ZkStarter.stopLocalZkServer(this._zookeeperInstance);
    }

    /**
     * Verifies broker tagging, broker-resource assignment and the routing table builders, before and
     * after adding the "coffee" table and one more dining segment.
     *
     * @throws Exception if reflection on the routing internals fails or a cluster call throws
     */
    @Test
    public void testResourceAndTagAssignment() throws Exception {
        Assert.assertEquals(this._helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, BROKER_TENANT_TAG).size(), EXPECTED_BROKER_COUNT);
        Assert.assertEquals(this._helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, BROKER_RESOURCE).getInstanceSet(DINING_TABLE_NAME).size(), EXPECTED_BROKER_COUNT);
        Assert.assertEquals(this._helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, BROKER_RESOURCE).getStateMap(DINING_TABLE_NAME).size(), EXPECTED_BROKER_COUNT);

        // The routing table builder map is private, so it is read reflectively.
        HelixExternalViewBasedRouting routing = this._helixBrokerStarter.getHelixExternalViewBasedRouting();
        Field builderMapField = HelixExternalViewBasedRouting.class.getDeclaredField("_routingTableBuilderMap");
        builderMapField.setAccessible(true);
        @SuppressWarnings("unchecked") // field name fixes the content; element types are assumed from usage below
        final Map<String, RoutingTableBuilder> routingTableBuilders = (Map<String, RoutingTableBuilder>) builderMapField.get(routing);

        // Both dining tables must be routed before the key set is compared.
        waitForPredicate(() -> routingTableBuilders.size() == 2, WAIT_TIMEOUT_MS);
        Assert.assertEquals(sortedKeys(routingTableBuilders), "[dining_OFFLINE, dining_REALTIME]");

        this._pinotResourceManager.addTable(new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_COFFEE_TABLE_NAME).setBrokerTenant("testBroker").setServerTenant("testServer").build());
        Assert.assertEquals(this._helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, BROKER_TENANT_TAG).size(), EXPECTED_BROKER_COUNT);
        IdealState brokerIdealState = this._helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, BROKER_RESOURCE);
        Assert.assertEquals(brokerIdealState.getInstanceSet(COFFEE_TABLE_NAME).size(), EXPECTED_BROKER_COUNT);
        Assert.assertEquals(brokerIdealState.getInstanceSet(DINING_TABLE_NAME).size(), EXPECTED_BROKER_COUNT);

        waitForPredicate(() -> brokerStateMapSize(COFFEE_TABLE_NAME) == EXPECTED_BROKER_COUNT, WAIT_TIMEOUT_MS);
        Assert.assertEquals(brokerStateMapSize(COFFEE_TABLE_NAME), EXPECTED_BROKER_COUNT);

        // The coffee table joins the two dining tables in the routing map.
        waitForPredicate(() -> routingTableBuilders.size() == 3, WAIT_TIMEOUT_MS);
        Assert.assertEquals(sortedKeys(routingTableBuilders), "[coffee_OFFLINE, dining_OFFLINE, dining_REALTIME]");
        Assert.assertEquals(firstRoutingEntrySize(routingTableBuilders), INITIAL_SEGMENT_COUNT);

        // Upload one more dining segment and wait for it to reach the external view and the routing table.
        this._pinotResourceManager.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME), "downloadUrl");
        waitForPredicate(() -> diningPartitionCount() == SEGMENT_COUNT, WAIT_TIMEOUT_MS);
        Assert.assertEquals(diningPartitionCount(), SEGMENT_COUNT);
        Assert.assertEquals(sortedKeys(routingTableBuilders), "[coffee_OFFLINE, dining_OFFLINE, dining_REALTIME]");

        waitForPredicate(() -> firstRoutingEntrySize(routingTableBuilders) == SEGMENT_COUNT, WAIT_TIMEOUT_MS);
        Assert.assertEquals(firstRoutingEntrySize(routingTableBuilders), SEGMENT_COUNT);
    }

    /**
     * Refreshes every OFFLINE dining segment with a later end time and checks that the broker's time
     * boundary for the table moves past its initial value of 10.
     *
     * @throws Exception if a cluster call throws
     */
    @Test
    public void testTimeBoundaryUpdate() throws Exception {
        Assert.assertEquals(currentDiningTimeBoundary(), Long.toString(10L));

        long endTime = 20;
        for (String segmentName : this._pinotResourceManager.getSegmentsFor(DINING_TABLE_NAME)) {
            OfflineSegmentZKMetadata segmentZKMetadata = this._pinotResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segmentName);
            Assert.assertNotNull(segmentZKMetadata);
            // Refresh the same segment whose metadata was just fetched, with a strictly increasing end time.
            this._pinotResourceManager.refreshSegment(SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_DINING_TABLE_NAME, segmentName, endTime), segmentZKMetadata);
            endTime++;
        }

        // Allow up to five refresh intervals for the boundary to be recomputed.
        waitForPredicate(() -> 10 < Long.parseLong(currentDiningTimeBoundary()), 5 * this._pinotHelixBrokerProperties.getLong(TIME_BOUNDARY_REFRESH_INTERVAL_KEY));
        Assert.assertTrue(10 < Long.parseLong(currentDiningTimeBoundary()));
    }

    /** Returns the time value the broker currently reports as the boundary for the OFFLINE dining table. */
    private String currentDiningTimeBoundary() {
        return this._helixBrokerStarter.getHelixExternalViewBasedRouting().getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME).getTimeValue();
    }

    /** Returns the number of entries in the broker resource's external-view state map for the given table. */
    private int brokerStateMapSize(String tableNameWithType) {
        return this._helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, BROKER_RESOURCE).getStateMap(tableNameWithType).size();
    }

    /** Returns the number of partitions in the external view of the OFFLINE dining table. */
    private int diningPartitionCount() {
        return this._helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME).getPartitionSet().size();
    }

    /**
     * Returns the size of the first value in the first routing table built for the OFFLINE dining
     * table. NOTE(review): presumably the segment list routed to one server - confirm.
     */
    private static int firstRoutingEntrySize(Map<String, RoutingTableBuilder> routingTableBuilders) {
        Map<?, ?> firstRoutingTable = (Map<?, ?>) routingTableBuilders.get(DINING_TABLE_NAME).getRoutingTables().get(0);
        List<?> firstEntry = (List<?>) firstRoutingTable.values().iterator().next();
        return firstEntry.size();
    }

    /** Renders the map's keys in sorted order so assertions do not depend on map iteration order. */
    private static String sortedKeys(Map<String, ?> map) {
        Object[] keys = map.keySet().toArray();
        Arrays.sort(keys);
        return Arrays.toString(keys);
    }

    /**
     * Polls {@code callable} every {@value #POLL_INTERVAL_MS} ms until it returns {@code true} or
     * {@code j} milliseconds have elapsed. Returns silently on timeout; callers assert afterwards.
     *
     * @param callable condition to poll; an exception is treated as "not satisfied yet"
     * @param j maximum time to wait, in milliseconds
     */
    private void waitForPredicate(Callable<Boolean> callable, long j) {
        long deadline = System.currentTimeMillis() + j;
        while (System.currentTimeMillis() < deadline) {
            try {
                if (callable.call().booleanValue()) {
                    return;
                }
            } catch (Exception ignored) {
                // The polled cluster state may not exist yet (e.g. a null state map); retry until the deadline.
            }
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }
}
