package org.apache.helix.messaging.handling;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixException;
import org.apache.helix.Mocks;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/messaging/handling/TestHelixTaskExecutor.class */
public class TestHelixTaskExecutor {

    /**
     * Factory for messages of type "Cancellable". Its handlers sleep in 100 ms slices so
     * that they can be interrupted, and the factory keeps per-message bookkeeping that the
     * tests inspect afterwards.
     */
    class CancellableHandlerFactory implements MessageHandlerFactory {
        /** Incremented once per call to {@code createHandler}. */
        int _handlersCreated = 0;
        /** Ids of messages whose handler finished all of its sleep slices. */
        ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
        /** Ids of messages whose handler has started running. */
        ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<>();
        /** Ids of messages whose handler was interrupted while sleeping. */
        ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<>();

        /**
         * Handler that sleeps for a fixed number of 100 ms slices and records an
         * interruption, if one occurs, on both the factory and the message record.
         */
        class CancellableHandler extends MessageHandler {
            /** Set to {@code true} once a sleep slice has been interrupted. */
            public boolean _interrupted = false;

            public CancellableHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            /**
             * Sleeps 15 slices on a message without a "Cancelcount" field and 10 slices on
             * one that already carries it, then marks the message as processed.
             *
             * @return a result flagged as successful when no interruption occurred
             * @throws InterruptedException rethrown after the interruption has been recorded
             */
            public HelixTaskResult handleMessage() throws InterruptedException {
                final HelixTaskResult result = new HelixTaskResult();
                final int sleepSlices;
                if (this._message.getRecord().getSimpleFields().containsKey("Cancelcount")) {
                    sleepSlices = 10;
                } else {
                    sleepSlices = 15;
                }
                final String startedId = this._message.getMessageId().stringify();
                CancellableHandlerFactory.this._processingMsgIds.put(startedId, startedId);

                int slice = 0;
                while (slice < sleepSlices) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException interruption) {
                        recordInterruption(result);
                        throw interruption;
                    }
                    slice++;
                }

                final String finishedId = this._message.getMessageId().stringify();
                CancellableHandlerFactory.this._processedMsgIds.put(finishedId, finishedId);
                result.setSuccess(true);
                return result;
            }

            /**
             * Flags this handler and the given result as interrupted, registers the message
             * as timed out, and bumps (or creates) the message's "Cancelcount" field.
             */
            private void recordInterruption(HelixTaskResult result) {
                this._interrupted = true;
                CancellableHandlerFactory.this._timedOutMsgIds.put(this._message.getMessageId().stringify(), "");
                result.setInterrupted(true);
                if (!this._message.getRecord().getSimpleFields().containsKey("Cancelcount")) {
                    this._message.getRecord().setSimpleField("Cancelcount", "1");
                    return;
                }
                final int previous = Integer.parseInt(this._message.getRecord().getSimpleField("Cancelcount"));
                this._message.getRecord().setSimpleField("Cancelcount", String.valueOf(previous + 1));
            }

            /** Stores the exception's message in the "exception" field of the message record. */
            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
                final String reason = exc.getMessage();
                this._message.getRecord().setSimpleField("exception", reason);
            }
        }

        CancellableHandlerFactory() {
        }

        /** Counts the request and returns a new {@link CancellableHandler} for the message. */
        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            this._handlersCreated += 1;
            final CancellableHandler handler = new CancellableHandler(message, notificationContext);
            return handler;
        }

        /** @return the message type served by this factory, "Cancellable" */
        public String getMessageType() {
            return "Cancellable";
        }

        /** Zeroes the handler counter and empties all three bookkeeping maps. */
        public void reset() {
            this._handlersCreated = 0;
            this._processedMsgIds.clear();
            this._processingMsgIds.clear();
            this._timedOutMsgIds.clear();
        }
    }

    /** Mock manager that always reports the same session id, "123". */
    public static class MockClusterManager extends Mocks.MockManager {
        /** Session id returned by every instance. */
        private static final String FIXED_SESSION_ID = "123";

        @Override // org.apache.helix.Mocks.MockManager
        public String getSessionId() {
            return FIXED_SESSION_ID;
        }
    }

    /**
     * Factory for messages of type "TestingMessageHandler". It counts the handlers it
     * creates and records the id of every message a handler has started on.
     */
    class TestMessageHandlerFactory implements MessageHandlerFactory {
        /** Incremented once per successfully created handler. */
        int _handlersCreated = 0;
        /** Ids of the messages recorded by the handlers of this factory. */
        ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();

        /** Handler that records its message id, sleeps 100 ms, and reports success. */
        class TestMessageHandler extends MessageHandler {
            public TestMessageHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            /**
             * Records the message id on the enclosing factory before sleeping.
             *
             * @return a result flagged as successful
             * @throws InterruptedException if the 100 ms sleep is interrupted
             */
            public HelixTaskResult handleMessage() throws InterruptedException {
                final HelixTaskResult outcome = new HelixTaskResult();
                final String key = this._message.getMessageId().stringify();
                final String value = this._message.getMessageId().stringify();
                TestMessageHandlerFactory.this._processedMsgIds.put(key, value);
                Thread.sleep(100L);
                outcome.setSuccess(true);
                return outcome;
            }

            /** Does nothing; errors are not recorded by this handler. */
            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            }
        }

        TestMessageHandlerFactory() {
        }

        /**
         * Creates a handler for the message, unless its sub type is "EXCEPTION".
         *
         * @throws HelixException when the message sub type equals "EXCEPTION"; the handler
         *         counter is not incremented in that case
         */
        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            if ("EXCEPTION".equals(message.getMsgSubType())) {
                throw new HelixException("Test Message handler exception, can ignore");
            }
            this._handlersCreated += 1;
            return new TestMessageHandler(message, notificationContext);
        }

        /** @return the message type served by this factory, "TestingMessageHandler" */
        public String getMessageType() {
            return "TestingMessageHandler";
        }

        /** Does nothing; the counter and the id map are left untouched. */
        public void reset() {
        }
    }

    /**
     * Variant of {@link TestMessageHandlerFactory} that differs only in the message type
     * it reports, "TestingMessageHandler2".
     */
    class TestMessageHandlerFactory2 extends TestMessageHandlerFactory {
        TestMessageHandlerFactory2() {
        }

        /** @return the message type served by this factory, "TestingMessageHandler2" */
        @Override // org.apache.helix.messaging.handling.TestHelixTaskExecutor.TestMessageHandlerFactory
        public String getMessageType() {
            return "TestingMessageHandler2";
        }
    }

    /**
     * Submits five messages of the first registered type and six of the second to one
     * executor, then asserts that each factory created handlers for, and recorded, exactly
     * its own messages.
     *
     * @throws InterruptedException if the wait for the asynchronous handlers is interrupted
     */
    @Test
    public void testNormalMsgExecution() throws InterruptedException {
        System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
        executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
        NotificationContext changeContext = new NotificationContext(manager);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from(manager.getSessionId()));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            msg.setCorrelationId(UUID.randomUUID().toString());
            messages.add(msg);
        }
        for (int i = 0; i < 6; i++) {
            Message msg = new Message(factory2.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from(manager.getSessionId()));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            msg.setCorrelationId(UUID.randomUUID().toString());
            messages.add(msg);
        }

        executor.onMessage("someInstance", messages, changeContext);
        // Handlers run asynchronously; give them time to finish before asserting.
        Thread.sleep(1000L);

        // assertEquals (expected, actual) reports the observed value when a count is off.
        AssertJUnit.assertEquals(5, factory._processedMsgIds.size());
        AssertJUnit.assertEquals(6, factory2._processedMsgIds.size());
        AssertJUnit.assertEquals(5, factory._handlersCreated);
        AssertJUnit.assertEquals(6, factory2._handlersCreated);

        // Every message must have been recorded by exactly one of the two factories.
        for (Message msg : messages) {
            boolean seenByFirst = factory._processedMsgIds.containsKey(msg.getId());
            boolean seenBySecond = factory2._processedMsgIds.containsKey(msg.getId());
            AssertJUnit.assertTrue(seenByFirst || seenBySecond);
            AssertJUnit.assertFalse(seenByFirst && seenBySecond);
        }
        System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
    }

    /**
     * Registers only the first factory, submits five messages of its type plus four of the
     * second (unregistered) type, and asserts that only the registered type was handled.
     *
     * @throws InterruptedException if the wait for the asynchronous handlers is interrupted
     */
    @Test
    public void testUnknownTypeMsgExecution() throws InterruptedException {
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        // Deliberately NOT registered: its message type is unknown to the executor.
        TestMessageHandlerFactory2 unregisteredFactory = new TestMessageHandlerFactory2();
        NotificationContext changeContext = new NotificationContext(manager);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from(manager.getSessionId()));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            messages.add(msg);
        }
        for (int i = 0; i < 4; i++) {
            Message msg = new Message(unregisteredFactory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from(manager.getSessionId()));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            messages.add(msg);
        }

        executor.onMessage("someInstance", messages, changeContext);
        // Handlers run asynchronously; give them time to finish before asserting.
        Thread.sleep(1000L);

        // assertEquals (expected, actual) reports the observed value when a count is off.
        AssertJUnit.assertEquals(5, factory._processedMsgIds.size());
        AssertJUnit.assertEquals(0, unregisteredFactory._processedMsgIds.size());
        AssertJUnit.assertEquals(5, factory._handlersCreated);
        AssertJUnit.assertEquals(0, unregisteredFactory._handlersCreated);

        for (Message msg : messages) {
            if (msg.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
                AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(msg.getId()));
            }
        }
    }

    /**
     * Submits five messages targeted at session "*" and four targeted at
     * "some other session id" (the mock manager's session id is "123"), and asserts that
     * only the "*" messages were handled.
     *
     * @throws InterruptedException if the wait for the asynchronous handlers is interrupted
     */
    @Test
    public void testMsgSessionId() throws InterruptedException {
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
        executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
        NotificationContext changeContext = new NotificationContext(manager);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("");
            messages.add(msg);
        }
        for (int i = 0; i < 4; i++) {
            Message msg = new Message(factory2.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("some other session id"));
            msg.setTgtName("");
            messages.add(msg);
        }

        executor.onMessage("someInstance", messages, changeContext);
        // Handlers run asynchronously; give them time to finish before asserting.
        Thread.sleep(1000L);

        // assertEquals (expected, actual) reports the observed value when a count is off.
        AssertJUnit.assertEquals(5, factory._processedMsgIds.size());
        AssertJUnit.assertEquals(0, factory2._processedMsgIds.size());
        AssertJUnit.assertEquals(5, factory._handlersCreated);
        AssertJUnit.assertEquals(0, factory2._handlersCreated);

        for (Message msg : messages) {
            if (msg.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
                AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(msg.getId()));
            }
        }
    }

    /**
     * Submits five ordinary messages plus one whose sub type is "EXCEPTION", which makes
     * {@code TestMessageHandlerFactory.createHandler} throw. Asserts that the five ordinary
     * messages were handled and that the failing message ended up UNPROCESSABLE.
     *
     * @throws InterruptedException if the wait for the asynchronous handlers is interrupted
     */
    @Test
    public void testCreateHandlerException() throws InterruptedException {
        System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        NotificationContext changeContext = new NotificationContext(manager);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from(manager.getSessionId()));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            msg.setCorrelationId(UUID.randomUUID().toString());
            messages.add(msg);
        }

        // This one makes the factory throw from createHandler.
        Message failingMsg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
        failingMsg.setTgtSessionId(SessionId.from(manager.getSessionId()));
        failingMsg.setMsgSubType("EXCEPTION");
        failingMsg.setTgtName("Localhost_1123");
        failingMsg.setSrcName("127.101.1.23_2234");
        failingMsg.setCorrelationId(UUID.randomUUID().toString());
        messages.add(failingMsg);

        executor.onMessage("someInstance", messages, changeContext);
        // Handlers run asynchronously; give them time to finish before asserting.
        Thread.sleep(1000L);

        // assertEquals (expected, actual) reports the observed value when a check fails.
        AssertJUnit.assertEquals(5, factory._processedMsgIds.size());
        AssertJUnit.assertEquals(5, factory._handlersCreated);
        AssertJUnit.assertEquals(Message.MessageState.UNPROCESSABLE, failingMsg.getMsgState());
        System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
    }

    /**
     * Submits four "Cancellable" messages, cancels each of them after 500 ms, and asserts
     * that all four handlers started but none ran to completion.
     *
     * @throws InterruptedException if one of the waits is interrupted
     */
    @Test
    public void testTaskCancellation() throws InterruptedException {
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        CancellableHandlerFactory factory = new CancellableHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        NotificationContext changeContext = new NotificationContext(manager);

        // Every submitted message is also a message to cancel, so one typed list suffices.
        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            messages.add(msg);
        }

        executor.onMessage("someInstance", messages, changeContext);
        // Let the handlers start (they sleep in 100 ms slices) before cancelling them.
        Thread.sleep(500L);
        for (Message msg : messages) {
            executor.cancelTask(new HelixTask(msg, changeContext, (MessageHandler) null, (HelixTaskExecutor) null));
        }
        // Longer than an uncancelled handler's remaining run time, so a handler that was
        // not actually cancelled would show up in _processedMsgIds.
        Thread.sleep(1500L);

        // assertEquals (expected, actual) reports the observed value when a count is off.
        AssertJUnit.assertEquals(0, factory._processedMsgIds.size());
        AssertJUnit.assertEquals(4, factory._handlersCreated);
        AssertJUnit.assertEquals(4, factory._processingMsgIds.size());

        for (Message msg : messages) {
            if (msg.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
                AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(msg.getId()));
            }
        }
    }

    /**
     * Submits ten messages for each of three registered message types, asserts that no
     * executor service is shut down while work is in flight, then calls
     * {@code shutdown()} and asserts that every executor service reports shut down.
     *
     * <p>Each message is enqueued exactly once.
     *
     * @throws InterruptedException if the wait for the handlers to start is interrupted
     */
    @Test
    public void testShutdown() throws InterruptedException {
        System.out.println("START TestCMTaskExecutor.testShutdown()");
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
        executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
        CancellableHandlerFactory cancellableFactory = new CancellableHandlerFactory();
        executor.registerMessageHandlerFactory(cancellableFactory.getMessageType(), cancellableFactory);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            messages.add(msg);
        }
        for (int i = 0; i < 10; i++) {
            Message msg = new Message(factory2.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            messages.add(msg);
        }
        for (int i = 0; i < 10; i++) {
            Message msg = new Message(cancellableFactory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            messages.add(msg);
        }

        executor.onMessage("some", messages, new NotificationContext(manager));
        // Give the executor time to start working on the messages.
        Thread.sleep(500L);

        // The declared value type of _executorMap is not visible here, so iterate as
        // Object and keep the cast instead of assuming a parameterization.
        for (Object pool : executor._executorMap.values()) {
            Assert.assertFalse(((ExecutorService) pool).isShutdown());
        }
        Assert.assertTrue(factory._processedMsgIds.size() > 0);

        executor.shutdown();
        for (Object pool : executor._executorMap.values()) {
            Assert.assertTrue(((ExecutorService) pool).isShutdown());
        }
        System.out.println("END TestCMTaskExecutor.testShutdown()");
    }

    /**
     * Submits four "Cancellable" messages with execution timeouts of 600, 1200, 1800 and
     * 2400 ms and no retry count set. A first-run handler sleeps 15 x 100 ms, so the test
     * expects exactly the first two messages to time out and to carry a "Cancelcount" field.
     *
     * @throws InterruptedException if the wait for the handlers is interrupted
     */
    @Test
    public void testNoRetry() throws InterruptedException {
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        CancellableHandlerFactory factory = new CancellableHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        NotificationContext changeContext = new NotificationContext(manager);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            msg.setExecutionTimeout((i + 1) * 600);
            messages.add(msg);
        }

        executor.onMessage("someInstance", messages, changeContext);
        // Long enough for every handler to either finish or hit its timeout.
        Thread.sleep(4000L);

        // AssertJUnit.assertEquals takes (expected, actual).
        AssertJUnit.assertEquals(4, factory._handlersCreated);
        AssertJUnit.assertEquals(2, factory._timedOutMsgIds.size());

        // Only the two messages with the shortest timeouts are expected to have timed out.
        for (int i = 0; i < 2; i++) {
            Message msg = messages.get(i);
            if (msg.getMsgType().equalsIgnoreCase(factory.getMessageType())) {
                AssertJUnit.assertTrue(msg.getRecord().getSimpleFields().containsKey("Cancelcount"));
                AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msg.getId()));
            }
        }
    }

    /**
     * Submits four "Cancellable" messages with execution timeouts of 600, 1200, 1800 and
     * 2400 ms and a retry count of 1. A first-run handler sleeps 15 x 100 ms and a handler
     * for a message that already carries "Cancelcount" sleeps 10 x 100 ms. The test
     * therefore expects the first message to be cancelled twice, the second once, and
     * three messages to be processed in the end.
     *
     * @throws InterruptedException if the wait for the handlers is interrupted
     */
    @Test
    public void testRetryOnce() throws InterruptedException {
        HelixTaskExecutor executor = new HelixTaskExecutor();
        MockClusterManager manager = new MockClusterManager();
        CancellableHandlerFactory factory = new CancellableHandlerFactory();
        executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
        NotificationContext changeContext = new NotificationContext(manager);

        ArrayList<Message> messages = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Message msg = new Message(factory.getMessageType(), MessageId.from(UUID.randomUUID().toString()));
            msg.setTgtSessionId(SessionId.from("*"));
            msg.setTgtName("Localhost_1123");
            msg.setSrcName("127.101.1.23_2234");
            msg.setExecutionTimeout((i + 1) * 600);
            msg.setRetryCount(1);
            messages.add(msg);
        }

        executor.onMessage("someInstance", messages, changeContext);
        // Long enough for first runs and retries to either finish or hit their timeout.
        Thread.sleep(3500L);

        // AssertJUnit.assertEquals takes (expected, actual). Comparing through assertEquals
        // also turns a missing "Cancelcount" into an assertion failure instead of an NPE.
        AssertJUnit.assertEquals(3, factory._processedMsgIds.size());
        AssertJUnit.assertEquals("2", messages.get(0).getRecord().getSimpleField("Cancelcount"));
        AssertJUnit.assertEquals("1", messages.get(1).getRecord().getSimpleField("Cancelcount"));
        AssertJUnit.assertEquals(2, factory._timedOutMsgIds.size());
        AssertJUnit.assertEquals(0, executor._taskMap.size());
    }
}
