/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BalanceSegmentsTest {
    private SegmentLoadQueueManager loadQueueManager;
    private DataSegment segment1;
    private DataSegment segment2;
    private DataSegment segment3;
    private DataSegment segment4;
    private DataSegment segment5;
    private DataSegment[] allSegments;
    private ListeningExecutorService balancerStrategyExecutor;
    private BalancerStrategy balancerStrategy;
    private Set<String> broadcastDatasources;
    private DruidServer server1;
    private DruidServer server2;
    private DruidServer server3;
    private DruidServer server4;

    @Before
    public void setUp() {
        this.loadQueueManager = new SegmentLoadQueueManager(null, null);
        DateTime start1 = DateTimes.of((String)"2012-01-01");
        DateTime start2 = DateTimes.of((String)"2012-02-01");
        String version = DateTimes.of((String)"2012-03-01").toString();
        this.segment1 = this.createHourlySegment("datasource1", start1, version);
        this.segment2 = this.createHourlySegment("datasource1", start2, version);
        this.segment3 = this.createHourlySegment("datasource2", start1, version);
        this.segment4 = this.createHourlySegment("datasource2", start2, version);
        this.segment5 = this.createHourlySegment("datasourceBroadcast", start2, version);
        this.allSegments = new DataSegment[]{this.segment1, this.segment2, this.segment3, this.segment4, this.segment5};
        this.server1 = new DruidServer("server1", "server1", null, 100L, ServerType.HISTORICAL, "normal", 0);
        this.server2 = new DruidServer("server2", "server2", null, 100L, ServerType.HISTORICAL, "normal", 0);
        this.server3 = new DruidServer("server3", "server3", null, 100L, ServerType.HISTORICAL, "normal", 0);
        this.server4 = new DruidServer("server4", "server4", null, 100L, ServerType.HISTORICAL, "normal", 0);
        this.balancerStrategyExecutor = MoreExecutors.listeningDecorator((ExecutorService)Execs.multiThreaded((int)1, (String)"BalanceSegmentsTest-%d"));
        this.balancerStrategy = new CostBalancerStrategy(this.balancerStrategyExecutor);
        this.broadcastDatasources = Collections.singleton("datasourceBroadcast");
    }

    @After
    public void tearDown() {
        this.balancerStrategyExecutor.shutdownNow();
    }

    @Test
    public void testMoveToEmptyServerBalancer() {
        ServerHolder serverHolder1 = this.createHolder(this.server1, this.segment1, this.segment2, this.segment3, this.segment4);
        ServerHolder serverHolder2 = this.createHolder(this.server2, new DataSegment[0]);
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(serverHolder1, serverHolder2).withBalancerStrategy(this.balancerStrategy).withBroadcastDatasources(this.broadcastDatasources).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
        Assert.assertEquals((long)2L, (long)totalMoved);
    }

    @Test
    public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove() {
        ServerHolder holder1 = this.createHolder(this.server1, false, this.segment1, this.segment2);
        ServerHolder holder2 = this.createHolder(this.server2, true, this.segment3, this.segment4);
        ServerHolder holder3 = this.createHolder(this.server3, false, new DataSegment[0]);
        CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder().withSmartSegmentLoading(false).withMaxSegmentsToMove(1).build();
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(holder1, holder2, holder3).withDynamicConfigs(dynamicConfig).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.MOVED, "normal", this.segment3.getDataSource()));
        DataSegment movingSegment = (DataSegment)holder3.getPeon().getSegmentsToLoad().iterator().next();
        Assert.assertEquals((Object)this.segment3.getDataSource(), (Object)movingSegment.getDataSource());
    }

    @Test
    public void testMoveWithNoDecommissioning() {
        ServerHolder serverHolder1 = this.createHolder(this.server1, this.segment1, this.segment2, this.segment3, this.segment4);
        ServerHolder serverHolder2 = this.createHolder(this.server2, new DataSegment[0]);
        CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder().withSmartSegmentLoading(false).withMaxSegmentsToMove(4).build();
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(serverHolder1, serverHolder2).withDynamicConfigs(dynamicConfig).withBalancerStrategy(this.balancerStrategy).withSegmentAssignerUsing(this.loadQueueManager).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
        Assert.assertEquals((long)2L, (long)totalMoved);
        Assert.assertEquals((long)2L, (long)serverHolder2.getPeon().getSegmentsToLoad().size());
    }

    @Test
    public void testMoveToDecommissioningServer() {
        ServerHolder activeServer = this.createHolder(this.server1, false, this.allSegments);
        ServerHolder decommissioningServer = this.createHolder(this.server2, true, new DataSegment[0]);
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(activeServer, decommissioningServer).withBalancerStrategy(this.balancerStrategy).withBroadcastDatasources(this.broadcastDatasources).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        Assert.assertFalse((boolean)stats.hasStat(Stats.Segments.MOVED));
    }

    @Test
    public void testMoveFromDecommissioningServer() {
        ServerHolder decommissioningServer = this.createHolder(this.server1, true, this.allSegments);
        ServerHolder activeServer = this.createHolder(this.server2, new DataSegment[0]);
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(decommissioningServer, activeServer).withDynamicConfigs(CoordinatorDynamicConfig.builder().withSmartSegmentLoading(false).withMaxSegmentsToMove(3).build()).withBalancerStrategy(this.balancerStrategy).withBroadcastDatasources(this.broadcastDatasources).build();
        this.runBalancer(params);
        Assert.assertEquals((long)0L, (long)decommissioningServer.getPeon().getSegmentsToLoad().size());
        Assert.assertEquals((long)3L, (long)activeServer.getPeon().getSegmentsToLoad().size());
    }

    @Test
    public void testMoveMaxLoadQueueServerBalancer() {
        boolean maxSegmentsInQueue = true;
        ServerHolder holder1 = this.createHolder(this.server1, 1, false, this.allSegments);
        ServerHolder holder2 = this.createHolder(this.server2, 1, false, new DataSegment[0]);
        CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder().withSmartSegmentLoading(false).withMaxSegmentsInNodeLoadingQueue(1).build();
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(holder1, holder2).withDynamicConfigs(dynamicConfig).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1"));
    }

    @Test
    public void testRun1() {
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(this.createHolder(this.server1, this.allSegments), this.createHolder(this.server2, new DataSegment[0])).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        Assert.assertTrue((stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") > 0L ? 1 : 0) != 0);
    }

    @Test
    public void testRun2() {
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(this.createHolder(this.server1, this.allSegments), this.createHolder(this.server2, new DataSegment[0]), this.createHolder(this.server3, new DataSegment[0]), this.createHolder(this.server4, new DataSegment[0])).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        Assert.assertTrue((stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") > 0L ? 1 : 0) != 0);
    }

    @Test
    public void testMaxSegmentsToMoveIsHonored() {
        ServerHolder holder1 = this.createHolder(this.server1, this.segment1, this.segment2);
        ServerHolder holder2 = this.createHolder(this.server2, this.segment3, this.segment4);
        ServerHolder holder3 = this.createHolder(this.server3, new DataSegment[0]);
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(holder1, holder2, holder3).withDynamicConfigs(CoordinatorDynamicConfig.builder().withSmartSegmentLoading(false).withMaxSegmentsToMove(1).build()).withBalancerStrategy(this.balancerStrategy).withBroadcastDatasources(this.broadcastDatasources).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
        Assert.assertEquals((long)1L, (long)totalMoved);
        Assert.assertEquals((long)1L, (long)holder3.getPeon().getSegmentsToLoad().size());
    }

    @Test
    public void testMoveForMultipleDatasources() {
        DruidCoordinatorRuntimeParams params = this.defaultRuntimeParamsBuilder(this.createHolder(this.server1, this.allSegments), this.createHolder(this.server2, new DataSegment[0]), this.createHolder(this.server3, new DataSegment[0]), this.createHolder(this.server4, new DataSegment[0])).withDynamicConfigs(CoordinatorDynamicConfig.builder().withSmartSegmentLoading(false).withMaxSegmentsToMove(2).build()).withBroadcastDatasources(this.broadcastDatasources).build();
        CoordinatorRunStats stats = this.runBalancer(params);
        long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
        Assert.assertEquals((long)2L, (long)totalMoved);
    }

    private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params) {
        params = new BalanceSegments(Duration.standardMinutes((long)1L)).run(params);
        if (params == null) {
            Assert.fail((String)"BalanceSegments duty returned null params");
            return new CoordinatorRunStats();
        }
        return params.getCoordinatorStats();
    }

    private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(ServerHolder ... servers) {
        return DruidCoordinatorRuntimeParams.builder().withDruidCluster(DruidCluster.builder().addTier("normal", servers).build()).withUsedSegments(this.allSegments).withBroadcastDatasources(this.broadcastDatasources).withBalancerStrategy(this.balancerStrategy).withSegmentAssignerUsing(this.loadQueueManager);
    }

    private ServerHolder createHolder(DruidServer server, DataSegment ... loadedSegments) {
        return this.createHolder(server, false, loadedSegments);
    }

    private ServerHolder createHolder(DruidServer server, boolean isDecommissioning, DataSegment ... loadedSegments) {
        return this.createHolder(server, 0, isDecommissioning, loadedSegments);
    }

    private ServerHolder createHolder(DruidServer server, int maxSegmentsInLoadQueue, boolean isDecommissioning, DataSegment ... loadedSegments) {
        for (DataSegment segment : loadedSegments) {
            server.addDataSegment(segment);
        }
        return new ServerHolder(server.toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon(), isDecommissioning, maxSegmentsInLoadQueue, 10);
    }

    private DataSegment createHourlySegment(String datasource, DateTime start, String version) {
        return new DataSegment(datasource, new Interval((ReadableInstant)start, (ReadableInstant)start.plusHours(1)), version, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), (ShardSpec)NoneShardSpec.instance(), Integer.valueOf(0), 8L);
    }
}

