package org.apache.helix.integration.messaging;

import java.util.UUID;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.messaging.TestMessagingService;
import org.apache.helix.integration.rebalancer.TestInstanceOperation;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/messaging/TestCrossClusterMessagingService.class */
public class TestCrossClusterMessagingService extends TestMessagingService {
    private final String ADMIN_CLUSTER_NAME = "ADMIN_" + this.CLUSTER_NAME;
    private ClusterControllerManager _adminController;
    private String _hostSrc;

    @Override // org.apache.helix.integration.common.ZkStandAloneCMTestBase, org.apache.helix.common.ZkTestBase
    @BeforeClass
    public void beforeClass() throws Exception {
        super.beforeClass();
        _gSetupTool.addCluster(this.ADMIN_CLUSTER_NAME, true);
        this._hostSrc = "controller_1";
        this._adminController = new ClusterControllerManager(ZkTestBase.ZK_ADDR, this.ADMIN_CLUSTER_NAME, "controller_1");
        this._adminController.syncStart();
        Assert.assertTrue(new BestPossibleExternalViewVerifier.Builder(this.ADMIN_CLUSTER_NAME).setZkClient(_gZkClient).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build().verifyByPolling());
    }

    @Override // org.apache.helix.integration.common.ZkStandAloneCMTestBase
    @AfterClass
    public void afterClass() throws Exception {
        if (this._adminController != null && this._adminController.isConnected()) {
            this._adminController.syncStop();
        }
        deleteCluster(this.ADMIN_CLUSTER_NAME);
        super.afterClass();
    }

    @Override // org.apache.helix.integration.messaging.TestMessagingService
    @Test
    public void TestMessageSimpleSend() throws Exception {
        TestMessagingService.TestMessagingHandlerFactory testMessagingHandlerFactory = new TestMessagingService.TestMessagingHandlerFactory();
        this._participants[1].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageTypes(), testMessagingHandlerFactory);
        String uuid = new UUID(123L, 456L).toString();
        Message message = new Message(testMessagingHandlerFactory.getMessageTypes().get(0), uuid);
        message.setMsgId(uuid);
        message.setSrcName(this._hostSrc);
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_12919");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setClusterName(this.CLUSTER_NAME);
        AssertJUnit.assertEquals(this._adminController.getMessagingService().send(criteria, message), 1);
        Thread.sleep(2500L);
        AssertJUnit.assertTrue(TestMessagingService.TestMessagingHandlerFactory._processedMsgIds.contains("Testing messaging para"));
        Criteria criteria2 = new Criteria();
        criteria2.setInstanceName("localhost_12919");
        criteria2.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria2.setSessionSpecific(false);
        criteria2.setDataSource(Criteria.DataSource.IDEALSTATES);
        criteria2.setClusterName(this.CLUSTER_NAME);
        AssertJUnit.assertEquals(this._adminController.getMessagingService().send(criteria2, message), 1);
        Thread.sleep(2500L);
        AssertJUnit.assertTrue(TestMessagingService.TestMessagingHandlerFactory._processedMsgIds.contains("Testing messaging para"));
    }

    @Override // org.apache.helix.integration.messaging.TestMessagingService
    @Test
    public void TestMessageSimpleSendReceiveAsync() throws Exception {
        TestMessagingService.TestMessagingHandlerFactory testMessagingHandlerFactory = new TestMessagingService.TestMessagingHandlerFactory();
        this._participants[1].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageTypes(), testMessagingHandlerFactory);
        this._participants[0].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageTypes(), testMessagingHandlerFactory);
        String uuid = new UUID(123L, 456L).toString();
        Message message = new Message(testMessagingHandlerFactory.getMessageTypes().get(0), uuid);
        message.setMsgId(uuid);
        message.setSrcName(this._hostSrc);
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_12919");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setClusterName(this.CLUSTER_NAME);
        TestMessagingService.TestAsyncCallback testAsyncCallback = new TestMessagingService.TestAsyncCallback(TestHelper.WAIT_DURATION);
        this._adminController.getMessagingService().send(criteria, message, testAsyncCallback, 60000);
        Thread.sleep(2000L);
        AssertJUnit.assertTrue(TestMessagingService.TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
        AssertJUnit.assertEquals(testAsyncCallback.getMessageReplied().size(), 1);
        TestMessagingService.TestAsyncCallback testAsyncCallback2 = new TestMessagingService.TestAsyncCallback(500L);
        this._adminController.getMessagingService().send(criteria, message, testAsyncCallback2, 500);
        Thread.sleep(3000L);
        AssertJUnit.assertTrue(testAsyncCallback2.isTimedOut());
        Criteria criteria2 = new Criteria();
        criteria2.setInstanceName("localhost_12919");
        criteria2.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria2.setSessionSpecific(false);
        criteria2.setDataSource(Criteria.DataSource.IDEALSTATES);
        criteria2.setClusterName(this.CLUSTER_NAME);
        TestMessagingService.TestAsyncCallback testAsyncCallback3 = new TestMessagingService.TestAsyncCallback(TestHelper.WAIT_DURATION);
        this._adminController.getMessagingService().send(criteria2, message, testAsyncCallback3, 60000);
        Thread.sleep(2000L);
        AssertJUnit.assertTrue(TestMessagingService.TestAsyncCallback._replyedMessageContents.contains("TestReplyMessage"));
        AssertJUnit.assertEquals(testAsyncCallback3.getMessageReplied().size(), 1);
        TestMessagingService.TestAsyncCallback testAsyncCallback4 = new TestMessagingService.TestAsyncCallback(500L);
        this._adminController.getMessagingService().send(criteria2, message, testAsyncCallback4, 500);
        Thread.sleep(3000L);
        AssertJUnit.assertTrue(testAsyncCallback4.isTimedOut());
    }

    @Override // org.apache.helix.integration.messaging.TestMessagingService
    @Test
    public void TestBlockingSendReceive() {
        TestMessagingService.TestMessagingHandlerFactory testMessagingHandlerFactory = new TestMessagingService.TestMessagingHandlerFactory();
        this._participants[1].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageTypes(), testMessagingHandlerFactory);
        String uuid = new UUID(123L, 456L).toString();
        Message message = new Message(testMessagingHandlerFactory.getMessageTypes().get(0), uuid);
        message.setMsgId(uuid);
        message.setSrcName(this._hostSrc);
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");
        Criteria criteria = new Criteria();
        criteria.setInstanceName("localhost_12919");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setClusterName(this.CLUSTER_NAME);
        TestMessagingService.MockAsyncCallback mockAsyncCallback = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback, 60000);
        AssertJUnit.assertEquals((String) ((Message) mockAsyncCallback.getMessageReplied().get(0)).getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage"), "TestReplyMessage");
        AssertJUnit.assertEquals(mockAsyncCallback.getMessageReplied().size(), 1);
        TestMessagingService.MockAsyncCallback mockAsyncCallback2 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback2, 500);
        AssertJUnit.assertTrue(mockAsyncCallback2.isTimedOut());
    }

    @Override // org.apache.helix.integration.messaging.TestMessagingService
    @Test
    public void TestMultiMessageCriteria() {
        for (int i = 0; i < 5; i++) {
            TestMessagingService.TestMessagingHandlerFactory testMessagingHandlerFactory = new TestMessagingService.TestMessagingHandlerFactory();
            this._participants[i].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageTypes(), testMessagingHandlerFactory);
        }
        String uuid = new UUID(123L, 456L).toString();
        Message message = new Message(new TestMessagingService.TestMessagingHandlerFactory().getMessageTypes().get(0), uuid);
        message.setMsgId(uuid);
        message.setSrcName(this._hostSrc);
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");
        Criteria criteria = new Criteria();
        criteria.setInstanceName("%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(false);
        criteria.setClusterName(this.CLUSTER_NAME);
        TestMessagingService.MockAsyncCallback mockAsyncCallback = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertEquals((String) ((Message) mockAsyncCallback.getMessageReplied().get(0)).getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage"), "TestReplyMessage");
        AssertJUnit.assertEquals(5, mockAsyncCallback.getMessageReplied().size());
        TestMessagingService.MockAsyncCallback mockAsyncCallback2 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback2, 500);
        AssertJUnit.assertTrue(mockAsyncCallback2.isTimedOut());
        criteria.setPartition("TestDB_17");
        TestMessagingService.MockAsyncCallback mockAsyncCallback3 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback3, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertEquals(this._replica, mockAsyncCallback3.getMessageReplied().size());
        criteria.setPartition("TestDB_15");
        TestMessagingService.MockAsyncCallback mockAsyncCallback4 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback4, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertEquals(this._replica, mockAsyncCallback4.getMessageReplied().size());
        criteria.setPartitionState("SLAVE");
        TestMessagingService.MockAsyncCallback mockAsyncCallback5 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback5, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertEquals(this._replica - 1, mockAsyncCallback5.getMessageReplied().size());
        criteria.setDataSource(Criteria.DataSource.IDEALSTATES);
        TestMessagingService.MockAsyncCallback mockAsyncCallback6 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback6, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertEquals(this._replica - 1, mockAsyncCallback6.getMessageReplied().size());
    }

    @Override // org.apache.helix.integration.messaging.TestMessagingService
    @Test
    public void TestControllerMessage() {
        for (int i = 0; i < 5; i++) {
            TestMessagingService.TestMessagingHandlerFactory testMessagingHandlerFactory = new TestMessagingService.TestMessagingHandlerFactory();
            this._participants[i].getMessagingService().registerMessageHandlerFactory(testMessagingHandlerFactory.getMessageTypes(), testMessagingHandlerFactory);
        }
        String uuid = new UUID(123L, 456L).toString();
        Message message = new Message(Message.MessageType.CONTROLLER_MSG, uuid);
        message.setMsgId(uuid);
        message.setSrcName(this._hostSrc);
        message.setTgtSessionId("*");
        message.setMsgState(Message.MessageState.NEW);
        message.getRecord().setSimpleField("TestMessagingPara", "Testing messaging para");
        Criteria criteria = new Criteria();
        criteria.setInstanceName("*");
        criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria.setSessionSpecific(false);
        criteria.setClusterName(this.CLUSTER_NAME);
        TestMessagingService.MockAsyncCallback mockAsyncCallback = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertTrue(((String) ((Message) mockAsyncCallback.getMessageReplied().get(0)).getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")).contains(this._hostSrc));
        AssertJUnit.assertEquals(mockAsyncCallback.getMessageReplied().size(), 1);
        message.setMsgId(UUID.randomUUID().toString());
        criteria.setPartition("TestDB_17");
        TestMessagingService.MockAsyncCallback mockAsyncCallback2 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback2, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertTrue(((String) ((Message) mockAsyncCallback2.getMessageReplied().get(0)).getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")).contains(this._hostSrc));
        AssertJUnit.assertEquals(mockAsyncCallback2.getMessageReplied().size(), 1);
        message.setMsgId(UUID.randomUUID().toString());
        criteria.setPartitionState("SLAVE");
        TestMessagingService.MockAsyncCallback mockAsyncCallback3 = new TestMessagingService.MockAsyncCallback();
        this._adminController.getMessagingService().sendAndWait(criteria, message, mockAsyncCallback3, TestInstanceOperation.TIMEOUT);
        AssertJUnit.assertTrue(((String) ((Message) mockAsyncCallback3.getMessageReplied().get(0)).getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")).contains(this._hostSrc));
        AssertJUnit.assertEquals(mockAsyncCallback3.getMessageReplied().size(), 1);
    }

    @Override // org.apache.helix.integration.messaging.TestMessagingService
    @Test(enabled = false)
    public void sendSelfMsg() {
    }
}
