package org.apache.helix.integration;

import java.util.HashSet;
import java.util.UUID;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestMessagingService.class */
public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck {

    /* loaded from: input_file:org/apache/helix/integration/TestMessagingService$MockAsyncCallback.class */
    /**
     * Minimal {@link AsyncCallback} whose hooks do nothing.
     *
     * <p>The tests in this class only inspect the state exposed by the base class
     * ({@code getMessageReplied()} and {@code isTimedOut()}), so no extra behavior
     * is needed here.
     */
    public static class MockAsyncCallback extends AsyncCallback {

        /** Reply hook; intentionally left empty. */
        public void onReplyMessage(Message message) {
            // nothing to do
        }

        /** Timeout hook; intentionally left empty. */
        public void onTimeOut() {
            // nothing to do
        }
    }

    /* loaded from: input_file:org/apache/helix/integration/TestMessagingService$TestAsyncCallback.class */
    /**
     * {@link AsyncCallback} that records the {@code "ReplyMessage"} entry of every
     * reply it receives and remembers whether its timeout hook fired.
     */
    public static class TestAsyncCallback extends AsyncCallback {
        /**
         * Reply contents seen by any instance of this callback. The set is static,
         * so it is shared by every test that uses this class.
         */
        static HashSet<String> _replyedMessageContents = new HashSet<>();

        /** Becomes {@code true} once {@link #onTimeOut()} has been invoked. */
        public boolean timeout;

        /**
         * @param timeoutValue value forwarded to the {@link AsyncCallback} constructor
         *                     (presumably a timeout in milliseconds; confirm against
         *                     the base class)
         */
        public TestAsyncCallback(long timeoutValue) {
            super(timeoutValue);
            this.timeout = false;
        }

        /** Marks this callback as timed out. */
        public void onTimeOut() {
            this.timeout = true;
        }

        /**
         * Prints the reply content and stores it in {@link #_replyedMessageContents}.
         *
         * @param message reply whose {@code MESSAGE_RESULT} map field carries the
         *                {@code "ReplyMessage"} entry
         */
        public void onReplyMessage(Message message) {
            // Look the value up once; a missing entry is printed and stored as null.
            String reply = message.getRecord()
                    .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
                    .get("ReplyMessage");
            System.out.println("OnreplyMessage: " + reply);
            _replyedMessageContents.add(reply);
        }
    }

    /* loaded from: input_file:org/apache/helix/integration/TestMessagingService$TestMessagingHandlerFactory.class */
    /**
     * Factory for handlers of the {@code "TestExtensibility"} message type.
     *
     * <p>Each handler it creates records the message's {@code "TestMessagingPara"}
     * simple field in {@link #_processedMsgIds} and answers with a fixed reply.
     */
    public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
        /** Values of {@code "TestMessagingPara"} seen by any handler; static, hence shared across tests. */
        public static HashSet<String> _processedMsgIds = new HashSet<>();

        /** @return the message type served by this factory */
        public String getMessageType() {
            return "TestExtensibility";
        }

        /** Builds a fresh {@link TestMessagingHandler} for the given message. */
        public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
            MessageHandler handler = new TestMessagingHandler(message, notificationContext);
            return handler;
        }

        /** No state to reset. */
        public void reset() {
        }

        /** Handler that sleeps for one second, records the payload and replies with a fixed string. */
        public static class TestMessagingHandler extends MessageHandler {
            public TestMessagingHandler(Message message, NotificationContext notificationContext) {
                super(message, notificationContext);
            }

            /**
             * Processes the message.
             *
             * @return a successful result whose result map holds
             *         {@code "ReplyMessage" -> "TestReplyMessage"}
             * @throws InterruptedException if the one-second sleep is interrupted
             */
            public HelixTaskResult handleMessage() throws InterruptedException {
                final HelixTaskResult outcome = new HelixTaskResult();
                outcome.setSuccess(true);

                // Simulated processing time; the tests size their waits and timeouts around it.
                Thread.sleep(1000L);

                final String msgId = this._message.getMsgId();
                System.out.println("TestMessagingHandler " + msgId);

                final String payload = this._message.getRecord().getSimpleField("TestMessagingPara");
                TestMessagingHandlerFactory._processedMsgIds.add(payload);

                outcome.getTaskResultMap().put("ReplyMessage", "TestReplyMessage");
                return outcome;
            }

            /** Errors are ignored by this test handler. */
            public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            }
        }
    }

    /**
     * Sends a fire-and-forget message from participant 0 to {@code localhost_12919}
     * and checks that the handler registered on participant 1 processed it, first
     * with default criteria and then with the {@code IDEALSTATES} data source.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    public void TestMessageSimpleSend() throws Exception {
        TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
        this._participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);

        String para = "Testing messaging para";
        String msgId = new UUID(123L, 456L).toString();
        Message message = new Message(factory.getMessageType(), msgId);
        message.setMsgId(msgId);
        message.setSrcName("localhost_12918");
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", para);

        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_12919");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);

        // The processed-payload set is static and every test records the same payload;
        // clear it so the check below reflects this send only.
        TestMessagingHandlerFactory._processedMsgIds.clear();
        int sent = this._participants[0].getMessagingService().send(criteria, message);
        AssertJUnit.assertEquals(1, sent);
        // The handler sleeps for one second before recording the payload.
        Thread.sleep(2500L);
        AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds.contains(para));

        Criteria idealStateCriteria = new Criteria();
        idealStateCriteria.setInstanceName("localhost_12919");
        idealStateCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        idealStateCriteria.setSessionSpecific(false);
        idealStateCriteria.setDataSource(Criteria.DataSource.IDEALSTATES);

        // Clear again; otherwise the entry left by the first send makes this check vacuous.
        TestMessagingHandlerFactory._processedMsgIds.clear();
        sent = this._participants[0].getMessagingService().send(idealStateCriteria, message);
        AssertJUnit.assertEquals(1, sent);
        Thread.sleep(2500L);
        AssertJUnit.assertTrue(TestMessagingHandlerFactory._processedMsgIds.contains(para));
    }

    /**
     * Sends messages with an asynchronous callback and checks both outcomes: a reply
     * arriving within a long timeout, and a timeout firing when the limit (500) is
     * shorter than the handler's one-second processing time. Both cases are run with
     * default criteria and with the {@code IDEALSTATES} data source.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    public void TestMessageSimpleSendReceiveAsync() throws Exception {
        TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
        this._participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);
        this._participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);

        String msgId = new UUID(123L, 456L).toString();
        Message message = new Message(factory.getMessageType(), msgId);
        message.setMsgId(msgId);
        message.setSrcName("localhost_12918");
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");

        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_12919");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);

        // The reply set is static and shared by all callbacks; clear it so the
        // contains() check reflects the reply to this send only.
        TestAsyncCallback._replyedMessageContents.clear();
        TestAsyncCallback replyCallback = new TestAsyncCallback(60000L);
        this._participants[0].getMessagingService().send(criteria, message, replyCallback, 60000);
        Thread.sleep(2000L);
        AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
        AssertJUnit.assertEquals(1, replyCallback.getMessageReplied().size());

        // 500 is shorter than the handler's one-second sleep, so the callback must time out.
        TestAsyncCallback timeoutCallback = new TestAsyncCallback(500L);
        this._participants[0].getMessagingService().send(criteria, message, timeoutCallback, 500);
        Thread.sleep(3000L);
        AssertJUnit.assertTrue(timeoutCallback.isTimedOut());

        Criteria idealStateCriteria = new Criteria();
        idealStateCriteria.setInstanceName("localhost_12919");
        idealStateCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        idealStateCriteria.setSessionSpecific(false);
        idealStateCriteria.setDataSource(Criteria.DataSource.IDEALSTATES);

        // Clear again; otherwise the entry recorded above makes the next check vacuous.
        TestAsyncCallback._replyedMessageContents.clear();
        TestAsyncCallback idealStateReplyCallback = new TestAsyncCallback(60000L);
        this._participants[0].getMessagingService().send(idealStateCriteria, message, idealStateReplyCallback, 60000);
        Thread.sleep(2000L);
        AssertJUnit.assertTrue(TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
        AssertJUnit.assertEquals(1, idealStateReplyCallback.getMessageReplied().size());

        TestAsyncCallback idealStateTimeoutCallback = new TestAsyncCallback(500L);
        this._participants[0].getMessagingService().send(idealStateCriteria, message, idealStateTimeoutCallback, 500);
        Thread.sleep(3000L);
        AssertJUnit.assertTrue(idealStateTimeoutCallback.isTimedOut());
    }

    /**
     * Uses the blocking {@code sendAndWait} call and checks that exactly one reply
     * with the expected content comes back, then that a second call with a limit of
     * 500 (shorter than the handler's one-second sleep) ends in a timeout.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void TestBlockingSendReceive() throws Exception {
        TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
        this._participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);

        String msgId = new UUID(123L, 456L).toString();
        Message message = new Message(factory.getMessageType(), msgId);
        message.setMsgId(msgId);
        message.setSrcName("localhost_12918");
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");

        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_12919");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);

        MockAsyncCallback replyCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, replyCallback, 60000);
        // Check the count before touching element 0 so a missing reply fails as an
        // assertion rather than as an IndexOutOfBoundsException.
        AssertJUnit.assertEquals(1, replyCallback.getMessageReplied().size());
        String reply = replyCallback.getMessageReplied().get(0).getRecord()
                .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
                .get("ReplyMessage");
        AssertJUnit.assertEquals("TestReplyMessage", reply);

        MockAsyncCallback timeoutCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, timeoutCallback, 500);
        AssertJUnit.assertTrue(timeoutCallback.isTimedOut());
    }

    /**
     * Broadcasts a message with a wildcard instance name and checks how many replies
     * come back as the criteria are narrowed by partition, partition state and data
     * source.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void TestMultiMessageCriteria() throws Exception {
        // Register a handler on every participant, as the sibling tests do; the
        // reply counts asserted below need more than one participant to answer.
        for (int i = 0; i < 5; i++) {
            TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
            this._participants[i].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);
        }

        String msgId = new UUID(123L, 456L).toString();
        Message message = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
        message.setMsgId(msgId);
        message.setSrcName("localhost_12918");
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");

        Criteria criteria = new Criteria();
        criteria.setInstanceName("%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);

        // Four replies expected from five registered participants (presumably the
        // sender is excluded unless setSelfExcluded(false) is called; confirm in Criteria).
        MockAsyncCallback allCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, allCallback, 10000);
        // Check the count first so a missing reply fails as an assertion, not an
        // IndexOutOfBoundsException.
        AssertJUnit.assertEquals(4, allCallback.getMessageReplied().size());
        String reply = allCallback.getMessageReplied().get(0).getRecord()
                .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
                .get("ReplyMessage");
        AssertJUnit.assertEquals("TestReplyMessage", reply);

        // 500 is shorter than the handler's one-second sleep, so this call must time out.
        MockAsyncCallback timeoutCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, timeoutCallback, 500);
        AssertJUnit.assertTrue(timeoutCallback.isTimedOut());

        criteria.setPartition("TestDB_17");
        MockAsyncCallback partition17Callback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, partition17Callback, 10000);
        AssertJUnit.assertEquals(this._replica - 1, partition17Callback.getMessageReplied().size());

        criteria.setPartition("TestDB_15");
        MockAsyncCallback partition15Callback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, partition15Callback, 10000);
        AssertJUnit.assertEquals(this._replica, partition15Callback.getMessageReplied().size());

        criteria.setPartitionState("SLAVE");
        MockAsyncCallback slaveCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, slaveCallback, 10000);
        AssertJUnit.assertEquals(this._replica - 1, slaveCallback.getMessageReplied().size());

        criteria.setDataSource(Criteria.DataSource.IDEALSTATES);
        MockAsyncCallback idealStateCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, idealStateCallback, 10000);
        AssertJUnit.assertEquals(this._replica - 1, idealStateCallback.getMessageReplied().size());
    }

    /**
     * Broadcasts a message with self-exclusion turned off and checks that all five
     * participants, the sender included, reply with the expected content.
     */
    @Test
    public void sendSelfMsg() {
        for (int i = 0; i < 5; i++) {
            TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
            this._participants[i].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);
        }

        String msgId = new UUID(123L, 456L).toString();
        Message message = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
        message.setMsgId(msgId);
        message.setSrcName("localhost_12918");
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");

        Criteria criteria = new Criteria();
        criteria.setInstanceName("%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        // Include the sending participant among the recipients.
        criteria.setSelfExcluded(false);

        MockAsyncCallback callback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, callback, 10000);
        AssertJUnit.assertEquals(5, callback.getMessageReplied().size());
        String reply = callback.getMessageReplied().get(0).getRecord()
                .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
                .get("ReplyMessage");
        // assertEquals reports a missing reply entry as a failure instead of an NPE.
        AssertJUnit.assertEquals("TestReplyMessage", reply);
    }

    /**
     * Sends a {@code CONTROLLER_MSG} to the controller three times, adding a
     * partition and then a partition state to the criteria, and checks that each
     * call yields exactly one reply whose {@code "ControllerResult"} mentions
     * {@code localhost_12918}.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void TestControllerMessage() throws Exception {
        for (int i = 0; i < 5; i++) {
            TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
            this._participants[i].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), factory);
        }

        String msgId = new UUID(123L, 456L).toString();
        Message message = new Message(Message.MessageType.CONTROLLER_MSG, msgId);
        message.setMsgId(msgId);
        message.setSrcName("localhost_12918");
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");

        Criteria criteria = new Criteria();
        criteria.setInstanceName("*");
        criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria.setSessionSpecific(false);

        MockAsyncCallback plainCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, plainCallback, 10000);
        assertSingleControllerReply(plainCallback);

        // Each resend gets a fresh message id.
        message.setMsgId(UUID.randomUUID().toString());
        criteria.setPartition("TestDB_17");
        MockAsyncCallback partitionCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, partitionCallback, 10000);
        assertSingleControllerReply(partitionCallback);

        message.setMsgId(UUID.randomUUID().toString());
        criteria.setPartitionState("SLAVE");
        MockAsyncCallback stateCallback = new MockAsyncCallback();
        this._participants[0].getMessagingService().sendAndWait(criteria, message, stateCallback, 10000);
        assertSingleControllerReply(stateCallback);
    }

    /**
     * Asserts that the callback holds exactly one reply and that its
     * {@code "ControllerResult"} entry contains {@code localhost_12918}.
     */
    private static void assertSingleControllerReply(MockAsyncCallback callback) {
        // Check the count first so a missing reply fails as an assertion, not an
        // IndexOutOfBoundsException.
        AssertJUnit.assertEquals(1, callback.getMessageReplied().size());
        String controllerResult = callback.getMessageReplied().get(0).getRecord()
                .getMapField(Message.Attributes.MESSAGE_RESULT.toString())
                .get("ControllerResult");
        AssertJUnit.assertNotNull(controllerResult);
        AssertJUnit.assertTrue(controllerResult.contains("localhost_12918"));
    }
}
