package org.apache.druid.server.coordinator.rules;

import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.class */
public class BroadcastDistributionRuleTest {
    private static final String DS_WIKI = "wiki";
    private static final String TIER_1 = "tier1";
    private static final String TIER_2 = "tier2";
    private int serverId = 0;
    private final DataSegment wikiSegment = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);

    @Before
    public void setUp() {
        this.serverId = 0;
    }

    @Test
    public void testSegmentIsBroadcastToAllTiers() {
        ServerHolder create10gbHistorical = create10gbHistorical(TIER_1, new DataSegment[0]);
        ServerHolder create10gbHistorical2 = create10gbHistorical(TIER_2, new DataSegment[0]);
        CoordinatorRunStats runRuleOnSegment = runRuleOnSegment(new ForeverBroadcastDistributionRule(), this.wikiSegment, makeParamsWithUsedSegments(DruidCluster.builder().add(create10gbHistorical).add(create10gbHistorical2).build(), this.wikiSegment));
        Assert.assertEquals(1L, runRuleOnSegment.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
        Assert.assertTrue(create10gbHistorical.isLoadingSegment(this.wikiSegment));
        Assert.assertEquals(1L, runRuleOnSegment.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_WIKI));
        Assert.assertTrue(create10gbHistorical2.isLoadingSegment(this.wikiSegment));
    }

    @Test
    public void testSegmentIsNotBroadcastToServerIfAlreadyLoaded() {
        ServerHolder create10gbHistorical = create10gbHistorical(TIER_1, this.wikiSegment);
        ServerHolder create10gbHistorical2 = create10gbHistorical(TIER_1, new DataSegment[0]);
        Assert.assertEquals(1L, runRuleOnSegment(new ForeverBroadcastDistributionRule(), this.wikiSegment, makeParamsWithUsedSegments(DruidCluster.builder().add(create10gbHistorical).add(create10gbHistorical2).build(), this.wikiSegment)).getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
        Assert.assertFalse(create10gbHistorical.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue(create10gbHistorical.isServingSegment(this.wikiSegment));
        Assert.assertTrue(create10gbHistorical2.isLoadingSegment(this.wikiSegment));
    }

    @Test
    public void testSegmentIsNotBroadcastToDecommissioningServer() {
        ServerHolder create10gbHistorical = create10gbHistorical(TIER_1, new DataSegment[0]);
        ServerHolder createDecommissioningHistorical = createDecommissioningHistorical(TIER_1, new DataSegment[0]);
        Assert.assertEquals(1L, runRuleOnSegment(new ForeverBroadcastDistributionRule(), this.wikiSegment, makeParamsWithUsedSegments(DruidCluster.builder().add(create10gbHistorical).add(createDecommissioningHistorical).build(), this.wikiSegment)).getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
        Assert.assertTrue(create10gbHistorical.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue(createDecommissioningHistorical.getLoadingSegments().isEmpty());
    }

    @Test
    public void testBroadcastSegmentIsDroppedFromDecommissioningServer() {
        ServerHolder create10gbHistorical = create10gbHistorical(TIER_1, this.wikiSegment);
        ServerHolder createDecommissioningHistorical = createDecommissioningHistorical(TIER_1, this.wikiSegment);
        Assert.assertEquals(1L, runRuleOnSegment(new ForeverBroadcastDistributionRule(), this.wikiSegment, makeParamsWithUsedSegments(DruidCluster.builder().add(create10gbHistorical).add(createDecommissioningHistorical).build(), this.wikiSegment)).getSegmentStat(Stats.Segments.DROPPED, TIER_1, DS_WIKI));
        Assert.assertTrue(create10gbHistorical.getPeon().getSegmentsToDrop().isEmpty());
        Assert.assertTrue(createDecommissioningHistorical.getPeon().getSegmentsToDrop().contains(this.wikiSegment));
    }

    @Test
    public void testSegmentIsBroadcastToAllServerTypes() {
        ServerHolder serverHolder = new ServerHolder(create10gbServer(ServerType.BROKER, "broker_tier").toImmutableDruidServer(), new TestLoadQueuePeon());
        ServerHolder serverHolder2 = new ServerHolder(create10gbServer(ServerType.INDEXER_EXECUTOR, TIER_2).toImmutableDruidServer(), new TestLoadQueuePeon());
        ServerHolder create10gbHistorical = create10gbHistorical(TIER_1, new DataSegment[0]);
        CoordinatorRunStats runRuleOnSegment = runRuleOnSegment(new ForeverBroadcastDistributionRule(), this.wikiSegment, makeParamsWithUsedSegments(DruidCluster.builder().add(serverHolder).add(serverHolder2).add(create10gbHistorical).build(), this.wikiSegment));
        Assert.assertEquals(1L, runRuleOnSegment.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
        Assert.assertEquals(1L, runRuleOnSegment.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, DS_WIKI));
        Assert.assertEquals(1L, runRuleOnSegment.getSegmentStat(Stats.Segments.ASSIGNED, serverHolder.getServer().getTier(), DS_WIKI));
        Assert.assertTrue(create10gbHistorical.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue(serverHolder2.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue(serverHolder.isLoadingSegment(this.wikiSegment));
    }

    @Test
    public void testReasonForBroadcastFailure() {
        ServerHolder create10gbHistorical = create10gbHistorical(TIER_1, new DataSegment[0]);
        ServerHolder serverHolder = new ServerHolder(new DruidServer("server1", "server1", (String) null, 0L, ServerType.HISTORICAL, TIER_1, 0).toImmutableDruidServer(), new TestLoadQueuePeon());
        ServerHolder serverHolder2 = new ServerHolder(create10gbServer(ServerType.HISTORICAL, TIER_1).toImmutableDruidServer(), new TestLoadQueuePeon(), false, 5, 100);
        CreateDataSegments.ofDatasource("koala").forIntervals(5, Granularities.MONTH).withNumPartitions(1).eachOfSizeInMb(10L).forEach(dataSegment -> {
            serverHolder2.startOperation(SegmentAction.LOAD, dataSegment);
        });
        Assert.assertTrue(serverHolder2.isLoadQueueFull());
        CoordinatorRunStats runRuleOnSegment = runRuleOnSegment(new ForeverBroadcastDistributionRule(), this.wikiSegment, makeParamsWithUsedSegments(DruidCluster.builder().add(create10gbHistorical).add(serverHolder).add(serverHolder2).build(), this.wikiSegment));
        Assert.assertEquals(1L, runRuleOnSegment.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, DS_WIKI));
        Assert.assertEquals(1L, runRuleOnSegment.get(Stats.Segments.ASSIGN_SKIPPED, RowKey.with(Dimension.DATASOURCE, DS_WIKI).with(Dimension.TIER, TIER_1).and(Dimension.DESCRIPTION, "Not enough disk space")));
        Assert.assertEquals(1L, runRuleOnSegment.get(Stats.Segments.ASSIGN_SKIPPED, RowKey.with(Dimension.DATASOURCE, DS_WIKI).with(Dimension.TIER, TIER_1).and(Dimension.DESCRIPTION, "Load queue is full")));
    }

    private CoordinatorRunStats runRuleOnSegment(Rule rule, DataSegment dataSegment, DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        StrategicSegmentAssigner segmentAssigner = druidCoordinatorRuntimeParams.getSegmentAssigner();
        rule.run(dataSegment, segmentAssigner);
        return segmentAssigner.getStats();
    }

    private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments(DruidCluster druidCluster, DataSegment... dataSegmentArr) {
        return DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc()).withDruidCluster(druidCluster).withUsedSegments(dataSegmentArr).withBalancerStrategy(new RandomBalancerStrategy()).withSegmentAssignerUsing(new SegmentLoadQueueManager((ServerInventoryView) null, (LoadQueueTaskMaster) null)).build();
    }

    private ServerHolder create10gbHistorical(String str, DataSegment... dataSegmentArr) {
        DruidServer create10gbServer = create10gbServer(ServerType.HISTORICAL, str);
        for (DataSegment dataSegment : dataSegmentArr) {
            create10gbServer.addDataSegment(dataSegment);
        }
        return new ServerHolder(create10gbServer.toImmutableDruidServer(), new TestLoadQueuePeon());
    }

    private ServerHolder createDecommissioningHistorical(String str, DataSegment... dataSegmentArr) {
        DruidServer create10gbServer = create10gbServer(ServerType.HISTORICAL, str);
        for (DataSegment dataSegment : dataSegmentArr) {
            create10gbServer.addDataSegment(dataSegment);
        }
        return new ServerHolder(create10gbServer.toImmutableDruidServer(), new TestLoadQueuePeon(), true);
    }

    private DruidServer create10gbServer(ServerType serverType, String str) {
        StringBuilder append = new StringBuilder().append("server_");
        int i = this.serverId;
        this.serverId = i + 1;
        String sb = append.append(i).toString();
        return new DruidServer(sb, sb, (String) null, 10737418240L, serverType, str, 0);
    }
}
