package org.apache.helix.messaging.p2pMessage;

import java.lang.reflect.Method;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/messaging/p2pMessage/TestP2PMessages.class */
public class TestP2PMessages extends BaseStageTest {
    private String _db = "testDB";
    private int _numPartition = 1;
    private int _numReplica = 3;
    private Partition _partition = new Partition(this._db + "_0");
    private ResourceControllerDataProvider _dataCache;
    private Pipeline _fullPipeline;
    private ResourcesStateMap _initialStateMap;
    private Set<String> _instances;
    private Map<String, LiveInstance> _liveInstanceMap;
    private String _initialMaster;

    /**
     * Builds the shared fixture for all tests in this class.
     *
     * <p>Sets up a single MasterSlave resource in SEMI_AUTO mode with three instances, enables
     * P2P messages in the cluster config, assembles the controller pipeline used by the tests and
     * runs it once. The best-possible state produced by that first run is stored as the baseline
     * that {@code reset} restores before each scenario, and the instance holding MASTER in that
     * baseline is remembered as the initial master.
     */
    @Override // org.apache.helix.controller.stages.BaseStageTest
    @BeforeClass
    public void beforeClass() {
        super.beforeClass();
        setup();
        setupIdealState(3, new String[]{this._db}, this._numPartition, this._numReplica, IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name());
        setupStateModel();
        setupInstances(3);
        setupLiveInstances(3);

        ClusterConfig clusterConfig = new ClusterConfig(this._clusterName);
        clusterConfig.enableP2PMessage(true);
        setClusterConfig(clusterConfig);

        this._dataCache = new ResourceControllerDataProvider(this._clusterName);
        this._dataCache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor());
        this._dataCache.refresh(this.manager.getHelixDataAccessor());
        this.event.addAttribute(AttributeName.ControllerDataProvider.name(), this._dataCache);
        this.event.addAttribute(AttributeName.helixmanager.name(), this.manager);

        this._fullPipeline = new Pipeline("FullPipeline");
        this._fullPipeline.addStage(new ReadClusterDataStage());
        this._fullPipeline.addStage(new ResourceComputationStage());
        this._fullPipeline.addStage(new CurrentStateComputationStage());
        this._fullPipeline.addStage(new BestPossibleStateCalcStage());
        this._fullPipeline.addStage(new IntermediateStateCalcStage());
        this._fullPipeline.addStage(new ResourceMessageGenerationPhase());
        this._fullPipeline.addStage(new MessageSelectionStage());
        this._fullPipeline.addStage(new MessageThrottleStage());
        this._fullPipeline.addStage(new ResourceMessageDispatchStage());
        try {
            this._fullPipeline.handle(this.event);
        } catch (Exception e) {
            // This override does not declare checked exceptions, so surface the failure through
            // TestNG instead of printing it and continuing with a half-initialized fixture.
            Assert.fail("Initial controller pipeline run failed during test setup", e);
        }

        this._instances = this._dataCache.getAllInstances();
        this._liveInstanceMap = this._dataCache.getLiveInstances();
        this._initialStateMap = (ResourcesStateMap) this.event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
        this._initialMaster = getTopStateInstance(this._initialStateMap.getInstanceStateMap(this._db, this._partition), MasterSlaveSMD.States.MASTER.name());
        Assert.assertNotNull(this._initialMaster);
    }

    /**
     * Prints a start banner for the test method about to run and stores the current time in the
     * TestNG context under the {@code "StartTime"} attribute.
     *
     * @param method the test method about to be executed
     * @param iTestContext the TestNG context that receives the start timestamp
     */
    @Override // org.apache.helix.controller.stages.BaseStageTest
    @BeforeMethod
    public void beforeTest(Method method, ITestContext iTestContext) {
        String testName = method.getName();
        Date startedAt = new Date(System.currentTimeMillis());
        System.out.println("START " + testName + " at " + startedAt);

        long startMillis = System.currentTimeMillis();
        iTestContext.setAttribute("StartTime", Long.valueOf(startMillis));
    }

    /**
     * Disables the initial master and checks the resulting message flow.
     *
     * <p>Expected sequence: one MASTER-to-SLAVE message for the old master carrying exactly one
     * relay message (SLAVE-to-MASTER) addressed to the new top-state instance; no direct message
     * for the new master right after the demotion has been applied; and, after a five second
     * wait, one direct SLAVE-to-MASTER message for the new master.
     */
    @Test
    public void testP2PSendAndTimeout() throws Exception {
        final String masterState = MasterSlaveSMD.States.MASTER.name();
        final String slaveState = MasterSlaveSMD.States.SLAVE.name();

        reset(this._initialStateMap);

        // Step 1: disable the current master and let the pipeline react to the config change.
        this.admin.enableInstance(this._clusterName, this._initialMaster, false);
        this._dataCache = (ResourceControllerDataProvider) this.event.getAttribute(AttributeName.ControllerDataProvider.name());
        this._dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
        this._fullPipeline.handle(this.event);
        ResourcesStateMap intermediateStates = (ResourcesStateMap) this.event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

        // Step 2: the old master must receive exactly one demotion message with one relay attached.
        List<Message> demotionMessages = getMessages(this._initialMaster);
        Assert.assertEquals(demotionMessages.size(), 1);
        Message demotion = demotionMessages.get(0);
        Assert.assertEquals(demotion.getTgtName(), this._initialMaster);
        Assert.assertEquals(demotion.getFromState(), masterState);
        Assert.assertEquals(demotion.getToState(), slaveState);
        Assert.assertEquals(demotion.getRelayMessages().entrySet().size(), 1);

        // Step 3: the relay must promote the instance chosen as new master in the intermediate state.
        Map<String, String> partitionStates = intermediateStates.getInstanceStateMap(this._db, this._partition);
        String newMaster = getTopStateInstance(partitionStates, masterState);
        Message relay = demotion.getRelayMessage(newMaster);
        Assert.assertNotNull(relay);
        Assert.assertEquals(relay.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
        Assert.assertEquals(relay.getTgtName(), newMaster);
        Assert.assertEquals(relay.getRelaySrcHost(), this._initialMaster);
        Assert.assertEquals(relay.getFromState(), slaveState);
        Assert.assertEquals(relay.getToState(), masterState);

        // Step 4: apply the demotion; the controller must not message the new master directly yet.
        handleMessage(this._initialMaster, this._db);
        this._fullPipeline.handle(this.event);
        List<Message> immediatePromotions = getMessages(newMaster);
        Assert.assertEquals(immediatePromotions.size(), 0);

        // Step 5: after the wait the controller is expected to send the promotion itself.
        // NOTE(review): the 5s pause presumably outlasts the relay message's expiry - confirm.
        Thread.sleep(5000L);
        this._fullPipeline.handle(this.event);
        List<Message> promotions = getMessages(newMaster);
        Assert.assertEquals(promotions.size(), 1);
        Message promotion = promotions.get(0);
        Assert.assertEquals(promotion.getTgtName(), newMaster);
        Assert.assertEquals(promotion.getFromState(), slaveState);
        Assert.assertEquals(promotion.getToState(), masterState);
    }

    /**
     * Disables the initial master, then records the partition as ERROR on that instance instead
     * of completing the demotion.
     *
     * <p>After the pending demotion message is removed and the pipeline runs again, exactly one
     * direct SLAVE-to-MASTER message is expected for the new top-state instance.
     */
    @Test
    public void testP2PWithErrorState() throws Exception {
        final String masterState = MasterSlaveSMD.States.MASTER.name();
        final String partitionName = this._partition.getPartitionName();
        final PropertyKey.Builder keys = new PropertyKey.Builder(this._clusterName);

        reset(this._initialStateMap);

        // Disable the current master and let the pipeline generate the demotion plus relay.
        this.admin.enableInstance(this._clusterName, this._initialMaster, false);
        this._dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
        this._fullPipeline.handle(this.event);
        ResourcesStateMap intermediateStates = (ResourcesStateMap) this.event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

        List<Message> demotionMessages = getMessages(this._initialMaster);
        Assert.assertEquals(demotionMessages.size(), 1);
        Message demotion = demotionMessages.get(0);
        Assert.assertEquals(demotion.getRelayMessages().entrySet().size(), 1);

        Map<String, String> partitionStates = intermediateStates.getInstanceStateMap(this._db, this._partition);
        String newMaster = getTopStateInstance(partitionStates, masterState);
        Message relay = demotion.getRelayMessage(newMaster);
        Assert.assertNotNull(relay);
        Assert.assertEquals(relay.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
        Assert.assertEquals(relay.getTgtName(), newMaster);

        // Write an ERROR current state (previous state MASTER) for the old master's session.
        String oldMasterSession = this._dataCache.getLiveInstances().get(this._initialMaster).getEphemeralOwner();
        PropertyKey currentStateKey = keys.currentState(this._initialMaster, oldMasterSession, this._db);
        CurrentState oldMasterState = this.accessor.getProperty(currentStateKey);
        oldMasterState.setPreviousState(partitionName, masterState);
        oldMasterState.setState(partitionName, HelixDefinedState.ERROR.name());
        oldMasterState.setEndTime(partitionName, System.currentTimeMillis());
        this.accessor.setProperty(currentStateKey, oldMasterState);

        // Drop the demotion message itself, then let the controller re-evaluate.
        this.accessor.removeProperty(keys.message(this._initialMaster, demotion.getMsgId()));
        this._fullPipeline.handle(this.event);

        List<Message> promotions = getMessages(newMaster);
        Assert.assertEquals(promotions.size(), 1);
        Message promotion = promotions.get(0);
        Assert.assertEquals(promotion.getTgtName(), newMaster);
        Assert.assertEquals(promotion.getFromState(), MasterSlaveSMD.States.SLAVE.name());
        Assert.assertEquals(promotion.getToState(), masterState);
    }

    /**
     * Disables the initial master, applies its demotion, and then removes the live-instance node
     * of the instance that was selected as the new master.
     *
     * <p>After a full cache refresh and another pipeline run, a different instance must be chosen
     * as master and must receive exactly one direct SLAVE-to-MASTER message.
     */
    @Test
    public void testP2PWithInstanceOffline() throws Exception {
        reset(this._initialStateMap);
        this.admin.enableInstance(this._clusterName, this._initialMaster, false);
        this._dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
        this._fullPipeline.handle(this.event);
        ResourcesStateMap resourcesStateMap = (ResourcesStateMap) this.event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

        // The old master gets one demotion message carrying one relay for the new master.
        List<Message> messages = getMessages(this._initialMaster);
        Assert.assertEquals(messages.size(), 1);
        Message message = messages.get(0);
        Assert.assertEquals(message.getRelayMessages().entrySet().size(), 1);
        String topStateInstance = getTopStateInstance(resourcesStateMap.getInstanceStateMap(this._db, this._partition), MasterSlaveSMD.States.MASTER.name());
        Message relayMessage = message.getRelayMessage(topStateInstance);
        Assert.assertNotNull(relayMessage);
        Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
        Assert.assertEquals(relayMessage.getTgtName(), topStateInstance);

        // Apply the demotion; no direct message for the new master is expected at this point.
        handleMessage(this._initialMaster, this._db);
        this._fullPipeline.handle(this.event);
        Assert.assertEquals(getMessages(topStateInstance).size(), 0);

        // Take the selected new master offline by deleting its live-instance node.
        this.accessor.removeProperty(new PropertyKey.Builder(this._clusterName).liveInstance(topStateInstance));
        this._dataCache.requireFullRefresh();
        this._fullPipeline.handle(this.event);

        String topStateInstance2 = getTopStateInstance(((ResourcesStateMap) this.event.getAttribute(AttributeName.INTERMEDIATE_STATE.name())).getInstanceStateMap(this._db, this._partition), MasterSlaveSMD.States.MASTER.name());
        Assert.assertNotNull(topStateInstance2, "No instance was selected as master after the first candidate went offline");
        // Compare instance names by content: two distinct String objects may still name the
        // same instance, which a reference comparison would not detect.
        Assert.assertFalse(topStateInstance.equals(topStateInstance2), "Master was not moved away from offline instance " + topStateInstance);

        List<Message> messages2 = getMessages(topStateInstance2);
        Assert.assertEquals(messages2.size(), 1);
        Assert.assertEquals(messages2.get(0).getTgtName(), topStateInstance2);
        Assert.assertEquals(messages2.get(0).getFromState(), MasterSlaveSMD.States.SLAVE.name());
        Assert.assertEquals(messages2.get(0).getToState(), MasterSlaveSMD.States.MASTER.name());
    }

    /**
     * Applies the pending messages of one instance for one resource to that instance's current
     * state, then deletes each applied message.
     *
     * <p>A message is applied only when its from-state matches the partition's recorded state,
     * or when no state is recorded yet and the from-state is the MasterSlave initial state.
     * Messages that do not match are left untouched.
     *
     * @param str name of the instance whose message queue is processed
     * @param str2 name of the resource whose messages are applied
     */
    private void handleMessage(String str, String str2) {
        PropertyKey.Builder keyBuilder = new PropertyKey.Builder(this._clusterName);
        List<Message> pendingMessages = this.accessor.getChildValues(keyBuilder.messages(str), true);
        String sessionId = this._dataCache.getLiveInstances().get(str).getEphemeralOwner();
        String initialState = BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition().getInitialState();

        for (Message message : pendingMessages) {
            if (!message.getResourceName().equals(str2)) {
                continue;
            }
            PropertyKey currentStateKey = keyBuilder.currentState(str, sessionId, str2);
            CurrentState currentState = this.accessor.getProperty(currentStateKey);
            if (currentState == null) {
                currentState = new CurrentState(str2);
                currentState.setSessionId(sessionId);
                currentState.setStateModelDefRef(BuiltInStateModelDefinitions.MasterSlave.name());
            }

            String partitionName = message.getPartitionName();
            String fromState = message.getFromState();
            String toState = message.getToState();
            String recordedState = currentState.getState(partitionName);

            // Evaluate the two cases separately so a missing recorded state is never
            // dereferenced when the message does not start from the initial state.
            boolean transitionApplies = (recordedState == null)
                    ? initialState.equals(fromState)
                    : recordedState.equals(fromState);
            if (!transitionApplies) {
                continue;
            }

            currentState.setPreviousState(partitionName, fromState);
            currentState.setState(partitionName, toState);
            currentState.setStartTime(partitionName, System.currentTimeMillis());
            try {
                // Short pause so the recorded end time is later than the start time.
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the test runner instead of discarding it.
                Thread.currentThread().interrupt();
            }
            currentState.setEndTime(partitionName, System.currentTimeMillis());
            this.accessor.setProperty(currentStateKey, currentState);
            this.accessor.removeProperty(keyBuilder.message(str, message.getMsgId()));
        }
    }

    /**
     * Restores the cluster to the given baseline before a test scenario runs.
     *
     * <p>Re-writes every live instance captured during setup, re-enables every known instance,
     * removes all pending messages, writes the baseline states back as current state and finally
     * asks the data cache to do a full refresh on its next use.
     *
     * @param resourcesStateMap the per-resource partition states to restore as current state
     */
    private void reset(ResourcesStateMap resourcesStateMap) {
        // A previous test may have deleted a live-instance node; write all of them back.
        for (LiveInstance liveInstance : this._liveInstanceMap.values()) {
            this.accessor.setProperty(this.accessor.keyBuilder().liveInstance(liveInstance.getId()), liveInstance);
        }

        // Re-enable each instance rather than only the initial master, so the reset does not
        // depend on which instance an earlier test disabled.
        // NOTE(review): assumes _instances contains the initial master - confirm.
        for (String instanceName : this._instances) {
            this.admin.enableInstance(this._clusterName, instanceName, true);
            cleanMessages(instanceName);
        }

        for (String resourceName : resourcesStateMap.resourceSet()) {
            setCurrentState(resourceName, resourcesStateMap.getPartitionStateMap(resourceName).getStateMap());
        }

        // Second sweep over the message queues after the current states have been rewritten.
        for (String instanceName : this._instances) {
            cleanMessages(instanceName);
        }

        this._dataCache.requireFullRefresh();
    }

    /**
     * Writes the given partition states into the current state of each listed instance.
     *
     * <p>For every (partition, instance, state) triple the instance's CurrentState for the
     * resource is loaded, created with the MasterSlave state model reference if absent, updated
     * for that partition and written back.
     *
     * @param str name of the resource
     * @param map partition to (instance name to state) assignments to record
     */
    private void setCurrentState(String str, Map<Partition, Map<String, String>> map) {
        PropertyKey.Builder keys = new PropertyKey.Builder(this._clusterName);
        for (Map.Entry<Partition, Map<String, String>> partitionEntry : map.entrySet()) {
            String partitionName = partitionEntry.getKey().getPartitionName();
            for (Map.Entry<String, String> replica : partitionEntry.getValue().entrySet()) {
                String instanceName = replica.getKey();
                String targetState = replica.getValue();

                // Current state is keyed by the session id of the instance's live-instance node.
                String sessionId = this._liveInstanceMap.get(instanceName).getEphemeralOwner();
                PropertyKey stateKey = keys.currentState(instanceName, sessionId, str);

                CurrentState stored = this.accessor.getProperty(stateKey);
                if (stored == null) {
                    stored = new CurrentState(str);
                    stored.setSessionId(sessionId);
                    stored.setStateModelDefRef(BuiltInStateModelDefinitions.MasterSlave.name());
                }
                stored.setState(partitionName, targetState);
                this.accessor.setProperty(stateKey, stored);
            }
        }
    }

    /**
     * Deletes every message currently stored for the given instance.
     *
     * @param str name of the instance whose messages are removed
     */
    private void cleanMessages(String str) {
        PropertyKey.Builder keys = new PropertyKey.Builder(this._clusterName);
        // Assign to a typed list first so the element type is Message.
        List<Message> queued = this.accessor.getChildValues(keys.messages(str), true);
        for (Message queuedMessage : queued) {
            this.accessor.removeProperty(new PropertyKey.Builder(this._clusterName).message(str, queuedMessage.getMsgId()));
        }
    }

    /**
     * Reads all messages currently stored for the given instance.
     *
     * @param str name of the instance
     * @return the messages found under that instance's message path
     */
    List<Message> getMessages(String str) {
        PropertyKey messagesKey = new PropertyKey.Builder(this._clusterName).messages(str);
        List<Message> found = this.accessor.getChildValues(messagesKey, true);
        return found;
    }

    /**
     * Finds an instance that is in the given state.
     *
     * <p>If several instances are in that state, the one encountered last while iterating the
     * map is returned.
     *
     * @param map instance name to state
     * @param str the state to look for
     * @return the name of a matching instance, or {@code null} when none matches
     */
    private String getTopStateInstance(Map<String, String> map, String str) {
        String match = null;
        for (Map.Entry<String, String> assignment : map.entrySet()) {
            if (!str.equals(assignment.getValue())) {
                continue;
            }
            // Keep scanning: later matches overwrite earlier ones.
            match = assignment.getKey();
        }
        return match;
    }
}
