package org.apache.helix.integration;

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestSchedulerMessage.class */
public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
    TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();

    /* loaded from: input_file:org/apache/helix/integration/TestSchedulerMessage$MockAsyncCallback.class */
    class MockAsyncCallback extends AsyncCallback {
        Message _message;

        public MockAsyncCallback() {
        }

        public void onTimeOut() {
        }

        public void onReplyMessage(Message message) {
            this._message = message;
        }
    }

    /* loaded from: input_file:org/apache/helix/integration/TestSchedulerMessage$TestMessagingHandlerFactory.class */
    public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
        public Map<String, Set<String>> _results = new ConcurrentHashMap();

        /* loaded from: input_file:org/apache/helix/integration/TestSchedulerMessage$TestMessagingHandlerFactory$TestMessagingHandler.class */
        public class TestMessagingHandler extends MessageHandler {
            public TestMessagingHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            public HelixTaskResult handleMessage() throws InterruptedException {
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                helixTaskResult.setSuccess(true);
                this._message.getTgtName();
                helixTaskResult.getTaskResultMap().put("Message", this._message.getMsgId());
                synchronized (TestMessagingHandlerFactory.this._results) {
                    if (!TestMessagingHandlerFactory.this._results.containsKey(this._message.getPartitionName())) {
                        TestMessagingHandlerFactory.this._results.put(this._message.getPartitionName(), new ConcurrentSkipListSet());
                    }
                }
                TestMessagingHandlerFactory.this._results.get(this._message.getPartitionName()).add(this._message.getMsgId());
                return helixTaskResult;
            }

            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            }
        }

        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            return new TestMessagingHandler(message, notificationContext);
        }

        public String getMessageType() {
            return "TestParticipant";
        }

        public void reset() {
        }
    }

    /* loaded from: input_file:org/apache/helix/integration/TestSchedulerMessage$TestMessagingHandlerFactoryLatch.class */
    public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
        public volatile CountDownLatch _latch = new CountDownLatch(1);
        public int _messageCount = 0;
        public Map<String, Set<String>> _results = new ConcurrentHashMap();

        /* loaded from: input_file:org/apache/helix/integration/TestSchedulerMessage$TestMessagingHandlerFactoryLatch$TestMessagingHandlerLatch.class */
        public class TestMessagingHandlerLatch extends MessageHandler {
            public TestMessagingHandlerLatch(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            public HelixTaskResult handleMessage() throws InterruptedException {
                TestMessagingHandlerFactoryLatch.this._latch.await();
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                helixTaskResult.setSuccess(true);
                helixTaskResult.getTaskResultMap().put("Message", this._message.getMsgId());
                String tgtName = this._message.getTgtName();
                synchronized (TestMessagingHandlerFactoryLatch.this._results) {
                    if (!TestMessagingHandlerFactoryLatch.this._results.containsKey(this._message.getPartitionName())) {
                        TestMessagingHandlerFactoryLatch.this._results.put(this._message.getPartitionName(), new ConcurrentSkipListSet());
                    }
                }
                TestMessagingHandlerFactoryLatch.this._results.get(this._message.getPartitionName()).add(tgtName);
                return helixTaskResult;
            }

            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            }
        }

        public synchronized MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            this._messageCount++;
            return new TestMessagingHandlerLatch(message, notificationContext);
        }

        public synchronized void signal() {
            this._latch.countDown();
            this._latch = new CountDownLatch(1);
        }

        public String getMessageType() {
            return "TestMessagingHandlerLatch";
        }

        public void reset() {
        }
    }

    @Test
    public void TestSchedulerMsgUsingQueue() throws Exception {
        Logger.getRootLogger().setLevel(Level.INFO);
        this._factory._results.clear();
        Thread.sleep(2000L);
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        message.getRecord().setSimpleField("SchedulerTaskQueue", "TestSchedulerMsg");
        Message message2 = new Message(this._factory.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("%");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        helixDataAccessor.createProperty(keyBuilder.controllerMessage(message.getMsgId()), message);
        for (int i2 = 0; i2 < 30; i2++) {
            Thread.sleep(2000L);
            if (20 == this._factory._results.size()) {
                break;
            }
        }
        Assert.assertEquals(20, this._factory._results.size());
        PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), message.getMsgId());
        int i3 = 0;
        for (int i4 = 0; i4 < 10; i4++) {
            ZNRecord record = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
            Assert.assertTrue(((String) record.getMapField("SentMessageCount").get("MessageCount")).equals("60"));
            Iterator it = record.getMapFields().keySet().iterator();
            while (it.hasNext()) {
                if (((String) it.next()).startsWith("MessageResult ")) {
                    i3++;
                }
            }
            if (i3 == 60) {
                break;
            }
            Thread.sleep(2000L);
        }
        Assert.assertEquals(i3, 60);
        int i5 = 0;
        Iterator<Set<String>> it2 = this._factory._results.values().iterator();
        while (it2.hasNext()) {
            i5 += it2.next().size();
        }
        Assert.assertEquals(i5, 60);
    }

    @Test
    public void TestSchedulerMsg() throws Exception {
        Logger.getRootLogger().setLevel(Level.INFO);
        this._factory._results.clear();
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        Message message2 = new Message(this._factory.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("%");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        helixDataAccessor.createProperty(keyBuilder.controllerMessage(message.getMsgId()), message);
        for (int i2 = 0; i2 < 30; i2++) {
            Thread.sleep(2000L);
            if (20 == this._factory._results.size()) {
                break;
            }
        }
        Assert.assertEquals(20, this._factory._results.size());
        PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), message.getMsgId());
        int i3 = 0;
        for (int i4 = 0; i4 < 10; i4++) {
            Thread.sleep(1000L);
            ZNRecord record = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
            Assert.assertTrue(((String) record.getMapField("SentMessageCount").get("MessageCount")).equals("60"));
            for (String str : record.getMapFields().keySet()) {
                if (str.startsWith("MessageResult ")) {
                    i3++;
                    Assert.assertTrue(record.getMapField(str).size() > 1);
                }
            }
            if (i3 == 60) {
                break;
            }
            Thread.sleep(2000L);
        }
        Assert.assertEquals(i3, 60);
        int i5 = 0;
        Iterator<Set<String>> it = this._factory._results.values().iterator();
        while (it.hasNext()) {
            i5 += it.next().size();
        }
        Assert.assertEquals(i5, 60);
        String controllerPropertyPath = HelixUtil.getControllerPropertyPath(mockParticipantManager.getClusterName(), PropertyType.STATUSUPDATES_CONTROLLER);
        List children = _gZkClient.getChildren(controllerPropertyPath);
        Assert.assertTrue(children.size() > 0);
        Iterator it2 = children.iterator();
        while (it2.hasNext()) {
            Assert.assertTrue(_gZkClient.getChildren(new StringBuilder().append(controllerPropertyPath).append("/").append((String) it2.next()).toString()).size() > 0);
        }
        String instancePropertyPath = HelixUtil.getInstancePropertyPath(mockParticipantManager.getClusterName(), "localhost_12918", PropertyType.STATUSUPDATES);
        List children2 = _gZkClient.getChildren(instancePropertyPath);
        Assert.assertTrue(children2.size() > 0);
        Iterator it3 = children2.iterator();
        while (it3.hasNext()) {
            String str2 = instancePropertyPath + "/" + ((String) it3.next());
            List children3 = _gZkClient.getChildren(str2);
            Assert.assertTrue(children3.size() > 0);
            Iterator it4 = children3.iterator();
            while (it4.hasNext()) {
                Assert.assertTrue(_gZkClient.getChildren(new StringBuilder().append(str2).append("/").append((String) it4.next()).toString()).size() > 0);
            }
        }
        Thread.sleep(3000L);
        new ZKPathDataDumpTask(mockParticipantManager, _gZkClient, 0).run();
        List children4 = _gZkClient.getChildren(controllerPropertyPath);
        Assert.assertTrue(children4.size() > 0);
        Iterator it5 = children4.iterator();
        while (it5.hasNext()) {
            Assert.assertTrue(_gZkClient.getChildren(new StringBuilder().append(controllerPropertyPath).append("/").append((String) it5.next()).toString()).size() == 0);
        }
        List children5 = _gZkClient.getChildren(instancePropertyPath);
        Assert.assertTrue(children5.size() > 0);
        Iterator it6 = children5.iterator();
        while (it6.hasNext()) {
            String str3 = instancePropertyPath + "/" + ((String) it6.next());
            List children6 = _gZkClient.getChildren(str3);
            Assert.assertTrue(children6.size() > 0);
            Iterator it7 = children6.iterator();
            while (it7.hasNext()) {
                Assert.assertTrue(_gZkClient.getChildren(new StringBuilder().append(str3).append("/").append((String) it7.next()).toString()).size() == 0);
            }
        }
    }

    @Test
    public void TestSchedulerMsg2() throws Exception {
        this._factory._results.clear();
        Thread.sleep(2000L);
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        Message message2 = new Message(this._factory.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("%");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        message.getRecord().setSimpleField("WAIT_ALL", "true");
        Criteria criteria2 = new Criteria();
        criteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria2.setInstanceName("*");
        criteria2.setSessionSpecific(false);
        message.getRecord().setSimpleField("SchedulerTaskQueue", "TestSchedulerMsg2");
        MockAsyncCallback mockAsyncCallback = new MockAsyncCallback();
        mockParticipantManager.getMessagingService().sendAndWait(criteria2, message, mockAsyncCallback, -1);
        String str = (String) mockAsyncCallback._message.getResultMap().get("SchedulerMessageId");
        HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        for (int i2 = 0; i2 < 10; i2++) {
            Thread.sleep(200L);
            if (helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord().getMapFields().containsKey("Summary")) {
                break;
            }
        }
        Assert.assertEquals(20, this._factory._results.size());
        ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord();
        Assert.assertTrue(((String) record.getMapField("SentMessageCount").get("MessageCount")).equals("60"));
        int i3 = 0;
        Iterator it = record.getMapFields().keySet().iterator();
        while (it.hasNext()) {
            if (((String) it.next()).startsWith("MessageResult ")) {
                i3++;
            }
        }
        Assert.assertEquals(i3, 60);
        int i4 = 0;
        Iterator<Set<String>> it2 = this._factory._results.values().iterator();
        while (it2.hasNext()) {
            i4 += it2.next().size();
        }
        Assert.assertEquals(i4, 60);
    }

    @Test
    public void TestSchedulerZeroMsg() throws Exception {
        TestMessagingHandlerFactory testMessagingHandlerFactory = new TestMessagingHandlerFactory();
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageType(), testMessagingHandlerFactory);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        Message message2 = new Message(testMessagingHandlerFactory.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_DOESNOTEXIST");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("%");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        helixDataAccessor.setProperty(keyBuilder.controllerMessage(message.getMsgId()), message);
        Thread.sleep(3000L);
        Assert.assertEquals(0, testMessagingHandlerFactory._results.size());
        PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), message.getMsgId());
        for (int i2 = 0; i2 < 10; i2++) {
            StatusUpdate property = helixDataAccessor.getProperty(controllerTaskStatus);
            if (property == null || property.getRecord().getMapField("SentMessageCount") == null) {
                Thread.sleep(1000L);
            }
        }
        Assert.assertTrue(((String) helixDataAccessor.getProperty(controllerTaskStatus).getRecord().getMapField("SentMessageCount").get("MessageCount")).equals("0"));
        int i3 = 0;
        Iterator<Set<String>> it = testMessagingHandlerFactory._results.values().iterator();
        while (it.hasNext()) {
            i3 += it.next().size();
        }
        Assert.assertEquals(i3, 0);
    }

    @Test
    public void TestSchedulerMsg3() throws Exception {
        this._factory._results.clear();
        Thread.sleep(2000L);
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        Message message2 = new Message(this._factory.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("%");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        message.getRecord().setSimpleField("WAIT_ALL", "true");
        message.getRecord().setSimpleField("SchedulerTaskQueue", "TestSchedulerMsg3");
        Criteria criteria2 = new Criteria();
        criteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria2.setInstanceName("*");
        criteria2.setSessionSpecific(false);
        new MockAsyncCallback();
        criteria.setInstanceName("localhost_%");
        ObjectMapper objectMapper2 = new ObjectMapper();
        objectMapper2.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter2 = new StringWriter();
        objectMapper2.writeValue(stringWriter2, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter2.toString());
        for (int i2 = 0; i2 < 4; i2++) {
            MockAsyncCallback mockAsyncCallback = new MockAsyncCallback();
            criteria.setInstanceName("localhost_" + (12918 + i2));
            ObjectMapper objectMapper3 = new ObjectMapper();
            objectMapper3.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
            StringWriter stringWriter3 = new StringWriter();
            objectMapper3.writeValue(stringWriter3, criteria);
            message.setMsgId(UUID.randomUUID().toString());
            message.getRecord().setSimpleField("Criteria", stringWriter3.toString());
            mockParticipantManager.getMessagingService().sendAndWait(criteria2, message, mockAsyncCallback, -1);
            String str = (String) mockAsyncCallback._message.getResultMap().get("SchedulerMessageId");
            HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
            PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
            for (int i3 = 0; i3 < 100; i3++) {
                Thread.sleep(200L);
                if (helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord().getMapFields().containsKey("Summary")) {
                    break;
                }
            }
            ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord();
            Assert.assertTrue(((String) record.getMapField("SentMessageCount").get("MessageCount")).equals("12"));
            int i4 = 0;
            Iterator it = record.getMapFields().keySet().iterator();
            while (it.hasNext()) {
                if (((String) it.next()).startsWith("MessageResult")) {
                    i4++;
                }
            }
            Assert.assertEquals(i4, 12);
            int i5 = 0;
            Iterator<Set<String>> it2 = this._factory._results.values().iterator();
            while (it2.hasNext()) {
                i5 += it2.next().size();
            }
            Assert.assertEquals(i5, 12 * (i2 + 1));
        }
    }

    @Test
    public void TestSchedulerMsg4() throws Exception {
        this._factory._results.clear();
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            this._participants[i].getMessagingService().registerMessageHandlerFactory(this._factory.getMessageType(), this._factory);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        Message message2 = new Message(this._factory.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("TestDB");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        message.getRecord().setSimpleField("WAIT_ALL", "true");
        message.getRecord().setSimpleField("SchedulerTaskQueue", "TestSchedulerMsg4");
        Criteria criteria2 = new Criteria();
        criteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria2.setInstanceName("*");
        criteria2.setSessionSpecific(false);
        TreeMap treeMap = new TreeMap();
        treeMap.put("MESSAGE_TYPE", "STATE_TRANSITION");
        treeMap.put("TRANSITION", "OFFLINE-COMPLETED");
        treeMap.put("CONSTRAINT_VALUE", "1");
        treeMap.put("INSTANCE", ".*");
        mockParticipantManager.getClusterManagmentTool().setConstraint(mockParticipantManager.getClusterName(), ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(treeMap));
        MockAsyncCallback mockAsyncCallback = new MockAsyncCallback();
        criteria.setInstanceName("localhost_%");
        ObjectMapper objectMapper2 = new ObjectMapper();
        objectMapper2.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter2 = new StringWriter();
        objectMapper2.writeValue(stringWriter2, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter2.toString());
        mockParticipantManager.getMessagingService().sendAndWait(criteria2, message, mockAsyncCallback, -1);
        String str = (String) mockAsyncCallback._message.getResultMap().get("SchedulerMessageId");
        HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 5; i2++) {
            MockAsyncCallback mockAsyncCallback2 = new MockAsyncCallback();
            criteria.setInstanceName("localhost_" + (12918 + i2));
            ObjectMapper objectMapper3 = new ObjectMapper();
            objectMapper3.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
            StringWriter stringWriter3 = new StringWriter();
            objectMapper3.writeValue(stringWriter3, criteria);
            message.setMsgId(UUID.randomUUID().toString());
            message.getRecord().setSimpleField("Criteria", stringWriter3.toString());
            mockParticipantManager.getMessagingService().sendAndWait(criteria2, message, mockAsyncCallback2, -1);
            arrayList.add((String) mockAsyncCallback2._message.getResultMap().get("SchedulerMessageId"));
        }
        for (int i3 = 0; i3 < 5; i3++) {
            String str2 = (String) arrayList.get(i3);
            for (int i4 = 0; i4 < 100; i4++) {
                Thread.sleep(200L);
                if (helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str2)).getRecord().getMapFields().containsKey("Summary")) {
                    break;
                }
            }
            ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str2)).getRecord();
            Assert.assertTrue(((String) record.getMapField("SentMessageCount").get("MessageCount")).equals("12"));
            int i5 = 0;
            Iterator it = record.getMapFields().keySet().iterator();
            while (it.hasNext()) {
                if (((String) it.next()).startsWith("MessageResult")) {
                    i5++;
                }
            }
            if (i5 != 12) {
                int i6 = 10 + i5;
            }
            Assert.assertEquals(i5, 12);
        }
        for (int i7 = 0; i7 < 100; i7++) {
            Thread.sleep(200L);
            if (helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord().getMapFields().containsKey("Summary")) {
                break;
            }
        }
        int i8 = 0;
        Iterator<Set<String>> it2 = this._factory._results.values().iterator();
        while (it2.hasNext()) {
            i8 += it2.next().size();
        }
        Assert.assertEquals(i8, 120);
    }

    @Test
    public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException, IOException, InterruptedException {
        TestMessagingHandlerFactoryLatch testMessagingHandlerFactoryLatch = new TestMessagingHandlerFactoryLatch();
        MockParticipantManager mockParticipantManager = null;
        for (int i = 0; i < 5; i++) {
            this._participants[i].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactoryLatch.getMessageType(), testMessagingHandlerFactoryLatch);
            this._participants[i].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactoryLatch.getMessageType(), testMessagingHandlerFactoryLatch);
            mockParticipantManager = this._participants[i];
        }
        Message message = new Message(Message.MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("CONTROLLER");
        message.setSrcName("CONTROLLER");
        Message message2 = new Message(testMessagingHandlerFactoryLatch.getMessageType(), "Template");
        message2.setTgtSessionId("*");
        message2.setMsgState(Message.MessageState.NEW);
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setResource("%");
        criteria.setPartition("%");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter = new StringWriter();
        objectMapper.writeValue(stringWriter, criteria);
        message.getRecord().setSimpleField("Criteria", stringWriter.toString());
        message.getRecord().setMapField("MessageTemplate", message2.getRecord().getSimpleFields());
        message.getRecord().setSimpleField("TIMEOUT", "-1");
        message.getRecord().setSimpleField("WAIT_ALL", "true");
        message.getRecord().setSimpleField("SchedulerTaskQueue", "TestSchedulerMsgContraints");
        Criteria criteria2 = new Criteria();
        criteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria2.setInstanceName("*");
        criteria2.setSessionSpecific(false);
        MockAsyncCallback mockAsyncCallback = new MockAsyncCallback();
        ObjectMapper objectMapper2 = new ObjectMapper();
        objectMapper2.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
        StringWriter stringWriter2 = new StringWriter();
        objectMapper2.writeValue(stringWriter2, criteria);
        HelixDataAccessor helixDataAccessor = mockParticipantManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        TreeMap treeMap = new TreeMap();
        treeMap.put("MESSAGE_TYPE", "STATE_TRANSITION");
        treeMap.put("TRANSITION", "OFFLINE-COMPLETED");
        treeMap.put("CONSTRAINT_VALUE", "1");
        treeMap.put("INSTANCE", ".*");
        mockParticipantManager.getClusterManagmentTool().setConstraint(mockParticipantManager.getClusterName(), ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(treeMap));
        message.getRecord().setSimpleField("Criteria", stringWriter2.toString());
        mockParticipantManager.getMessagingService().sendAndWait(criteria2, message, mockAsyncCallback, -1);
        String str = (String) mockAsyncCallback._message.getResultMap().get("SchedulerMessageId");
        int i2 = 0;
        while (true) {
            if (i2 >= 10) {
                break;
            }
            Thread.sleep(200L);
            ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord();
            if (record.getMapFields().containsKey("SentMessageCount")) {
                Assert.assertEquals((String) ((Map) record.getMapFields().get("SentMessageCount")).get("MessageCount"), "60");
                break;
            }
            i2++;
        }
        for (int i3 = 0; i3 < 12; i3++) {
            for (int i4 = 0; i4 < 10; i4++) {
                Thread.sleep(300L);
                if (testMessagingHandlerFactoryLatch._messageCount == 5 * (i3 + 1)) {
                    break;
                }
            }
            Thread.sleep(300L);
            Assert.assertEquals(testMessagingHandlerFactoryLatch._messageCount, 5 * (i3 + 1));
            testMessagingHandlerFactoryLatch.signal();
        }
        for (int i5 = 0; i5 < 10; i5++) {
            Thread.sleep(200L);
            if (helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord().getMapFields().containsKey("Summary")) {
                break;
            }
        }
        Assert.assertEquals(20, testMessagingHandlerFactoryLatch._results.size());
        ZNRecord record2 = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), str)).getRecord();
        Assert.assertTrue(((String) record2.getMapField("SentMessageCount").get("MessageCount")).equals("60"));
        int i6 = 0;
        Iterator it = record2.getMapFields().keySet().iterator();
        while (it.hasNext()) {
            if (((String) it.next()).startsWith("MessageResult ")) {
                i6++;
            }
        }
        Assert.assertEquals(i6, 60);
        int i7 = 0;
        Iterator<Set<String>> it2 = testMessagingHandlerFactoryLatch._results.values().iterator();
        while (it2.hasNext()) {
            i7 += it2.next().size();
        }
        Assert.assertEquals(i7, 60);
        mockParticipantManager.getClusterManagmentTool().removeConstraint(mockParticipantManager.getClusterName(), ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
    }
}
