package org.apache.helix.integration;

import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkHelixTestManager;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestZkCallbackHandlerLeak.class */
public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
    @Test
    public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkUnitTestBase.ZK_ADDR, 12918, "localhost", "TestDB", 1, 32, 2, 2, "MasterSlave", true);
        ClusterController clusterController = new ClusterController(str, "controller_0", ZkUnitTestBase.ZK_ADDR);
        clusterController.syncStart();
        MockParticipant[] mockParticipantArr = new MockParticipant[2];
        for (int i = 0; i < 2; i++) {
            mockParticipantArr[i] = new MockParticipant(str, "localhost_" + (12918 + i), ZkUnitTestBase.ZK_ADDR, null);
            mockParticipantArr[i].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str)));
        final ZkHelixTestManager manager = clusterController.getManager();
        final ZkHelixTestManager manager2 = mockParticipantArr[1].getManager();
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.1
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return ZkTestHelper.getListenersBySession(ZkUnitTestBase.ZK_ADDR).get(new StringBuilder().append("0x").append(manager.getSessionId()).toString()).size() == 16;
            }
        }, 500L), "Controller should have 6 + 5*n zk-watchers.");
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.2
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return ZkTestHelper.getListenersBySession(ZkUnitTestBase.ZK_ADDR).get(new StringBuilder().append("0x").append(manager2.getSessionId()).toString()).size() == 2;
            }
        }, 500L), "Participant should have 2 zk-watchers.");
        int size = manager.getHandlers().size();
        int size2 = manager2.getHandlers().size();
        Assert.assertEquals(size, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
        Assert.assertEquals(size2, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers");
        System.out.println("Expiring participant session...");
        String sessionId = manager2.getSessionId();
        ZkTestHelper.expireSession(manager2.getZkClient());
        System.out.println("Expried participant session. oldSessionId: " + sessionId + ", newSessionId: " + manager2.getSessionId());
        Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str)));
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.3
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return ZkTestHelper.getListenersBySession(ZkUnitTestBase.ZK_ADDR).get(new StringBuilder().append("0x").append(manager.getSessionId()).toString()).size() == 16;
            }
        }, 500L), "Controller should have 6 + 5*n zk-watchers after session expiry.");
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.4
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return ZkTestHelper.getListenersBySession(ZkUnitTestBase.ZK_ADDR).get(new StringBuilder().append("0x").append(manager2.getSessionId()).toString()).size() == 2;
            }
        }, 500L), "Participant should have 2 zk-watchers after session expiry.");
        Assert.assertEquals(manager.getHandlers().size(), size, "controller callback handlers should not increase after participant session expiry");
        Assert.assertEquals(manager2.getHandlers().size(), size2, "participant callback handlers should not increase after participant session expiry");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkUnitTestBase.ZK_ADDR, 12918, "localhost", "TestDB", 1, 32, 2, 2, "MasterSlave", true);
        ClusterController clusterController = new ClusterController(str, "controller_0", ZkUnitTestBase.ZK_ADDR);
        clusterController.syncStart();
        MockParticipant[] mockParticipantArr = new MockParticipant[2];
        for (int i = 0; i < 2; i++) {
            mockParticipantArr[i] = new MockParticipant(str, "localhost_" + (12918 + i), ZkUnitTestBase.ZK_ADDR, null);
            mockParticipantArr[i].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str)));
        final ZkHelixTestManager manager = clusterController.getManager();
        final ZkHelixTestManager manager2 = mockParticipantArr[0].getManager();
        TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.5
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return manager.getHandlers().size() == 9 && manager2.getHandlers().size() == 2;
            }
        }, 1000L);
        int size = manager.getHandlers().size();
        int size2 = manager2.getHandlers().size();
        Assert.assertEquals(size, 9, "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " + size + ", " + printHandlers(manager));
        Assert.assertEquals(size2, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was " + size2 + ", " + printHandlers(manager2));
        System.out.println("Expiring controller session...");
        String sessionId = manager.getSessionId();
        ZkTestHelper.expireSession(manager.getZkClient());
        System.out.println("Expired controller session. oldSessionId: " + sessionId + ", newSessionId: " + manager.getSessionId());
        Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str)));
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.6
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return ZkTestHelper.getListenersBySession(ZkUnitTestBase.ZK_ADDR).get(new StringBuilder().append("0x").append(manager.getSessionId()).toString()).size() == 16;
            }
        }, 500L), "Controller should have 6 + 5*n zk-watchers after session expiry.");
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.7
            @Override // org.apache.helix.TestHelper.Verifier
            public boolean verify() throws Exception {
                return ZkTestHelper.getListenersBySession(ZkUnitTestBase.ZK_ADDR).get(new StringBuilder().append("0x").append(manager2.getSessionId()).toString()).size() == 2;
            }
        }, 500L), "Participant should have 2 zk-watchers after session expiry.");
        Assert.assertEquals(manager.getHandlers().size(), size, "controller callback handlers should not increase after participant session expiry, but was " + printHandlers(manager));
        Assert.assertEquals(manager2.getHandlers().size(), size2, "participant callback handlers should not increase after participant session expiry, but was " + printHandlers(manager2));
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testRemoveUserCbHandlerOnPathRemoval() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkUnitTestBase.ZK_ADDR, 12918, "localhost", "TestDB", 1, 32, 3, 2, "MasterSlave", true);
        new ClusterController(str, "controller_0", ZkUnitTestBase.ZK_ADDR).syncStart();
        MockParticipant[] mockParticipantArr = new MockParticipant[3];
        for (int i = 0; i < 3; i++) {
            mockParticipantArr[i] = new MockParticipant(str, "localhost_" + (12918 + i), ZkUnitTestBase.ZK_ADDR, null);
            mockParticipantArr[i].syncStart();
            if (i == 0) {
                ZkHelixTestManager manager = mockParticipantArr[0].getManager();
                manager.addCurrentStateChangeListener(new CurrentStateChangeListener() { // from class: org.apache.helix.integration.TestZkCallbackHandlerLeak.8
                    public void onStateChange(String str2, List<CurrentState> list, NotificationContext notificationContext) {
                    }
                }, manager.getInstanceName(), manager.getSessionId());
            }
        }
        Assert.assertTrue(Boolean.valueOf(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str))).booleanValue());
        ZkHelixTestManager manager2 = mockParticipantArr[0].getManager();
        String sessionId = manager2.getSessionId();
        PropertyKey.Builder builder = new PropertyKey.Builder(str);
        Assert.assertEquals(manager2.getHandlers().size(), 3, "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
        Map<String, Set<IZkDataListener>> zkDataListener = ZkTestHelper.getZkDataListener(manager2.getZkClient());
        Map<String, Set<IZkChildListener>> zkChildListener = ZkTestHelper.getZkChildListener(manager2.getZkClient());
        Assert.assertEquals(zkDataListener.size(), 1, "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
        String path = builder.currentState(manager2.getInstanceName(), sessionId, "TestDB0").getPath();
        Assert.assertEquals(zkDataListener.get(path).size(), 1, "Should have 1 data-listeners on path: " + path);
        Assert.assertEquals(zkChildListener.size(), 3, "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
        String path2 = builder.currentStates(manager2.getInstanceName(), sessionId).getPath();
        Assert.assertEquals(zkChildListener.get(path2).size(), 1, "Should have 1 child-listener on path: " + path2);
        String path3 = builder.messages(manager2.getInstanceName()).getPath();
        Assert.assertEquals(zkChildListener.get(path3).size(), 1, "Should have 1 child-listener on path: " + path3);
        String path4 = builder.controller().getPath();
        Assert.assertEquals(zkChildListener.get(path4).size(), 1, "Should have 1 child-listener on path: " + path4);
        Map<String, List<String>> zkWatch = ZkTestHelper.getZkWatch(manager2.getZkClient());
        Assert.assertEquals(zkWatch.get("dataWatches").size(), 4, "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
        Assert.assertEquals(zkWatch.get("childWatches").size(), 3, "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
        System.out.println("Expire participant: " + manager2.getInstanceName() + ", session: " + manager2.getSessionId());
        ZkTestHelper.expireSession(manager2.getZkClient());
        System.out.println(manager2.getInstanceName() + " oldSessionId: " + sessionId + ", newSessionId: " + manager2.getSessionId());
        Assert.assertTrue(Boolean.valueOf(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str))).booleanValue());
        Assert.assertEquals(manager2.getHandlers().size(), 2, "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
        Map<String, Set<IZkDataListener>> zkDataListener2 = ZkTestHelper.getZkDataListener(manager2.getZkClient());
        Map<String, Set<IZkChildListener>> zkChildListener2 = ZkTestHelper.getZkChildListener(manager2.getZkClient());
        Assert.assertTrue(zkDataListener2.isEmpty(), "Should have no data-listeners");
        Assert.assertEquals(zkChildListener2.size(), 3, "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
        String path5 = builder.currentStates(manager2.getInstanceName(), sessionId).getPath();
        Assert.assertEquals(zkChildListener2.get(path5).size(), 0, "Should have no child-listener on path: " + path5);
        String path6 = builder.messages(manager2.getInstanceName()).getPath();
        Assert.assertEquals(zkChildListener2.get(path6).size(), 1, "Should have 1 child-listener on path: " + path6);
        String path7 = builder.controller().getPath();
        Assert.assertEquals(zkChildListener2.get(path7).size(), 1, "Should have 1 child-listener on path: " + path7);
        Map<String, List<String>> zkWatch2 = ZkTestHelper.getZkWatch(manager2.getZkClient());
        Assert.assertEquals(zkWatch2.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
        Assert.assertEquals(zkWatch2.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
        Assert.assertEquals(zkWatch2.get("existWatches").size(), 2, "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
        System.out.println("Expire participant: " + manager2.getInstanceName() + ", session: " + manager2.getSessionId());
        ZkTestHelper.expireSession(manager2.getZkClient());
        Assert.assertTrue(Boolean.valueOf(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkUnitTestBase.ZK_ADDR, str))).booleanValue());
        Map<String, List<String>> zkWatch3 = ZkTestHelper.getZkWatch(manager2.getZkClient());
        Assert.assertEquals(zkWatch3.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
        Assert.assertEquals(zkWatch3.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
        Assert.assertEquals(zkWatch3.get("existWatches").size(), 0, "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    static String printHandlers(ZkHelixTestManager zkHelixTestManager) {
        StringBuilder sb = new StringBuilder();
        List<CallbackHandler> handlers = zkHelixTestManager.getHandlers();
        sb.append(zkHelixTestManager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
        for (int i = 0; i < handlers.size(); i++) {
            CallbackHandler callbackHandler = handlers.get(i);
            sb.append(callbackHandler.getPath().substring(zkHelixTestManager.getClusterName().length() + 1) + ": " + callbackHandler.getListener());
            if (i < handlers.size() - 1) {
                sb.append(", ");
            }
        }
        sb.append("]");
        return sb.toString();
    }

    void printZkListeners(ZkClient zkClient) throws Exception {
        Map<String, Set<IZkDataListener>> zkDataListener = ZkTestHelper.getZkDataListener(zkClient);
        Map<String, Set<IZkChildListener>> zkChildListener = ZkTestHelper.getZkChildListener(zkClient);
        System.out.println("dataListeners {");
        for (String str : zkDataListener.keySet()) {
            System.out.println("\t" + str + ": ");
            Iterator<IZkDataListener> it = zkDataListener.get(str).iterator();
            while (it.hasNext()) {
                System.out.println("\t\t" + ((IZkDataListener) it.next()).getListener());
            }
        }
        System.out.println("}");
        System.out.println("childListeners {");
        for (String str2 : zkChildListener.keySet()) {
            System.out.println("\t" + str2 + ": ");
            Iterator<IZkChildListener> it2 = zkChildListener.get(str2).iterator();
            while (it2.hasNext()) {
                System.out.println("\t\t" + ((IZkChildListener) it2.next()).getListener());
            }
        }
        System.out.println("}");
    }
}
