package org.apache.helix.messaging.handling;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.MockAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.controller.stages.TestStateTransitionPriority;
import org.apache.helix.examples.OnlineOfflineStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.mock.MockClusterMessagingService;
import org.apache.helix.mock.MockManager;
import org.apache.helix.mock.statemodel.MockMasterSlaveStateModel;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor.class */
public class TestHelixTaskExecutor {

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$CancellableHandlerFactory.class */
    class CancellableHandlerFactory implements MultiTypeMessageHandlerFactory {
        int _handlersCreated = 0;
        ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<>();

        /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$CancellableHandlerFactory$CancellableHandler.class */
        class CancellableHandler extends MessageHandler {
            public boolean _interrupted;

            public CancellableHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
                this._interrupted = false;
            }

            public HelixTaskResult handleMessage() throws InterruptedException {
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                int i = this._message.getRecord().getSimpleFields().containsKey("Cancelcount") ? 10 : 15;
                CancellableHandlerFactory.this._processingMsgIds.put(this._message.getMsgId(), this._message.getMsgId());
                for (int i2 = 0; i2 < i; i2++) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        this._interrupted = true;
                        CancellableHandlerFactory.this._timedOutMsgIds.put(this._message.getMsgId(), "");
                        helixTaskResult.setInterrupted(true);
                        if (this._message.getRecord().getSimpleFields().containsKey("Cancelcount")) {
                            this._message.getRecord().setSimpleField("Cancelcount", (Integer.parseInt(this._message.getRecord().getSimpleField("Cancelcount")) + 1));
                        } else {
                            this._message.getRecord().setSimpleField("Cancelcount", "1");
                        }
                        throw e;
                    }
                }
                CancellableHandlerFactory.this._processedMsgIds.put(this._message.getMsgId(), this._message.getMsgId());
                helixTaskResult.setSuccess(true);
                return helixTaskResult;
            }

            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
                this._message.getRecord().setSimpleField("exception", exc.getMessage());
            }
        }

        CancellableHandlerFactory() {
        }

        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            this._handlersCreated++;
            return new CancellableHandler(message, notificationContext);
        }

        public List<String> getMessageTypes() {
            return ImmutableList.of("Cancellable");
        }

        public void reset() {
            this._handlersCreated = 0;
            this._processedMsgIds.clear();
            this._processingMsgIds.clear();
            this._timedOutMsgIds.clear();
        }
    }

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$MockClusterManager.class */
    public static class MockClusterManager extends MockManager {
        @Override // org.apache.helix.mock.MockManager
        public String getSessionId() {
            return "123";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$TestMessageHandlerFactory.class */
    public class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
        final int _messageDelay;
        int _handlersCreated;
        ConcurrentHashMap<String, String> _processedMsgIds;
        ConcurrentSkipListSet<String> _completedMsgIds;

        /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$TestMessageHandlerFactory$TestMessageHandler.class */
        class TestMessageHandler extends MessageHandler {
            public TestMessageHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            public HelixTaskResult handleMessage() throws InterruptedException {
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                TestMessageHandlerFactory.this._processedMsgIds.put(this._message.getMsgId(), this._message.getMsgId());
                Thread.sleep(TestMessageHandlerFactory.this._messageDelay);
                helixTaskResult.setSuccess(true);
                TestMessageHandlerFactory.this._completedMsgIds.add(this._message.getMsgId());
                return helixTaskResult;
            }

            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            }
        }

        TestMessageHandlerFactory(int i) {
            this._handlersCreated = 0;
            this._processedMsgIds = new ConcurrentHashMap<>();
            this._completedMsgIds = new ConcurrentSkipListSet<>();
            this._messageDelay = i;
        }

        TestMessageHandlerFactory() {
            this._handlersCreated = 0;
            this._processedMsgIds = new ConcurrentHashMap<>();
            this._completedMsgIds = new ConcurrentSkipListSet<>();
            this._messageDelay = 100;
        }

        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            if (message.getMsgSubType() != null && message.getMsgSubType().equals("EXCEPTION")) {
                throw new HelixException("Test Message handler exception, can ignore");
            }
            this._handlersCreated++;
            return new TestMessageHandler(message, notificationContext);
        }

        public List<String> getMessageTypes() {
            return Collections.singletonList("TestingMessageHandler");
        }

        public void reset() {
        }
    }

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$TestMessageHandlerFactory2.class */
    class TestMessageHandlerFactory2 extends TestMessageHandlerFactory {
        TestMessageHandlerFactory2() {
            super();
        }

        @Override // org.apache.helix.messaging.handling.TestHelixTaskExecutor.TestMessageHandlerFactory
        public List<String> getMessageTypes() {
            return ImmutableList.of("TestingMessageHandler2");
        }
    }

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$TestMessageHandlerFactory3.class */
    private class TestMessageHandlerFactory3 extends TestMessageHandlerFactory {
        private boolean _resetDone;

        private TestMessageHandlerFactory3() {
            super();
            this._resetDone = false;
        }

        @Override // org.apache.helix.messaging.handling.TestHelixTaskExecutor.TestMessageHandlerFactory
        public List<String> getMessageTypes() {
            return ImmutableList.of("msgType1", "msgType2", "msgType3");
        }

        @Override // org.apache.helix.messaging.handling.TestHelixTaskExecutor.TestMessageHandlerFactory
        public void reset() {
            Assert.assertFalse(this._resetDone, "reset() should only be triggered once in TestMessageHandlerFactory3");
            this._resetDone = true;
        }
    }

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$TestStateTransitionHandlerFactory.class */
    class TestStateTransitionHandlerFactory implements MultiTypeMessageHandlerFactory {
        ConcurrentHashMap<String, String> _processedMsgIds;
        private final String _msgType;
        private final long _delay;

        /* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor$TestStateTransitionHandlerFactory$TestStateTransitionMessageHandler.class */
        class TestStateTransitionMessageHandler extends HelixStateTransitionHandler {
            static final /* synthetic */ boolean $assertionsDisabled;

            public TestStateTransitionMessageHandler(Message message, NotificationContext notificationContext, CurrentState currentState) {
                super(new StateModelFactory<StateModel>() { // from class: org.apache.helix.messaging.handling.TestHelixTaskExecutor.TestStateTransitionHandlerFactory.TestStateTransitionMessageHandler.1
                }, new StateModel() { // from class: org.apache.helix.messaging.handling.TestHelixTaskExecutor.TestStateTransitionHandlerFactory.TestStateTransitionMessageHandler.2
                }, message, notificationContext, currentState);
            }

            public HelixTaskResult handleMessage() {
                HelixTaskResult helixTaskResult = new HelixTaskResult();
                TestStateTransitionHandlerFactory.this._processedMsgIds.put(this._message.getMsgId(), this._message.getMsgId());
                if (TestStateTransitionHandlerFactory.this._delay > 0) {
                    System.out.println("Sleeping..." + TestStateTransitionHandlerFactory.this._delay);
                    try {
                        Thread.sleep(TestStateTransitionHandlerFactory.this._delay);
                    } catch (Exception e) {
                        if (!$assertionsDisabled) {
                            throw new AssertionError();
                        }
                    }
                }
                helixTaskResult.setSuccess(true);
                return helixTaskResult;
            }

            public HelixStateTransitionHandler.StaleMessageValidateResult staleMessageValidator() {
                return super.staleMessageValidator();
            }

            static {
                $assertionsDisabled = !TestHelixTaskExecutor.class.desiredAssertionStatus();
            }
        }

        public TestStateTransitionHandlerFactory(TestHelixTaskExecutor testHelixTaskExecutor, String str) {
            this(str, -1L);
        }

        public TestStateTransitionHandlerFactory(String str, long j) {
            this._processedMsgIds = new ConcurrentHashMap<>();
            this._msgType = str;
            this._delay = j;
        }

        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            CurrentState currentState = new CurrentState(message.getResourceName());
            currentState.setSessionId(message.getTgtSessionId());
            currentState.setStateModelDefRef(message.getStateModelDef());
            currentState.setStateModelFactoryName(message.getStateModelFactoryName());
            currentState.setBucketSize(message.getBucketSize());
            if (message.getResourceName().equals("testStaledMessageResource")) {
                currentState.setState(message.getPartitionName(), "MASTER");
            } else {
                currentState.setState(message.getPartitionName(), "SLAVE");
            }
            return new TestStateTransitionMessageHandler(message, notificationContext, currentState);
        }

        public List<String> getMessageTypes() {
            return ImmutableList.of(this._msgType);
        }

        public void reset() {
        }
    }

    @BeforeClass
    public void beforeClass() {
        System.out.println("START " + TestHelper.getTestClassName());
    }

    @AfterClass
    public void afterClass() {
        System.out.println("End " + TestHelper.getTestClassName());
    }

    @Test
    public void testNormalMsgExecution() throws InterruptedException {
        System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        TestMessageHandlerFactory2 testMessageHandlerFactory2 = new TestMessageHandlerFactory2();
        Iterator<String> it2 = testMessageHandlerFactory2.getMessageTypes().iterator();
        while (it2.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it2.next(), testMessageHandlerFactory2);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList<Message> arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setCorrelationId(UUID.randomUUID().toString());
            arrayList.add(message);
        }
        for (int i2 = 0; i2 < 6; i2++) {
            Message message2 = new Message(testMessageHandlerFactory2.getMessageTypes().get(0), UUID.randomUUID().toString());
            message2.setTgtSessionId(mockClusterManager.getSessionId());
            message2.setTgtName("Localhost_1123");
            message2.setSrcName("127.101.1.23_2234");
            message2.setCorrelationId(UUID.randomUUID().toString());
            arrayList.add(message2);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(1000L);
        AssertJUnit.assertTrue(testMessageHandlerFactory._processedMsgIds.size() == 5);
        AssertJUnit.assertTrue(testMessageHandlerFactory2._processedMsgIds.size() == 6);
        AssertJUnit.assertTrue(testMessageHandlerFactory._handlersCreated == 5);
        AssertJUnit.assertTrue(testMessageHandlerFactory2._handlersCreated == 6);
        for (Message message3 : arrayList) {
            AssertJUnit.assertTrue(testMessageHandlerFactory._processedMsgIds.containsKey(message3.getId()) || testMessageHandlerFactory2._processedMsgIds.containsKey(message3.getId()));
            AssertJUnit.assertFalse(testMessageHandlerFactory._processedMsgIds.containsKey(message3.getId()) && testMessageHandlerFactory2._processedMsgIds.containsKey(message3.getId()));
        }
        System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
    }

    @Test
    public void testDuplicatedMessage() throws InterruptedException {
        System.out.println("START TestHelixTaskExecutor.testDuplicatedMessage()");
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        HelixDataAccessor helixDataAccessor = mockClusterManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        helixTaskExecutor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000L));
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList arrayList = new ArrayList();
        String instanceName = mockClusterManager.getInstanceName();
        for (int i = 0; i < 3; i++) {
            Message message = new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setCreateTimeStamp(i);
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setPartitionName(TestStateTransitionPriority.PARTITION);
            message.setResourceName(TestStateTransitionPriority.RESOURCE);
            message.setStateModelDef("DummyMasterSlave");
            message.setFromState("SLAVE");
            message.setToState("MASTER");
            helixDataAccessor.setProperty(message.getKey(keyBuilder, instanceName), message);
            arrayList.add(message);
        }
        AssertJUnit.assertEquals(helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), 3);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage(instanceName, arrayList, notificationContext);
        Thread.sleep(200L);
        Assert.assertEquals(helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), 1);
        ((Message) arrayList.get(2)).setMsgState(Message.MessageState.NEW);
        helixDataAccessor.setProperty(((Message) arrayList.get(2)).getKey(keyBuilder, instanceName), (Message) arrayList.get(2));
        helixTaskExecutor.onMessage(instanceName, Arrays.asList((Message) arrayList.get(2)), notificationContext);
        Thread.sleep(200L);
        Assert.assertEquals(helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), 1);
        Thread.sleep(1000L);
        Assert.assertEquals(helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), 0);
        System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
    }

    @Test
    public void testStaledMessage() throws InterruptedException {
        System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        HelixDataAccessor helixDataAccessor = mockClusterManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        helixTaskExecutor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000L));
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList arrayList = new ArrayList();
        String instanceName = mockClusterManager.getInstanceName();
        for (int i = 0; i < 1; i++) {
            Message message = new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setCreateTimeStamp(i);
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setPartitionName(TestStateTransitionPriority.PARTITION);
            message.setResourceName("testStaledMessageResource");
            message.setStateModelDef("DummyMasterSlave");
            message.setFromState("SLAVE");
            message.setToState("MASTER");
            helixDataAccessor.setProperty(message.getKey(keyBuilder, instanceName), message);
            arrayList.add(message);
        }
        Assert.assertEquals(helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), 1);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage(instanceName, arrayList, notificationContext);
        Thread.sleep(200L);
        Assert.assertEquals(helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), 0);
        System.out.println("END TestHelixTaskExecutor.testStaledMessage()");
    }

    @Test
    public void testUnknownTypeMsgExecution() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        TestMessageHandlerFactory2 testMessageHandlerFactory2 = new TestMessageHandlerFactory2();
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList<Message> arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            arrayList.add(message);
        }
        for (int i2 = 0; i2 < 4; i2++) {
            Message message2 = new Message(testMessageHandlerFactory2.getMessageTypes().get(0), UUID.randomUUID().toString());
            message2.setTgtSessionId(mockClusterManager.getSessionId());
            message2.setTgtName("Localhost_1123");
            message2.setSrcName("127.101.1.23_2234");
            arrayList.add(message2);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(1000L);
        AssertJUnit.assertTrue(testMessageHandlerFactory._processedMsgIds.size() == 5);
        AssertJUnit.assertTrue(testMessageHandlerFactory2._processedMsgIds.size() == 0);
        AssertJUnit.assertTrue(testMessageHandlerFactory._handlersCreated == 5);
        AssertJUnit.assertTrue(testMessageHandlerFactory2._handlersCreated == 0);
        for (Message message3 : arrayList) {
            if (testMessageHandlerFactory.getMessageTypes().contains(message3.getMsgType())) {
                AssertJUnit.assertTrue(testMessageHandlerFactory._processedMsgIds.containsKey(message3.getId()));
            }
        }
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testMsgSessionId() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        TestMessageHandlerFactory2 testMessageHandlerFactory2 = new TestMessageHandlerFactory2();
        Iterator<String> it2 = testMessageHandlerFactory2.getMessageTypes().iterator();
        while (it2.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it2.next(), testMessageHandlerFactory2);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList<Message> arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId("*");
            message.setTgtName("");
            arrayList.add(message);
        }
        for (int i2 = 0; i2 < 4; i2++) {
            Message message2 = new Message(testMessageHandlerFactory2.getMessageTypes().get(0), UUID.randomUUID().toString());
            message2.setTgtSessionId("some other session id");
            message2.setTgtName("");
            arrayList.add(message2);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(1000L);
        AssertJUnit.assertTrue(testMessageHandlerFactory._processedMsgIds.size() == 5);
        AssertJUnit.assertTrue(testMessageHandlerFactory2._processedMsgIds.size() == 0);
        AssertJUnit.assertTrue(testMessageHandlerFactory._handlersCreated == 5);
        AssertJUnit.assertTrue(testMessageHandlerFactory2._handlersCreated == 0);
        for (Message message3 : arrayList) {
            if (testMessageHandlerFactory.getMessageTypes().contains(message3.getMsgType())) {
                AssertJUnit.assertTrue(testMessageHandlerFactory._processedMsgIds.containsKey(message3.getId()));
            }
        }
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testCreateHandlerException() throws Exception {
        System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        HelixDataAccessor helixDataAccessor = mockClusterManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setCorrelationId(UUID.randomUUID().toString());
            helixDataAccessor.setProperty(keyBuilder.message(mockClusterManager.getInstanceName(), message.getMsgId()), message);
            arrayList.add(message);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage(mockClusterManager.getInstanceName(), Collections.emptyList(), notificationContext);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Message property = helixDataAccessor.getProperty(keyBuilder.message(mockClusterManager.getInstanceName(), ((Message) it.next()).getMsgId()));
            Assert.assertNotNull(property);
            Assert.assertEquals(property.getMsgState(), Message.MessageState.NEW);
            Assert.assertEquals(property.getRetryCount(), 0);
        }
        Iterator<String> it2 = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it2.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it2.next(), testMessageHandlerFactory);
        }
        Message message2 = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
        message2.setTgtSessionId(mockClusterManager.getSessionId());
        message2.setMsgSubType("EXCEPTION");
        message2.setTgtName("Localhost_1123");
        message2.setSrcName("127.101.1.23_2234");
        message2.setCorrelationId(UUID.randomUUID().toString());
        helixDataAccessor.setProperty(keyBuilder.message(mockClusterManager.getInstanceName(), message2.getMsgId()), message2);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage(mockClusterManager.getInstanceName(), Collections.emptyList(), notificationContext);
        Assert.assertTrue(TestHelper.verify(() -> {
            Message property2 = helixDataAccessor.getProperty(keyBuilder.message(mockClusterManager.getInstanceName(), message2.getMsgId()));
            return property2 != null && property2.getMsgState().equals(Message.MessageState.UNPROCESSABLE) && property2.getRetryCount() == -1;
        }, TestHelper.WAIT_DURATION), "The exception message should be retied once and in UNPROCESSABLE state.");
        Assert.assertTrue(TestHelper.verify(() -> {
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                if (helixDataAccessor.getProperty(keyBuilder.message(mockClusterManager.getInstanceName(), ((Message) it3.next()).getMsgId())) != null) {
                    return false;
                }
            }
            return true;
        }, TestHelper.WAIT_DURATION), "The normal messages should be all processed normally.");
        Assert.assertEquals(testMessageHandlerFactory._processedMsgIds.size(), 5);
        Assert.assertEquals(testMessageHandlerFactory._handlersCreated, 5);
        System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
    }

    @Test
    public void testTaskCancellation() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        CancellableHandlerFactory cancellableHandlerFactory = new CancellableHandlerFactory();
        Iterator<String> it = cancellableHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), cancellableHandlerFactory);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList<Message> arrayList = new ArrayList();
        for (int i = 0; i < 0; i++) {
            Message message = new Message(cancellableHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId("*");
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            arrayList.add(message);
        }
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < 4; i2++) {
            Message message2 = new Message(cancellableHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message2.setTgtSessionId("*");
            arrayList.add(message2);
            message2.setTgtName("Localhost_1123");
            message2.setSrcName("127.101.1.23_2234");
            arrayList2.add(message2);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(500L);
        for (int i3 = 0; i3 < 4; i3++) {
            helixTaskExecutor.cancelTask(new HelixTask((Message) arrayList2.get(i3), notificationContext, (MessageHandler) null, (HelixTaskExecutor) null));
        }
        Thread.sleep(1500L);
        AssertJUnit.assertTrue(cancellableHandlerFactory._processedMsgIds.size() == 0);
        AssertJUnit.assertTrue(cancellableHandlerFactory._handlersCreated == 0 + 4);
        AssertJUnit.assertTrue(cancellableHandlerFactory._processingMsgIds.size() == 0 + 4);
        for (Message message3 : arrayList) {
            if (cancellableHandlerFactory.getMessageTypes().contains(message3.getMsgType())) {
                AssertJUnit.assertTrue(cancellableHandlerFactory._processingMsgIds.containsKey(message3.getId()));
            }
        }
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testShutdown() throws InterruptedException {
        System.out.println("START TestCMTaskExecutor.testShutdown()");
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        TestMessageHandlerFactory2 testMessageHandlerFactory2 = new TestMessageHandlerFactory2();
        Iterator<String> it2 = testMessageHandlerFactory2.getMessageTypes().iterator();
        while (it2.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it2.next(), testMessageHandlerFactory2);
        }
        CancellableHandlerFactory cancellableHandlerFactory = new CancellableHandlerFactory();
        Iterator<String> it3 = cancellableHandlerFactory.getMessageTypes().iterator();
        while (it3.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it3.next(), cancellableHandlerFactory);
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId("*");
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            arrayList.add(message);
        }
        for (int i2 = 0; i2 < 10; i2++) {
            Message message2 = new Message(testMessageHandlerFactory2.getMessageTypes().get(0), UUID.randomUUID().toString());
            message2.setTgtSessionId("*");
            arrayList.add(message2);
            message2.setTgtName("Localhost_1123");
            message2.setSrcName("127.101.1.23_2234");
            arrayList.add(message2);
        }
        for (int i3 = 0; i3 < 10; i3++) {
            Message message3 = new Message(cancellableHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message3.setTgtSessionId("*");
            arrayList.add(message3);
            message3.setTgtName("Localhost_1123");
            message3.setSrcName("127.101.1.23_2234");
            arrayList.add(message3);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("some", arrayList, notificationContext);
        Thread.sleep(500L);
        Iterator it4 = helixTaskExecutor._executorMap.values().iterator();
        while (it4.hasNext()) {
            Assert.assertFalse(((ExecutorService) it4.next()).isShutdown());
        }
        Assert.assertTrue(testMessageHandlerFactory._processedMsgIds.size() > 0);
        helixTaskExecutor.shutdown();
        Iterator it5 = helixTaskExecutor._executorMap.values().iterator();
        while (it5.hasNext()) {
            Assert.assertTrue(((ExecutorService) it5.next()).isShutdown());
        }
        System.out.println("END TestCMTaskExecutor.testShutdown()");
    }

    @Test(dependsOnMethods = {"testShutdown"})
    public void testHandlerResetTimeout() throws Exception {
        System.out.println("START TestCMTaskExecutor.testHandlerResetTimeout()");
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory(2000);
        helixTaskExecutor.registerMessageHandlerFactory(testMessageHandlerFactory, 40, 100);
        Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("Localhost_1123");
        message.setSrcName("127.101.1.23_2234");
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("some", Arrays.asList(message), notificationContext);
        Assert.assertTrue(TestHelper.verify(() -> {
            return testMessageHandlerFactory._processedMsgIds.containsKey(message.getMsgId());
        }, TestHelper.WAIT_DURATION));
        helixTaskExecutor.shutdown();
        Iterator it = helixTaskExecutor._executorMap.values().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((ExecutorService) it.next()).isShutdown());
        }
        Assert.assertEquals(testMessageHandlerFactory._completedMsgIds.size(), 0);
        HelixTaskExecutor helixTaskExecutor2 = new HelixTaskExecutor();
        helixTaskExecutor2.registerMessageHandlerFactory(testMessageHandlerFactory, 40, 2000 * 2);
        Message message2 = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
        message2.setTgtSessionId("*");
        message2.setTgtName("Localhost_1123");
        message2.setSrcName("127.101.1.23_2234");
        helixTaskExecutor2.onMessage("some", Arrays.asList(message2), notificationContext);
        Assert.assertTrue(TestHelper.verify(() -> {
            return testMessageHandlerFactory._processedMsgIds.containsKey(message2.getMsgId());
        }, TestHelper.WAIT_DURATION));
        helixTaskExecutor2.shutdown();
        Iterator it2 = helixTaskExecutor2._executorMap.values().iterator();
        while (it2.hasNext()) {
            Assert.assertTrue(((ExecutorService) it2.next()).isShutdown());
        }
        Assert.assertEquals(testMessageHandlerFactory._completedMsgIds.size(), 1);
        Assert.assertTrue(testMessageHandlerFactory._completedMsgIds.contains(message2.getMsgId()));
        System.out.println("END TestCMTaskExecutor.testHandlerResetTimeout()");
    }

    @Test
    public void testMsgHandlerRegistryAndShutdown() {
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        TestMessageHandlerFactory3 testMessageHandlerFactory3 = new TestMessageHandlerFactory3();
        helixTaskExecutor.registerMessageHandlerFactory(testMessageHandlerFactory, 40, 200);
        helixTaskExecutor.registerMessageHandlerFactory(testMessageHandlerFactory3, 40, 200);
        Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setTgtName("Localhost_1123");
        message.setSrcName("127.101.1.23_2234");
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("some", Collections.singletonList(message), notificationContext);
        Assert.assertEquals(helixTaskExecutor._hdlrFtyRegistry.size(), 4);
        helixTaskExecutor.shutdown();
        Assert.assertTrue(testMessageHandlerFactory3._resetDone, "TestMessageHandlerFactory3 should be reset");
    }

    @Test
    public void testNoRetry() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        CancellableHandlerFactory cancellableHandlerFactory = new CancellableHandlerFactory();
        Iterator<String> it = cancellableHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), cancellableHandlerFactory);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            Message message = new Message(cancellableHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId("*");
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setExecutionTimeout((i + 1) * 600);
            arrayList.add(message);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(4000L);
        AssertJUnit.assertTrue(cancellableHandlerFactory._handlersCreated == 4);
        AssertJUnit.assertEquals(cancellableHandlerFactory._timedOutMsgIds.size(), 2);
        for (int i2 = 0; i2 < 4 - 2; i2++) {
            if (cancellableHandlerFactory.getMessageTypes().contains(((Message) arrayList.get(i2)).getMsgType())) {
                AssertJUnit.assertTrue(((Message) arrayList.get(i2)).getRecord().getSimpleFields().containsKey("Cancelcount"));
                AssertJUnit.assertTrue(cancellableHandlerFactory._timedOutMsgIds.containsKey(((Message) arrayList.get(i2)).getId()));
            }
        }
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testRetryOnce() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        CancellableHandlerFactory cancellableHandlerFactory = new CancellableHandlerFactory();
        Iterator<String> it = cancellableHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), cancellableHandlerFactory);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            Message message = new Message(cancellableHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId("*");
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setExecutionTimeout((i + 1) * 600);
            message.setRetryCount(1);
            arrayList.add(message);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(3500L);
        AssertJUnit.assertEquals(cancellableHandlerFactory._processedMsgIds.size(), 3);
        AssertJUnit.assertTrue(((Message) arrayList.get(0)).getRecord().getSimpleField("Cancelcount").equals("2"));
        AssertJUnit.assertTrue(((Message) arrayList.get(1)).getRecord().getSimpleField("Cancelcount").equals("1"));
        AssertJUnit.assertEquals(cancellableHandlerFactory._timedOutMsgIds.size(), 2);
        AssertJUnit.assertTrue(helixTaskExecutor._taskMap.size() == 0);
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testStateTransitionCancellationMsg() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestStateTransitionHandlerFactory testStateTransitionHandlerFactory = new TestStateTransitionHandlerFactory(this, Message.MessageType.STATE_TRANSITION.name());
        TestStateTransitionHandlerFactory testStateTransitionHandlerFactory2 = new TestStateTransitionHandlerFactory(this, Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
        helixTaskExecutor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), testStateTransitionHandlerFactory);
        helixTaskExecutor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION.name(), testStateTransitionHandlerFactory2);
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList arrayList = new ArrayList();
        Message message = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
        message.setTgtSessionId("*");
        message.setPartitionName("P1");
        message.setResourceName("R1");
        message.setTgtName("Localhost_1123");
        message.setSrcName("127.101.1.23_2234");
        message.setFromState("SLAVE");
        message.setToState("MASTER");
        arrayList.add(message);
        Message message2 = new Message(Message.MessageType.STATE_TRANSITION_CANCELLATION, UUID.randomUUID().toString());
        message2.setTgtSessionId("*");
        message2.setPartitionName("P1");
        message2.setResourceName("R1");
        message2.setTgtName("Localhost_1123");
        message2.setSrcName("127.101.1.23_2234");
        message2.setFromState("SLAVE");
        message2.setToState("MASTER");
        arrayList.add(message2);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(3000L);
        AssertJUnit.assertEquals(testStateTransitionHandlerFactory2._processedMsgIds.size(), 0);
        AssertJUnit.assertEquals(testStateTransitionHandlerFactory._processedMsgIds.size(), 0);
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testMessageReadOptimization() throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        HelixDataAccessor helixDataAccessor = mockClusterManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setCorrelationId(UUID.randomUUID().toString());
            helixDataAccessor.setProperty(keyBuilder.message("someInstance", message.getId()), message);
            arrayList.add(message.getId());
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor._knownMessageIds.addAll(arrayList);
        helixTaskExecutor.onMessage("someInstance", Collections.EMPTY_LIST, notificationContext);
        Thread.sleep(3000L);
        AssertJUnit.assertEquals(0, testMessageHandlerFactory._processedMsgIds.size());
        helixTaskExecutor._knownMessageIds.clear();
        helixTaskExecutor.onMessage("someInstance", Collections.EMPTY_LIST, notificationContext);
        Thread.sleep(3000L);
        AssertJUnit.assertEquals(5, testMessageHandlerFactory._processedMsgIds.size());
        Assert.assertTrue(helixTaskExecutor._knownMessageIds.isEmpty());
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testNoWriteReadStateForRemovedMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        HelixDataAccessor helixDataAccessor = mockClusterManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId(mockClusterManager.getSessionId());
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setCorrelationId(UUID.randomUUID().toString());
            helixDataAccessor.setProperty(keyBuilder.message("someInstance", message.getId()), message);
            arrayList.add(message.getId());
            message.setMsgState(Message.MessageState.READ);
            arrayList2.add(message);
        }
        Method declaredMethod = HelixTaskExecutor.class.getDeclaredMethod("updateMessageState", Collection.class, HelixDataAccessor.class, String.class);
        declaredMethod.setAccessible(true);
        declaredMethod.invoke(helixTaskExecutor, arrayList2, helixDataAccessor, "someInstance");
        Assert.assertEquals(helixDataAccessor.getChildNames(keyBuilder.messages("someInstance")).size(), 5);
        helixDataAccessor.removeProperty(keyBuilder.message("someInstance", (String) arrayList.get(0)));
        System.out.println(helixDataAccessor.getChildNames(keyBuilder.messages("someInstance")).size());
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            ((Message) it2.next()).setCorrelationId(UUID.randomUUID().toString());
        }
        declaredMethod.invoke(helixTaskExecutor, arrayList2, helixDataAccessor, "someInstance");
        Assert.assertEquals(helixDataAccessor.getChildNames(keyBuilder.messages("someInstance")).size(), 5 - 1);
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test(dependsOnMethods = {"testStateTransitionCancellationMsg"})
    public void testStateTransitionMsgScheduleFailure() {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor() { // from class: org.apache.helix.messaging.handling.TestHelixTaskExecutor.1
            public boolean scheduleTask(MessageTask messageTask) {
                return false;
            }
        };
        MockClusterManager mockClusterManager = new MockClusterManager();
        HelixDataAccessor helixDataAccessor = mockClusterManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        TestStateTransitionHandlerFactory testStateTransitionHandlerFactory = new TestStateTransitionHandlerFactory(this, Message.MessageType.STATE_TRANSITION.name());
        helixTaskExecutor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), testStateTransitionHandlerFactory);
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        Message message = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
        message.setTgtSessionId(mockClusterManager.getSessionId());
        message.setPartitionName("P1");
        message.setResourceName("R1");
        message.setTgtName("Localhost_1123");
        message.setSrcName("127.101.1.23_2234");
        message.setFromState("SLAVE");
        message.setToState("MASTER");
        helixDataAccessor.setProperty(keyBuilder.message(mockClusterManager.getInstanceName(), message.getMsgId()), message);
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage(mockClusterManager.getInstanceName(), Collections.emptyList(), notificationContext);
        Assert.assertEquals(testStateTransitionHandlerFactory._processedMsgIds.size(), 0);
        Assert.assertNull(helixDataAccessor.getProperty(keyBuilder.message(mockClusterManager.getInstanceName(), message.getMsgId())));
        CurrentState property = helixDataAccessor.getProperty(keyBuilder.currentState(mockClusterManager.getInstanceName(), mockClusterManager.getSessionId(), message.getResourceName()));
        Assert.assertNotNull(property);
        Assert.assertEquals(property.getState(message.getPartitionName()), HelixDefinedState.ERROR.toString());
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test
    public void testUpdateAndFindMessageThreadpool() throws Exception {
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        final ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        final ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        System.out.println("START " + TestHelper.getTestMethodName());
        String uuid = UUID.randomUUID().toString();
        HelixManager helixManager = (HelixManager) Mockito.mock(ZKHelixManager.class);
        StateMachineEngine stateMachineEngine = (StateMachineEngine) Mockito.mock(HelixStateMachineEngine.class);
        Mockito.when(helixManager.getStateMachineEngine()).thenReturn(stateMachineEngine);
        Mockito.when(helixManager.getInstanceType()).thenReturn(InstanceType.PARTICIPANT);
        Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(new MockAccessor());
        Mockito.when(helixManager.getSessionId()).thenReturn(uuid);
        Mockito.when(helixManager.getInstanceName()).thenReturn("TestInstance");
        Mockito.when(helixManager.getMessagingService()).thenReturn(new MockClusterMessagingService());
        Mockito.when(helixManager.getClusterName()).thenReturn(TestHelper.getTestMethodName());
        MockMasterSlaveStateModel mockMasterSlaveStateModel = new MockMasterSlaveStateModel();
        NotificationContext notificationContext = new NotificationContext(helixManager);
        Message message = new Message(Message.MessageType.STATE_TRANSITION, "testMsgId");
        message.setFromState("Offline");
        message.setToState("Online");
        message.setResourceName("testDB");
        message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
        message.setStateModelDef(TestResourceThreadpoolSize.ONLINE_OFFLINE);
        message.setPartitionName("TestPartition");
        message.setTgtName("TgtInstance");
        message.setStateModelFactoryName("DEFAULT");
        message.setTgtSessionId(uuid);
        HelixTaskExecutor helixTaskExecutor = new HelixTaskExecutor();
        StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> stateModelFactory = new StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel>() { // from class: org.apache.helix.messaging.handling.TestHelixTaskExecutor.1MockStateModelFactory_STType
            public ExecutorService getExecutorService(String str, String str2, String str3) {
                return threadPoolExecutor2;
            }
        };
        ((StateMachineEngine) Mockito.doReturn(stateModelFactory).when(stateMachineEngine)).getStateModelFactory(TestResourceThreadpoolSize.ONLINE_OFFLINE, "DEFAULT");
        helixTaskExecutor.scheduleTask(new HelixTask(message, notificationContext, new HelixStateTransitionHandler(stateModelFactory, mockMasterSlaveStateModel, message, notificationContext, new CurrentState("testDB")), helixTaskExecutor));
        Assert.assertTrue(TestHelper.verify(() -> {
            return threadPoolExecutor2.getTaskCount() == 1;
        }, TestHelper.WAIT_DURATION));
        System.out.println(TestHelper.getTestMethodName() + ": State transition based test passed.");
        HelixTaskExecutor helixTaskExecutor2 = new HelixTaskExecutor();
        StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> stateModelFactory2 = new StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel>() { // from class: org.apache.helix.messaging.handling.TestHelixTaskExecutor.1MockStateModelFactory_ResourceName
            public ExecutorService getExecutorService(String str) {
                return threadPoolExecutor;
            }
        };
        ((StateMachineEngine) Mockito.doReturn(stateModelFactory2).when(stateMachineEngine)).getStateModelFactory(TestResourceThreadpoolSize.ONLINE_OFFLINE, "DEFAULT");
        HelixStateTransitionHandler helixStateTransitionHandler = new HelixStateTransitionHandler(stateModelFactory2, mockMasterSlaveStateModel, message, notificationContext, new CurrentState("testDB"));
        stateMachineEngine.registerStateModelFactory(TestResourceThreadpoolSize.ONLINE_OFFLINE, stateModelFactory2);
        helixTaskExecutor2.scheduleTask(new HelixTask(message, notificationContext, helixStateTransitionHandler, helixTaskExecutor2));
        Assert.assertTrue(TestHelper.verify(() -> {
            return threadPoolExecutor.getTaskCount() == 1;
        }, TestHelper.WAIT_DURATION));
        System.out.println(TestHelper.getTestMethodName() + ": Resource name based test passed.");
        HelixTaskExecutor helixTaskExecutor3 = new HelixTaskExecutor();
        StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> stateModelFactory3 = new StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel>() { // from class: org.apache.helix.messaging.handling.TestHelixTaskExecutor.1MockStateModelFactory_MsgInfo
            public StateModelFactory.CustomizedExecutorService getExecutorService(Message.MessageInfo messageInfo) {
                return new StateModelFactory.CustomizedExecutorService(Message.MessageInfo.MessageIdentifierBase.PER_REBALANCE_TYPE, threadPoolExecutor3);
            }
        };
        HelixStateTransitionHandler helixStateTransitionHandler2 = new HelixStateTransitionHandler(stateModelFactory3, mockMasterSlaveStateModel, message, notificationContext, new CurrentState("testDB"));
        ((StateMachineEngine) Mockito.doReturn(stateModelFactory3).when(stateMachineEngine)).getStateModelFactory(TestResourceThreadpoolSize.ONLINE_OFFLINE, "DEFAULT");
        helixTaskExecutor3.scheduleTask(new HelixTask(message, notificationContext, helixStateTransitionHandler2, helixTaskExecutor3));
        Assert.assertTrue(TestHelper.verify(() -> {
            return threadPoolExecutor3.getTaskCount() == 1;
        }, TestHelper.WAIT_DURATION));
        System.out.println(TestHelper.getTestMethodName() + ": Message Info based test passed.");
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    @Test(dataProvider = "throwableClass")
    public void testHandleThrowableErrors(Class cls) throws InterruptedException {
        System.out.println("START " + TestHelper.getTestMethodName());
        HelixTaskExecutor helixTaskExecutor = (HelixTaskExecutor) Mockito.spy(new HelixTaskExecutor());
        ((HelixTaskExecutor) Mockito.doThrow(cls).when(helixTaskExecutor)).updateStateTransitionMessageThreadPool((Message) Mockito.any(), (HelixManager) Mockito.any());
        MockClusterManager mockClusterManager = new MockClusterManager();
        TestMessageHandlerFactory testMessageHandlerFactory = new TestMessageHandlerFactory();
        Iterator<String> it = testMessageHandlerFactory.getMessageTypes().iterator();
        while (it.hasNext()) {
            helixTaskExecutor.registerMessageHandlerFactory(it.next(), testMessageHandlerFactory);
        }
        NotificationContext notificationContext = new NotificationContext(mockClusterManager);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            Message message = new Message(testMessageHandlerFactory.getMessageTypes().get(0), UUID.randomUUID().toString());
            message.setTgtSessionId("*");
            message.setTgtName("Localhost_1123");
            message.setSrcName("127.101.1.23_2234");
            message.setExecutionTimeout((i + 1) * 600);
            message.setRetryCount(1);
            arrayList.add(message);
        }
        notificationContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
        helixTaskExecutor.onMessage("someInstance", arrayList, notificationContext);
        Thread.sleep(1000L);
        AssertJUnit.assertTrue(helixTaskExecutor._taskMap.size() == 0);
        ((HelixTaskExecutor) Mockito.verify(helixTaskExecutor, Mockito.times(2))).updateStateTransitionMessageThreadPool((Message) Mockito.any(), (HelixManager) Mockito.any());
        System.out.println("END " + TestHelper.getTestMethodName());
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "throwableClass")
    public static Object[][] throwableClass() {
        return new Object[]{new Object[]{OutOfMemoryError.class}, new Object[]{IllegalStateException.class}};
    }
}
