package org.apache.helix.controller.stages;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/controller/stages/TestMessageThrottleStage.class */
public class TestMessageThrottleStage extends ZkUnitTestBase {
    private static final Logger LOG = Logger.getLogger(TestMessageThrottleStage.class.getName());
    final String _className = getShortClassName();

    @Test
    public void testMsgThrottleBasic() throws Exception {
        String str = "CLUSTER_" + this._className + "_basic";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        DummyClusterManager dummyClusterManager = new DummyClusterManager(str, new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient)));
        setupIdealState(str, new int[]{0, 1}, new String[]{WorkflowGenerator.DEFAULT_TGT_DB}, 1, 2);
        setupLiveInstances(str, new int[]{0, 1});
        setupStateModel(str);
        ClusterEvent clusterEvent = new ClusterEvent("testEvent");
        clusterEvent.addAttribute("helixmanager", dummyClusterManager);
        MessageThrottleStage messageThrottleStage = new MessageThrottleStage();
        try {
            runStage(clusterEvent, messageThrottleStage);
            Assert.fail("Should throw exception since DATA_CACHE is null");
        } catch (Exception e) {
        }
        Pipeline pipeline = new Pipeline();
        pipeline.addStage(new ReadClusterDataStage());
        runPipeline(clusterEvent, pipeline);
        try {
            runStage(clusterEvent, messageThrottleStage);
            Assert.fail("Should throw exception since RESOURCE is null");
        } catch (Exception e2) {
        }
        runStage(clusterEvent, new ResourceComputationStage());
        try {
            runStage(clusterEvent, messageThrottleStage);
            Assert.fail("Should throw exception since MESSAGE_SELECT is null");
        } catch (Exception e3) {
        }
        MessageSelectionStageOutput messageSelectionStageOutput = new MessageSelectionStageOutput();
        ArrayList arrayList = new ArrayList();
        arrayList.add(createMessage(Message.MessageType.STATE_TRANSITION, "msgId-001", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_0"));
        messageSelectionStageOutput.addMessages(WorkflowGenerator.DEFAULT_TGT_DB, new Partition("TestDB_0"), arrayList);
        clusterEvent.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), messageSelectionStageOutput);
        runStage(clusterEvent, messageThrottleStage);
        Assert.assertEquals(((MessageThrottleStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_THROTTLE.toString())).getMessages(WorkflowGenerator.DEFAULT_TGT_DB, new Partition("TestDB_0")).size(), 1);
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testMsgThrottleConstraints() throws Exception {
        String str = "CLUSTER_" + this._className + "_constraints";
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        DummyClusterManager dummyClusterManager = new DummyClusterManager(str, zKHelixDataAccessor);
        setupIdealState(str, new int[]{0, 1}, new String[]{WorkflowGenerator.DEFAULT_TGT_DB}, 1, 2);
        setupLiveInstances(str, new int[]{0, 1});
        setupStateModel(str);
        ZNRecord zNRecord = new ZNRecord(ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT.toString());
        zNRecord.setMapField("constraint0", new TreeMap());
        zNRecord.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION");
        zNRecord.getMapField("constraint0").put("CONSTRAINT_VALUE", "ANY");
        ConstraintItem constraintItem = new ConstraintItem(zNRecord.getMapField("constraint0"));
        zNRecord.setMapField("constraint1", new TreeMap());
        zNRecord.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION");
        zNRecord.getMapField("constraint1").put("TRANSITION", "OFFLINE-SLAVE");
        zNRecord.getMapField("constraint1").put("CONSTRAINT_VALUE", "50");
        ConstraintItem constraintItem2 = new ConstraintItem(zNRecord.getMapField("constraint1"));
        zNRecord.setMapField("constraint2", new TreeMap());
        zNRecord.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION");
        zNRecord.getMapField("constraint2").put("TRANSITION", "OFFLINE-SLAVE");
        zNRecord.getMapField("constraint2").put("INSTANCE", ".*");
        zNRecord.getMapField("constraint2").put("RESOURCE", WorkflowGenerator.DEFAULT_TGT_DB);
        zNRecord.getMapField("constraint2").put("CONSTRAINT_VALUE", "2");
        ConstraintItem constraintItem3 = new ConstraintItem(zNRecord.getMapField("constraint2"));
        zNRecord.setMapField("constraint3", new TreeMap());
        zNRecord.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION");
        zNRecord.getMapField("constraint3").put("TRANSITION", "OFFLINE-SLAVE");
        zNRecord.getMapField("constraint3").put("INSTANCE", "localhost_1");
        zNRecord.getMapField("constraint3").put("RESOURCE", ".*");
        zNRecord.getMapField("constraint3").put("CONSTRAINT_VALUE", "1");
        ConstraintItem constraintItem4 = new ConstraintItem(zNRecord.getMapField("constraint3"));
        zNRecord.setMapField("constraint4", new TreeMap());
        zNRecord.getMapField("constraint4").put("MESSAGE_TYPE", "STATE_TRANSITION");
        zNRecord.getMapField("constraint4").put("TRANSITION", "OFFLINE-SLAVE");
        zNRecord.getMapField("constraint4").put("INSTANCE", ".*");
        zNRecord.getMapField("constraint4").put("RESOURCE", ".*");
        zNRecord.getMapField("constraint4").put("CONSTRAINT_VALUE", "10");
        ConstraintItem constraintItem5 = new ConstraintItem(zNRecord.getMapField("constraint4"));
        zNRecord.setMapField("constraint5", new TreeMap());
        zNRecord.getMapField("constraint5").put("MESSAGE_TYPE", "STATE_TRANSITION");
        zNRecord.getMapField("constraint5").put("TRANSITION", "OFFLINE-SLAVE");
        zNRecord.getMapField("constraint5").put("INSTANCE", "localhost_0");
        zNRecord.getMapField("constraint5").put("RESOURCE", WorkflowGenerator.DEFAULT_TGT_DB);
        zNRecord.getMapField("constraint5").put("CONSTRAINT_VALUE", "3");
        ConstraintItem constraintItem6 = new ConstraintItem(zNRecord.getMapField("constraint5"));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        zKHelixDataAccessor.setProperty(keyBuilder.constraint(ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT.toString()), new ClusterConstraints(zNRecord));
        ClusterConstraints property = zKHelixDataAccessor.getProperty(keyBuilder.constraint(ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT.toString()));
        MessageThrottleStage messageThrottleStage = new MessageThrottleStage();
        Message createMessage = createMessage(Message.MessageType.STATE_TRANSITION, "msgId-001", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_0");
        Map constraintAttributes = ClusterConstraints.toConstraintAttributes(createMessage);
        Set<ConstraintItem> match = property.match(constraintAttributes);
        System.out.println(createMessage + " matches(" + match.size() + "): " + match);
        Assert.assertEquals(match.size(), 5);
        Assert.assertTrue(containsConstraint(match, constraintItem));
        Assert.assertTrue(containsConstraint(match, constraintItem2));
        Assert.assertTrue(containsConstraint(match, constraintItem3));
        Assert.assertTrue(containsConstraint(match, constraintItem5));
        Assert.assertTrue(containsConstraint(match, constraintItem6));
        Set<ConstraintItem> selectConstraints = messageThrottleStage.selectConstraints(match, constraintAttributes);
        System.out.println(createMessage + " matches(" + selectConstraints.size() + "): " + selectConstraints);
        Assert.assertEquals(selectConstraints.size(), 2);
        Assert.assertTrue(containsConstraint(selectConstraints, constraintItem2));
        Assert.assertTrue(containsConstraint(selectConstraints, constraintItem6));
        Message createMessage2 = createMessage(Message.MessageType.STATE_TRANSITION, "msgId-002", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_1");
        Map constraintAttributes2 = ClusterConstraints.toConstraintAttributes(createMessage2);
        Set<ConstraintItem> match2 = property.match(constraintAttributes2);
        System.out.println(createMessage2 + " matches(" + match2.size() + "): " + match2);
        Assert.assertEquals(match2.size(), 5);
        Assert.assertTrue(containsConstraint(match2, constraintItem));
        Assert.assertTrue(containsConstraint(match2, constraintItem2));
        Assert.assertTrue(containsConstraint(match2, constraintItem3));
        Assert.assertTrue(containsConstraint(match2, constraintItem4));
        Assert.assertTrue(containsConstraint(match2, constraintItem5));
        Set<ConstraintItem> selectConstraints2 = messageThrottleStage.selectConstraints(match2, constraintAttributes2);
        System.out.println(createMessage2 + " matches(" + selectConstraints2.size() + "): " + selectConstraints2);
        Assert.assertEquals(selectConstraints2.size(), 2);
        Assert.assertTrue(containsConstraint(selectConstraints2, constraintItem2));
        Assert.assertTrue(containsConstraint(selectConstraints2, constraintItem4));
        ClusterEvent clusterEvent = new ClusterEvent("testEvent");
        clusterEvent.addAttribute("helixmanager", dummyClusterManager);
        Pipeline pipeline = new Pipeline();
        pipeline.addStage(new ReadClusterDataStage());
        runPipeline(clusterEvent, pipeline);
        runStage(clusterEvent, new ResourceComputationStage());
        MessageSelectionStageOutput messageSelectionStageOutput = new MessageSelectionStageOutput();
        Message createMessage3 = createMessage(Message.MessageType.STATE_TRANSITION, "msgId-003", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_0");
        Message createMessage4 = createMessage(Message.MessageType.STATE_TRANSITION, "msgId-004", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_0");
        Message createMessage5 = createMessage(Message.MessageType.STATE_TRANSITION, "msgId-005", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_0");
        Message createMessage6 = createMessage(Message.MessageType.STATE_TRANSITION, "msgId-006", "OFFLINE", "SLAVE", WorkflowGenerator.DEFAULT_TGT_DB, "localhost_1");
        ArrayList arrayList = new ArrayList();
        arrayList.add(createMessage);
        arrayList.add(createMessage2);
        arrayList.add(createMessage3);
        arrayList.add(createMessage4);
        arrayList.add(createMessage5);
        arrayList.add(createMessage6);
        messageSelectionStageOutput.addMessages(WorkflowGenerator.DEFAULT_TGT_DB, new Partition("TestDB_0"), arrayList);
        clusterEvent.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), messageSelectionStageOutput);
        runStage(clusterEvent, messageThrottleStage);
        List messages = ((MessageThrottleStageOutput) clusterEvent.getAttribute(AttributeName.MESSAGES_THROTTLE.toString())).getMessages(WorkflowGenerator.DEFAULT_TGT_DB, new Partition("TestDB_0"));
        Assert.assertEquals(messages.size(), 4);
        Assert.assertTrue(messages.contains(createMessage));
        Assert.assertTrue(messages.contains(createMessage2));
        Assert.assertTrue(messages.contains(createMessage3));
        Assert.assertTrue(messages.contains(createMessage4));
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    private boolean containsConstraint(Set<ConstraintItem> set, ConstraintItem constraintItem) {
        Iterator<ConstraintItem> it = set.iterator();
        while (it.hasNext()) {
            if (it.next().toString().equals(constraintItem.toString())) {
                return true;
            }
        }
        return false;
    }
}
