package org.apache.helix.messaging.p2pMessage;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.helix.HelixConstants;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Drives the controller message pipelines for a single-partition MasterSlave resource with
 * P2P (relay) messaging enabled, and checks how many messages are selected for that partition
 * as the current state, pending messages, pending relay messages and best possible state are
 * changed between runs.
 *
 * <p>The scenario is strictly sequential: every step mutates state that the following steps
 * build on, so the statement order inside the test must not be rearranged.
 */
public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
    String _db = "testDB";
    int _numPartition = 1;
    int _numReplica = 3;
    Partition _partition = new Partition(this._db + "_0");
    ResourceControllerDataProvider _dataCache;
    Pipeline _fullPipeline;
    Pipeline _messagePipeline;
    ResourcesStateMap _bestpossibleState;

    /** Thread pool handed to the data provider; kept so the test can shut it down afterwards. */
    private ExecutorService _asyncTasksThreadPool;

    /**
     * Builds the cluster fixture (ideal state, state model, 3 instances, 3 live instances) with
     * P2P messaging enabled, wires the event attributes, creates the two pipelines and runs the
     * full pipeline once so that {@link #_bestpossibleState} holds an initial assignment.
     *
     * @throws Exception if fixture setup or the initial pipeline run fails
     */
    private void preSetup() throws Exception {
        String[] resources = new String[]{this._db};
        String stateModelName = BuiltInStateModelDefinitions.MasterSlave.name();

        setupIdealState(3, resources, this._numPartition, this._numReplica, IdealState.RebalanceMode.SEMI_AUTO, stateModelName);
        setupStateModel();
        setupInstances(3);
        setupLiveInstances(3);

        ClusterConfig clusterConfig = new ClusterConfig(this._clusterName);
        clusterConfig.enableP2PMessage(true);
        setClusterConfig(clusterConfig);

        Map<String, Resource> resourceMap = getResourceMap(new String[]{this._db}, this._numPartition, BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);

        this._dataCache = new ResourceControllerDataProvider();
        // Keep a handle on the executor so it can be shut down when the test finishes.
        this._asyncTasksThreadPool = Executors.newSingleThreadExecutor();
        this._dataCache.setAsyncTasksThreadPool(this._asyncTasksThreadPool);

        this.event.addAttribute(AttributeName.ControllerDataProvider.name(), this._dataCache);
        this.event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
        this.event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
        this.event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
        this.event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
        this.event.addAttribute(AttributeName.helixmanager.name(), this.manager);

        // Full pipeline: reads cluster data and recomputes the best possible state before
        // generating, selecting and throttling messages.
        this._fullPipeline = new Pipeline("FullPipeline");
        this._fullPipeline.addStage(new ReadClusterDataStage());
        this._fullPipeline.addStage(new BestPossibleStateCalcStage());
        this._fullPipeline.addStage(new MessageGenerationPhase());
        this._fullPipeline.addStage(new MessageSelectionStage());
        this._fullPipeline.addStage(new IntermediateStateCalcStage());
        this._fullPipeline.addStage(new MessageThrottleStage());

        // Message pipeline: only the message stages, so it works from whatever best possible
        // state is currently stored on the event.
        this._messagePipeline = new Pipeline("MessagePipeline");
        this._messagePipeline.addStage(new MessageGenerationPhase());
        this._messagePipeline.addStage(new MessageSelectionStage());
        this._messagePipeline.addStage(new IntermediateStateCalcStage());
        this._messagePipeline.addStage(new MessageThrottleStage());

        this._fullPipeline.handle(this.event);
        this._bestpossibleState = (ResourcesStateMap) this.event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
    }

    /**
     * Runs the whole duplicated-message scenario and always releases the async task thread pool
     * created during setup, whether the scenario passes or fails.
     *
     * @throws Exception if setup or any pipeline run fails
     */
    @Test
    public void testP2PAvoidDuplicatedMessage() throws Exception {
        try {
            runAvoidDuplicatedMessageScenario();
        } finally {
            shutdownAsyncTasksThreadPool();
        }
    }

    /**
     * The actual scenario: a sequence of pipeline runs, each preceded by a change to the current
     * state / pending messages / best possible state and followed by assertions on the messages
     * selected for the test partition.
     */
    private void runAvoidDuplicatedMessageScenario() throws Exception {
        final String master = MasterSlaveSMD.States.MASTER.name();
        final String slave = MasterSlaveSMD.States.SLAVE.name();
        final String offline = MasterSlaveSMD.States.OFFLINE.name();

        preSetup();

        // Step 1: disable the current master and rerun the full pipeline. Expect exactly one
        // MASTER->SLAVE message to the old master carrying one relay message.
        String initialMaster = getTopStateInstance(bestPossibleInstanceStates(), master);
        Assert.assertNotNull(initialMaster);

        this.admin.enableInstance(this._clusterName, initialMaster, false);
        this._dataCache = (ResourceControllerDataProvider) this.event.getAttribute(AttributeName.ControllerDataProvider.name());
        this._dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);

        CurrentStateOutput currentStateOutput = populateCurrentStateFromBestPossible(this._bestpossibleState);
        publishCurrentState(currentStateOutput);
        this._fullPipeline.handle(this.event);
        this._bestpossibleState = (ResourcesStateMap) this.event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());

        List<Message> messages = selectedMessages();
        Assert.assertEquals(messages.size(), 1);
        Message toSlaveMessage = messages.get(0);
        Assert.assertEquals(toSlaveMessage.getTgtName(), initialMaster);
        Assert.assertEquals(toSlaveMessage.getFromState(), master);
        Assert.assertEquals(toSlaveMessage.getToState(), slave);
        Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);

        // The relay message must target the master of the recomputed best possible state and
        // ask it to go SLAVE->MASTER, with the old master as the relay source.
        String secondMaster = getTopStateInstance(bestPossibleInstanceStates(), master);
        Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster);
        Assert.assertNotNull(relayMessage);
        Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
        Assert.assertEquals(relayMessage.getTgtName(), secondMaster);
        Assert.assertEquals(relayMessage.getRelaySrcHost(), initialMaster);
        Assert.assertEquals(relayMessage.getFromState(), slave);
        Assert.assertEquals(relayMessage.getToState(), master);

        // Step 2: old master is now SLAVE, with both the original message and the relay message
        // still pending on it. Expect no message to be selected.
        currentStateOutput.setCurrentState(this._db, this._partition, initialMaster, slave);
        currentStateOutput.setPendingMessage(this._db, this._partition, initialMaster, toSlaveMessage);
        currentStateOutput.setPendingRelayMessage(this._db, this._partition, initialMaster, relayMessage);
        publishCurrentState(currentStateOutput);
        this._fullPipeline.handle(this.event);
        Assert.assertEquals(selectedMessages().size(), 0);

        // Step 3: pending messages cleared, only the relay message is still pending on the old
        // master. Expect two messages: ->MASTER for the second master and ->OFFLINE for the old one.
        currentStateOutput.setCurrentState(this._db, this._partition, initialMaster, slave);
        currentStateOutput.getPendingMessageMap(this._db, this._partition).clear();
        currentStateOutput.setPendingRelayMessage(this._db, this._partition, initialMaster, relayMessage);
        publishCurrentState(currentStateOutput);
        this._messagePipeline.handle(this.event);

        List<Message> promotionAndOffline = selectedMessages();
        Assert.assertEquals(promotionAndOffline.size(), 2);
        boolean sawToOfflineForOldMaster = false;
        boolean sawToMasterForSecondMaster = false;
        for (Message candidate : promotionAndOffline) {
            if (candidate.getToState().equals(master) && candidate.getTgtName().equals(secondMaster)) {
                sawToMasterForSecondMaster = true;
            }
            if (candidate.getToState().equals(offline) && candidate.getTgtName().equals(initialMaster)) {
                sawToOfflineForOldMaster = true;
            }
        }
        Assert.assertTrue(sawToMasterForSecondMaster);
        Assert.assertTrue(sawToOfflineForOldMaster);

        // Step 4: same current state, but the best possible state now names a different master.
        // Expect only the ->OFFLINE message for the old master.
        String thirdMaster = getTopStateInstance(bestPossibleInstanceStates(), slave);
        Map<String, String> swappedStates = bestPossibleInstanceStates();
        swappedStates.put(secondMaster, slave);
        swappedStates.put(thirdMaster, master);
        this._bestpossibleState.setState(this._db, this._partition, swappedStates);
        publishCurrentState(currentStateOutput);
        this.event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), this._bestpossibleState);
        this._messagePipeline.handle(this.event);

        List<Message> offlineOnly = selectedMessages();
        Assert.assertEquals(offlineOnly.size(), 1);
        Assert.assertEquals(offlineOnly.get(0).getToState(), offline);
        Assert.assertEquals(offlineOnly.get(0).getTgtName(), initialMaster);

        // Step 5: restore the second master in the best possible state, rebuild the current state
        // from it, and mark the relay message as a pending message on the second master. Expect
        // only SLAVE->OFFLINE for the old master.
        Map<String, String> restoredStates = bestPossibleInstanceStates();
        restoredStates.put(secondMaster, master);
        restoredStates.put(thirdMaster, slave);
        this._bestpossibleState.setState(this._db, this._partition, restoredStates);

        CurrentStateOutput relayedStateOutput = populateCurrentStateFromBestPossible(this._bestpossibleState);
        relayedStateOutput.setCurrentState(this._db, this._partition, initialMaster, slave);
        relayedStateOutput.setPendingMessage(this._db, this._partition, secondMaster, relayMessage);
        publishCurrentState(relayedStateOutput);
        this._fullPipeline.handle(this.event);

        List<Message> toOfflineMessages = selectedMessages();
        Assert.assertEquals(toOfflineMessages.size(), 1);
        Message toOfflineMessage = toOfflineMessages.get(0);
        Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster);
        Assert.assertEquals(toOfflineMessage.getFromState(), slave);
        Assert.assertEquals(toOfflineMessage.getToState(), offline);

        // Step 6: old master is OFFLINE and the best possible master is switched again while the
        // relay message is still pending on the second master. Expect no message.
        relayedStateOutput.setCurrentState(this._db, this._partition, initialMaster, offline);
        publishCurrentState(relayedStateOutput);

        String reselectedMaster = getTopStateInstance(bestPossibleInstanceStates(), slave);
        Map<String, String> reswappedStates = bestPossibleInstanceStates();
        reswappedStates.put(secondMaster, slave);
        reswappedStates.put(reselectedMaster, master);
        this._bestpossibleState.setState(this._db, this._partition, reswappedStates);
        this.event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), this._bestpossibleState);
        this._messagePipeline.handle(this.event);
        Assert.assertEquals(selectedMessages().size(), 0);

        // Step 7: re-assert the pending message on the second master and run again. Still expect
        // no message.
        relayedStateOutput.setPendingMessage(this._db, this._partition, secondMaster, relayMessage);
        publishCurrentState(relayedStateOutput);
        this.event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), this._bestpossibleState);
        this._messagePipeline.handle(this.event);
        Assert.assertEquals(selectedMessages().size(), 0);

        // Step 8: fresh current state with no pending messages, in which the second master is
        // MASTER and the reselected master is SLAVE. Expect one MASTER->SLAVE message to the second
        // master carrying a relay message for the reselected master.
        CurrentStateOutput settledStateOutput = populateCurrentStateFromBestPossible(this._bestpossibleState);
        settledStateOutput.setCurrentState(this._db, this._partition, secondMaster, master);
        settledStateOutput.setCurrentState(this._db, this._partition, reselectedMaster, slave);
        publishCurrentState(settledStateOutput);
        this._messagePipeline.handle(this.event);

        List<Message> handoffMessages = selectedMessages();
        Assert.assertEquals(handoffMessages.size(), 1);
        Message handoffMessage = handoffMessages.get(0);
        Assert.assertEquals(handoffMessage.getTgtName(), secondMaster);
        Assert.assertEquals(handoffMessage.getFromState(), master);
        Assert.assertEquals(handoffMessage.getToState(), slave);
        Assert.assertEquals(handoffMessage.getRelayMessages().entrySet().size(), 1);

        Message handoffRelay = handoffMessage.getRelayMessage(reselectedMaster);
        Assert.assertNotNull(handoffRelay);
        Assert.assertEquals(handoffRelay.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
        Assert.assertEquals(handoffRelay.getTgtName(), reselectedMaster);
        Assert.assertEquals(handoffRelay.getRelaySrcHost(), secondMaster);
        Assert.assertEquals(handoffRelay.getFromState(), slave);
        Assert.assertEquals(handoffRelay.getToState(), master);
    }

    /** Stores the given current state on the event under both current-state attributes. */
    private void publishCurrentState(CurrentStateOutput currentStateOutput) {
        this.event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
        this.event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
    }

    /** Returns the messages selected by the last pipeline run for the test partition. */
    private List<Message> selectedMessages() {
        MessageOutput selected = (MessageOutput) this.event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
        return selected.getMessages(this._db, this._partition);
    }

    /** Returns the instance-to-state map of the test partition from {@link #_bestpossibleState}. */
    private Map<String, String> bestPossibleInstanceStates() {
        return this._bestpossibleState.getInstanceStateMap(this._db, this._partition);
    }

    /** Shuts down the async task thread pool created in {@link #preSetup()}, if any. */
    private void shutdownAsyncTasksThreadPool() {
        if (this._asyncTasksThreadPool != null) {
            // shutdown() rather than shutdownNow(): tasks already submitted are allowed to finish.
            this._asyncTasksThreadPool.shutdown();
            this._asyncTasksThreadPool = null;
        }
    }

    /**
     * Finds an instance that is in the given state.
     *
     * @param map instance name to state
     * @param str the state to look for
     * @return the last instance (in the map's iteration order) whose state equals {@code str},
     *         or {@code null} when no instance is in that state
     */
    private String getTopStateInstance(Map<String, String> map, String str) {
        String match = null;
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (str.equals(entry.getValue())) {
                // No early exit on purpose: the last matching entry wins.
                match = entry.getKey();
            }
        }
        return match;
    }

    /**
     * Builds a {@link CurrentStateOutput} whose current state for every resource, partition and
     * instance is copied from the given state map. Pending messages are not populated.
     *
     * @param resourcesStateMap the state map to copy from
     * @return a new current state output mirroring {@code resourcesStateMap}
     */
    private CurrentStateOutput populateCurrentStateFromBestPossible(ResourcesStateMap resourcesStateMap) {
        CurrentStateOutput currentStateOutput = new CurrentStateOutput();
        for (String resourceName : resourcesStateMap.getResourceStatesMap().keySet()) {
            PartitionStateMap partitionStateMap = resourcesStateMap.getPartitionStateMap(resourceName);
            for (Partition partition : partitionStateMap.partitionSet()) {
                for (Map.Entry<String, String> instanceState : partitionStateMap.getPartitionMap(partition).entrySet()) {
                    currentStateOutput.setCurrentState(resourceName, partition, instanceState.getKey(), instanceState.getValue());
                }
            }
        }
        return currentStateOutput;
    }
}
