package org.apache.helix.integration;

import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestBatchMessage.class */
public class TestBatchMessage extends ZkIntegrationTestBase {

    /* loaded from: input_file:org/apache/helix/integration/TestBatchMessage$TestZkChildListener.class */
    class TestZkChildListener implements IZkChildListener {
        int _maxNbOfChilds = 0;

        TestZkChildListener() {
        }

        public void handleChildChange(String str, List<String> list) throws Exception {
            System.out.println(str + " has " + list.size() + " messages");
            if (list.size() > this._maxNbOfChilds) {
                this._maxNbOfChilds = list.size();
            }
        }
    }

    @Test
    public void testBasic() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, 2, 2, "MasterSlave", true);
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        IdealState property = zKHelixDataAccessor.getProperty(keyBuilder.idealStates("TestDB0"));
        property.setBatchMessageMode(true);
        zKHelixDataAccessor.setProperty(keyBuilder.idealStates("TestDB0"), property);
        TestZkChildListener testZkChildListener = new TestZkChildListener();
        _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), testZkChildListener);
        ClusterControllerManager clusterControllerManager = new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, str, "controller_0");
        clusterControllerManager.syncStart();
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[2];
        for (int i = 0; i < 2; i++) {
            mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, "localhost_" + (12918 + i));
            mockParticipantManagerArr[i].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        Assert.assertTrue(testZkChildListener._maxNbOfChilds <= 3, "Should get no more than 2 messages (O->S and S->M)");
        clusterControllerManager.syncStop();
        for (int i2 = 0; i2 < 2; i2++) {
            mockParticipantManagerArr[i2].syncStop();
        }
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testChangeBatchMessageMode() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, 2, 2, "MasterSlave", true);
        ClusterControllerManager clusterControllerManager = new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, str, "controller_0");
        clusterControllerManager.syncStart();
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[2];
        for (int i = 0; i < 2; i++) {
            mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, "localhost_" + (12918 + i));
            mockParticipantManagerArr[i].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        for (int i2 = 0; i2 < 2; i2++) {
            mockParticipantManagerArr[i2].syncStop();
        }
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        IdealState property = zKHelixDataAccessor.getProperty(keyBuilder.idealStates("TestDB0"));
        property.setBatchMessageMode(true);
        zKHelixDataAccessor.setProperty(keyBuilder.idealStates("TestDB0"), property);
        TestZkChildListener testZkChildListener = new TestZkChildListener();
        _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), testZkChildListener);
        for (int i3 = 0; i3 < 2; i3++) {
            mockParticipantManagerArr[i3] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, "localhost_" + (12918 + i3));
            mockParticipantManagerArr[i3].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        Assert.assertTrue(testZkChildListener._maxNbOfChilds <= 3, "Should get no more than 2 messages (O->S and S->M)");
        clusterControllerManager.syncStop();
        for (int i4 = 0; i4 < 2; i4++) {
            mockParticipantManagerArr[i4].syncStop();
        }
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testSubMsgExecutionFail() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[5];
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 6, 5, 3, "MasterSlave", true);
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, _baseAccessor);
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        IdealState property = zKHelixDataAccessor.getProperty(keyBuilder.idealStates("TestDB0"));
        property.setBatchMessageMode(true);
        zKHelixDataAccessor.setProperty(keyBuilder.idealStates("TestDB0"), property);
        String str2 = null;
        Iterator it = property.getInstanceStateMap("TestDB0_0").entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry entry = (Map.Entry) it.next();
            if (((String) entry.getValue()).equals("MASTER")) {
                str2 = (String) entry.getKey();
                break;
            }
        }
        Assert.assertNotNull(str2);
        new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, str).syncStart();
        for (int i = 0; i < 5; i++) {
            String str3 = "localhost_" + (12918 + i);
            if (str3.equals(str2)) {
                HashMap hashMap = new HashMap();
                hashMap.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_0"));
                mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, str3);
                mockParticipantManagerArr[i].setTransition(new ErrTransition(hashMap));
            } else {
                mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, str3);
            }
            mockParticipantManagerArr[i].syncStart();
        }
        HashMap hashMap2 = new HashMap();
        hashMap2.put("TestDB0", new HashMap());
        ((Map) hashMap2.get("TestDB0")).put("TestDB0_0", str2);
        Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str, hashMap2)));
        HashMap hashMap3 = new HashMap();
        hashMap3.put("TestDB0_0", TestHelper.setOf(str2));
        TestHelper.verifyState(str, ZkIntegrationTestBase.ZK_ADDR, hashMap3, "ERROR");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testParticipantIncompatibleWithBatchMsg() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, 2, 2, "MasterSlave", true);
        ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZkIntegrationTestBase.ZK_ADDR, "--addResourceProperty", str, "TestDB0", HelixProperty.HelixPropertyAttribute.BATCH_MESSAGE_MODE.toString(), "true"});
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        TestZkChildListener testZkChildListener = new TestZkChildListener();
        _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), testZkChildListener);
        ClusterControllerManager clusterControllerManager = new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, str, "controller_0");
        clusterControllerManager.syncStart();
        ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZkIntegrationTestBase.ZK_ADDR, "--enableCluster", str, "false"});
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[2];
        for (int i = 0; i < 2; i++) {
            mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, "localhost_" + (12918 + i));
            mockParticipantManagerArr[i].syncStart();
        }
        LiveInstance property = zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
        property.setHelixVersion("0.5");
        zKHelixDataAccessor.setProperty(keyBuilder.liveInstance("localhost_12918"), property);
        ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZkIntegrationTestBase.ZK_ADDR, "--enableCluster", str, "true"});
        Assert.assertTrue(org.apache.helix.tools.ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        Assert.assertTrue(testZkChildListener._maxNbOfChilds > 16, "Should see more than 16 messages at the same time (32 O->S and 32 S->M)");
        clusterControllerManager.syncStop();
        for (int i2 = 0; i2 < 2; i2++) {
            mockParticipantManagerArr[i2].syncStop();
        }
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }
}
