package org.apache.helix.controller.stages;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/controller/stages/TestRebalancePipeline.class */
public class TestRebalancePipeline extends ZkUnitTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestRebalancePipeline.class.getName());
    final String _className = getShortClassName();

    @Test
    public void testDuplicateMsg() {
        String str = "CLUSTER_" + this._className + "_dup";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        refreshClusterConfig(str, zKHelixDataAccessor);
        DummyClusterManager dummyClusterManager = new DummyClusterManager(str, zKHelixDataAccessor);
        ClusterEvent clusterEvent = new ClusterEvent(ClusterEventType.Unknown);
        clusterEvent.addAttribute(AttributeName.helixmanager.name(), dummyClusterManager);
        setupIdealState(str, new int[]{0}, new String[]{"testResource_dup"}, 1, 1);
        setupLiveInstances(str, new int[]{0});
        setupStateModel(str);
        Pipeline pipeline = new Pipeline();
        pipeline.addStage(new ReadClusterDataStage());
        Pipeline pipeline2 = new Pipeline();
        pipeline2.addStage(new ResourceComputationStage());
        pipeline2.addStage(new CurrentStateComputationStage());
        pipeline2.addStage(new BestPossibleStateCalcStage());
        pipeline2.addStage(new IntermediateStateCalcStage());
        pipeline2.addStage(new MessageGenerationPhase());
        pipeline2.addStage(new MessageSelectionStage());
        pipeline2.addStage(new MessageThrottleStage());
        pipeline2.addStage(new TaskAssignmentStage());
        setCurrentState(str, "localhost_0", "testResource_dup", "testResource_dup_0", "session_0", "OFFLINE");
        runPipeline(clusterEvent, pipeline);
        runPipeline(clusterEvent, pipeline2);
        List messages = ((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_dup", new Partition("testResource_dup_0"));
        Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
        Message message = (Message) messages.get(0);
        Assert.assertEquals(message.getFromState(), "OFFLINE");
        Assert.assertEquals(message.getToState(), "SLAVE");
        Assert.assertEquals(message.getTgtName(), "localhost_0");
        setCurrentState(str, "localhost_0", "testResource_dup", "testResource_dup_0", "session_1", "SLAVE");
        runPipeline(clusterEvent, pipeline);
        refreshClusterConfig(str, zKHelixDataAccessor);
        runPipeline(clusterEvent, pipeline2);
        Assert.assertEquals(((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_dup", new Partition("testResource_dup_0")).size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testMsgTriggeredRebalance() throws Exception {
        String str = "CLUSTER_" + this._className + "_msgTrigger";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        refreshClusterConfig(str, zKHelixDataAccessor);
        TestHelper.setupEmptyCluster(_gZkClient, str);
        setupIdealState(str, new int[]{0, 1}, new String[]{"testResource_dup"}, 1, 2);
        setupStateModel(str);
        setupInstances(str, new int[]{0, 1});
        setupLiveInstances(str, new int[]{0, 1});
        long j = MessageGenerationPhase.DEFAULT_OBSELETE_MSG_PURGE_DELAY;
        new ClusterControllerManager(ZkUnitTestBase.ZK_ADDR, str, "controller_0").syncStart();
        Thread.sleep(1000L);
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        Assert.assertEquals(zKHelixDataAccessor.getChildNames(keyBuilder.messages("localhost_0")).size(), 1);
        Assert.assertEquals(zKHelixDataAccessor.getChildNames(keyBuilder.messages("localhost_1")).size(), 1);
        setCurrentState(str, "localhost_0", "testResource_dup", "testResource_dup_0", "session_0", "SLAVE", true);
        setCurrentState(str, "localhost_1", "testResource_dup", "testResource_dup_0", "session_1", "SLAVE", true);
        Thread.sleep(1000L);
        Assert.assertEquals(zKHelixDataAccessor.getChildValues(keyBuilder.messages("localhost_0")).size(), 1);
        Assert.assertEquals(zKHelixDataAccessor.getChildValues(keyBuilder.messages("localhost_1")).size(), 1);
        Thread.sleep(j);
        setCurrentState(str, "localhost_0", "testResource_dup", "testResource_dup_0", "session_0", "SLAVE");
        Thread.sleep(1000L);
        List childValues = zKHelixDataAccessor.getChildValues(keyBuilder.messages("localhost_0"));
        List childValues2 = zKHelixDataAccessor.getChildValues(keyBuilder.messages("localhost_1"));
        ArrayList arrayList = new ArrayList(childValues);
        arrayList.addAll(childValues2);
        Assert.assertEquals(arrayList.size(), 1);
        Assert.assertEquals(((Message) arrayList.get(0)).getToState(), "MASTER");
        Assert.assertEquals(((Message) arrayList.get(0)).getFromState(), "SLAVE");
        setCurrentState(str, "localhost_0", "testResource_dup", "testResource_dup_0", "session_0", "MASTER", true);
        Thread.sleep(j);
        setCurrentState(str, "localhost_0", "testResource_dup", "testResource_dup_0", "session_0", "MASTER", false);
        Thread.sleep(1000L);
        Assert.assertTrue(zKHelixDataAccessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty());
        Message message = (Message) arrayList.get(0);
        Message message2 = new Message(message.getMsgType(), UUID.randomUUID().toString());
        message2.getRecord().setSimpleFields(message.getRecord().getSimpleFields());
        message2.getRecord().setListFields(message.getRecord().getListFields());
        message2.getRecord().setMapFields(message.getRecord().getMapFields());
        zKHelixDataAccessor.setProperty(message2.getKey(zKHelixDataAccessor.keyBuilder(), message2.getTgtName()), message2);
        Thread.sleep(1000L);
        Assert.assertTrue(zKHelixDataAccessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty());
        message2.setFromState("SLAVE");
        message2.setToState("OFFLINE");
        zKHelixDataAccessor.setProperty(message2.getKey(zKHelixDataAccessor.keyBuilder(), message2.getTgtName()), message2);
        Thread.sleep(1000L);
        Assert.assertTrue(zKHelixDataAccessor.getChildNames(keyBuilder.messages("localhost_0")).isEmpty());
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testChangeIdealStateWithPendingMsg() {
        String str = "CLUSTER_" + this._className + "_pending";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        DummyClusterManager dummyClusterManager = new DummyClusterManager(str, zKHelixDataAccessor);
        ClusterEvent clusterEvent = new ClusterEvent(ClusterEventType.Unknown);
        clusterEvent.addAttribute(AttributeName.helixmanager.name(), dummyClusterManager);
        ClusterDataCache clusterDataCache = new ClusterDataCache();
        clusterEvent.addAttribute(AttributeName.ClusterDataCache.name(), clusterDataCache);
        refreshClusterConfig(str, zKHelixDataAccessor);
        setupIdealState(str, new int[]{0}, new String[]{"testResource_pending"}, 1, 1);
        setupLiveInstances(str, new int[]{0});
        setupStateModel(str);
        Pipeline pipeline = new Pipeline();
        pipeline.addStage(new ReadClusterDataStage());
        Pipeline pipeline2 = new Pipeline();
        pipeline2.addStage(new ResourceComputationStage());
        pipeline2.addStage(new CurrentStateComputationStage());
        pipeline2.addStage(new BestPossibleStateCalcStage());
        pipeline2.addStage(new IntermediateStateCalcStage());
        pipeline2.addStage(new MessageGenerationPhase());
        pipeline2.addStage(new MessageSelectionStage());
        pipeline2.addStage(new MessageThrottleStage());
        pipeline2.addStage(new TaskAssignmentStage());
        setCurrentState(str, "localhost_0", "testResource_pending", "testResource_pending_0", "session_0", "OFFLINE");
        runPipeline(clusterEvent, pipeline);
        runPipeline(clusterEvent, pipeline2);
        List messages = ((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_pending", new Partition("testResource_pending_0"));
        Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
        Message message = (Message) messages.get(0);
        Assert.assertEquals(message.getFromState(), "OFFLINE");
        Assert.assertEquals(message.getToState(), "SLAVE");
        Assert.assertEquals(message.getTgtName(), "localhost_0");
        new ZKHelixAdmin(_gZkClient).dropResource(str, "testResource_pending");
        clusterDataCache.setIdealStates(zKHelixDataAccessor.getChildValues(zKHelixDataAccessor.keyBuilder().idealStates()));
        runPipeline(clusterEvent, pipeline);
        ((ClusterDataCache) clusterEvent.getAttribute(AttributeName.ClusterDataCache.name())).setClusterConfig(new ClusterConfig(str));
        runPipeline(clusterEvent, pipeline2);
        Assert.assertEquals(((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_pending", new Partition("testResource_pending_0")).size(), 0, "Should not output only 1 message: OFFLINE->DROPPED for localhost_0");
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        zKHelixDataAccessor.removeProperty(keyBuilder.message("localhost_0", (String) zKHelixDataAccessor.getChildNames(keyBuilder.messages("localhost_0")).get(0)));
        runPipeline(clusterEvent, pipeline);
        runPipeline(clusterEvent, pipeline2);
        List messages2 = ((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_pending", new Partition("testResource_pending_0"));
        Assert.assertEquals(messages2.size(), 1, "Should output 1 message: OFFLINE->DROPPED for localhost_0");
        Message message2 = (Message) messages2.get(0);
        Assert.assertEquals(message2.getFromState(), "OFFLINE");
        Assert.assertEquals(message2.getToState(), "DROPPED");
        Assert.assertEquals(message2.getTgtName(), "localhost_0");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testMasterXfer() {
        String str = "CLUSTER_" + this._className + "_xfer";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        DummyClusterManager dummyClusterManager = new DummyClusterManager(str, zKHelixDataAccessor);
        ClusterEvent clusterEvent = new ClusterEvent(ClusterEventType.Unknown);
        clusterEvent.addAttribute(AttributeName.helixmanager.name(), dummyClusterManager);
        refreshClusterConfig(str, zKHelixDataAccessor);
        setupIdealState(str, new int[]{0, 1}, new String[]{"testResource_xfer"}, 1, 2);
        setupLiveInstances(str, new int[]{1});
        setupStateModel(str);
        Pipeline pipeline = new Pipeline();
        pipeline.addStage(new ReadClusterDataStage());
        Pipeline pipeline2 = new Pipeline();
        pipeline2.addStage(new ResourceComputationStage());
        pipeline2.addStage(new CurrentStateComputationStage());
        pipeline2.addStage(new BestPossibleStateCalcStage());
        pipeline2.addStage(new IntermediateStateCalcStage());
        pipeline2.addStage(new MessageGenerationPhase());
        pipeline2.addStage(new MessageSelectionStage());
        pipeline2.addStage(new MessageThrottleStage());
        pipeline2.addStage(new TaskAssignmentStage());
        setCurrentState(str, "localhost_1", "testResource_xfer", "testResource_xfer_0", "session_1", "SLAVE");
        runPipeline(clusterEvent, pipeline);
        runPipeline(clusterEvent, pipeline2);
        List messages = ((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_xfer", new Partition("testResource_xfer_0"));
        Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
        Message message = (Message) messages.get(0);
        Assert.assertEquals(message.getFromState(), "SLAVE");
        Assert.assertEquals(message.getToState(), "MASTER");
        Assert.assertEquals(message.getTgtName(), "localhost_1");
        setupLiveInstances(str, new int[]{0});
        setCurrentState(str, "localhost_0", "testResource_xfer", "testResource_xfer_0", "session_0", "SLAVE");
        runPipeline(clusterEvent, pipeline);
        runPipeline(clusterEvent, pipeline2);
        Assert.assertEquals(((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_xfer", new Partition("testResource_xfer_0")).size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testNoDuplicatedMaster() {
        String str = "CLUSTER_" + this._className + "_no_duplicated_master";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        DummyClusterManager dummyClusterManager = new DummyClusterManager(str, zKHelixDataAccessor);
        ClusterEvent clusterEvent = new ClusterEvent(ClusterEventType.Unknown);
        clusterEvent.addAttribute(AttributeName.helixmanager.name(), dummyClusterManager);
        refreshClusterConfig(str, zKHelixDataAccessor);
        setupIdealState(str, new int[]{0, 1}, new String[]{"testResource_no_duplicated_master"}, 1, 2);
        setupLiveInstances(str, new int[]{0, 1});
        setupStateModel(str);
        Pipeline pipeline = new Pipeline();
        pipeline.addStage(new ReadClusterDataStage());
        Pipeline pipeline2 = new Pipeline();
        pipeline2.addStage(new ResourceComputationStage());
        pipeline2.addStage(new CurrentStateComputationStage());
        pipeline2.addStage(new BestPossibleStateCalcStage());
        pipeline2.addStage(new IntermediateStateCalcStage());
        pipeline2.addStage(new MessageGenerationPhase());
        pipeline2.addStage(new MessageSelectionStage());
        pipeline2.addStage(new MessageThrottleStage());
        pipeline2.addStage(new TaskAssignmentStage());
        setCurrentState(str, "localhost_0", "testResource_no_duplicated_master", "testResource_no_duplicated_master_0", "session_0", "SLAVE");
        setCurrentState(str, "localhost_1", "testResource_no_duplicated_master", "testResource_no_duplicated_master_0", "session_1", "MASTER");
        runPipeline(clusterEvent, pipeline);
        runPipeline(clusterEvent, pipeline2);
        List messages = ((MessageSelectionStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_SELECTED.name())).getMessages("testResource_no_duplicated_master", new Partition("testResource_no_duplicated_master_0"));
        Assert.assertEquals(messages.size(), 1, "Should output 1 message: MASTER-SLAVE for localhost_1");
        Message message = (Message) messages.get(0);
        Assert.assertEquals(message.getFromState(), "MASTER");
        Assert.assertEquals(message.getToState(), "SLAVE");
        Assert.assertEquals(message.getTgtName(), "localhost_1");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    protected void setCurrentState(String str, String str2, String str3, String str4, String str5, String str6) {
        setCurrentState(str, str2, str3, str4, str5, str6, false);
    }

    private void setCurrentState(String str, String str2, String str3, String str4, String str5, String str6, boolean z) {
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        CurrentState currentState = new CurrentState(str3);
        currentState.setState(str4, str6);
        currentState.setSessionId(str5);
        currentState.setStateModelDefRef("MasterSlave");
        if (z) {
            currentState.setEndTime(str4, System.currentTimeMillis());
        }
        zKHelixDataAccessor.setProperty(keyBuilder.currentState(str2, str5, str3), currentState);
    }

    private void refreshClusterConfig(String str, HelixDataAccessor helixDataAccessor) {
        helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().clusterConfig(), new ClusterConfig(str));
    }
}
