package org.apache.helix.integration;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration tests asserting that callback handlers, zk-listeners and zk-watches held by a
 * controller or participant do not grow (and that stale ones are removed) after a ZooKeeper
 * session expiry.
 *
 * <p>Every test stops the controller and all started participants in a {@code finally} block so
 * that a failing assertion does not leave them running.
 */
public class TestZkCallbackHandlerLeak extends ZkTestBase {
    /** Port of the first participant; participant {@code i} is named {@code "localhost_" + (START_PORT + i)}. */
    private static final int START_PORT = 12918;

    /** Timeout in milliseconds handed to {@code TestHelper.verify} for zk-watcher count checks. */
    private static final long WATCHER_VERIFY_TIMEOUT_MS = 500L;

    /** Callback handlers expected on the controller with 2 participants (8 + 2n). */
    private static final int EXPECTED_CONTROLLER_HANDLERS = 12;

    /** Callback handlers expected on a plain participant (MESSAGES -> HelixTaskExecutor). */
    private static final int EXPECTED_PARTICIPANT_HANDLERS = 1;

    /** Zk-watchers expected on the controller session with 2 participants (8 + 5n). */
    private static final int EXPECTED_CONTROLLER_WATCHERS = 18;

    /** Zk-watchers expected on a participant session. */
    private static final int EXPECTED_PARTICIPANT_WATCHERS = 1;

    /**
     * Expires the session of one participant and checks that neither the controller nor the
     * participant ends up with more zk-watchers or callback handlers than before.
     */
    @Test
    public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception {
        // Kept inline on purpose: the helper may derive the name from the calling frame.
        final String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        final int n = 2;
        System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(clusterName, _zkaddr, START_PORT, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, n, 2, "MasterSlave", true);

        final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
        controller.syncStart();
        final MockParticipant[] participants = new MockParticipant[n];
        try {
            for (int i = 0; i < n; i++) {
                participants[i] = startParticipant(clusterName, i);
            }
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));

            final MockParticipant participant = participants[1];
            assertZkWatcherCount(controller, EXPECTED_CONTROLLER_WATCHERS, "Controller should have 8 + 5*n zk-watchers.");
            assertZkWatcherCount(participant, EXPECTED_PARTICIPANT_WATCHERS, "Participant should have 1 zk-watcher. MESSAGES->HelixTaskExecutor");

            TestHelper.printHandlers(controller, controller.getHandlers());
            TestHelper.printHandlers(participant, participant.getHandlers());
            int controllerHandlerCount = controller.getHandlers().size();
            int participantHandlerCount = participant.getHandlers().size();
            Assert.assertEquals(controllerHandlerCount, EXPECTED_CONTROLLER_HANDLERS, "HelixController should have 12 (8+2n) callback handlers for 2 (n) participant");
            Assert.assertEquals(participantHandlerCount, EXPECTED_PARTICIPANT_HANDLERS, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");

            System.out.println("Expiring participant session...");
            String oldSessionId = participant.getSessionId();
            ZkTestHelper.expireSession(participant.getZkClient());
            System.out.println("Expired participant session. oldSessionId: " + oldSessionId + ", newSessionId: " + participant.getSessionId());
            Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));

            // Session ids are re-read on every poll, so these checks follow the new session.
            assertZkWatcherCount(controller, EXPECTED_CONTROLLER_WATCHERS, "Controller should have 8 + 5*n zk-watchers after session expiry.");
            assertZkWatcherCount(participant, EXPECTED_PARTICIPANT_WATCHERS, "Participant should have 1 zk-watcher after session expiry.");
            Assert.assertEquals(controller.getHandlers().size(), controllerHandlerCount, "controller callback handlers should not increase after participant session expiry");
            Assert.assertEquals(participant.getHandlers().size(), participantHandlerCount, "participant callback handlers should not increase after participant session expiry");
        } finally {
            stopAll(controller, participants);
        }
        System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
    }

    /**
     * Expires the controller session and checks that neither the controller nor a participant
     * ends up with more zk-watchers or callback handlers than before.
     */
    @Test
    public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception {
        // Kept inline on purpose: the helper may derive the name from the calling frame.
        final String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        final int n = 2;
        System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(clusterName, _zkaddr, START_PORT, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, n, 2, "MasterSlave", true);

        final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
        controller.syncStart();
        final MockParticipant[] participants = new MockParticipant[n];
        try {
            for (int i = 0; i < n; i++) {
                participants[i] = startParticipant(clusterName, i);
            }
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));

            final MockParticipant participant = participants[0];
            // Give handler registration a chance to settle. The result is deliberately ignored:
            // the assertEquals calls below report the exact counts if the wait did not succeed.
            // The awaited counts must match those assertions, otherwise the wait can never
            // succeed and only burns the timeout.
            TestHelper.verify(new TestHelper.Verifier() {
                @Override
                public boolean verify() throws Exception {
                    return controller.getHandlers().size() == EXPECTED_CONTROLLER_HANDLERS
                            && participant.getHandlers().size() == EXPECTED_PARTICIPANT_HANDLERS;
                }
            }, 1000L);

            int controllerHandlerCount = controller.getHandlers().size();
            int participantHandlerCount = participant.getHandlers().size();
            TestHelper.printHandlers(controller, controller.getHandlers());
            Assert.assertEquals(controllerHandlerCount, EXPECTED_CONTROLLER_HANDLERS, "HelixController should have 12 (8+2n) callback handlers for 2 participant, but was " + controllerHandlerCount);
            TestHelper.printHandlers(participant, participant.getHandlers());
            Assert.assertEquals(participantHandlerCount, EXPECTED_PARTICIPANT_HANDLERS, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was " + participantHandlerCount);

            System.out.println("Expiring controller session...");
            String oldSessionId = controller.getSessionId();
            ZkTestHelper.expireSession(controller.getZkClient());
            System.out.println("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: " + controller.getSessionId());
            Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));

            // Session ids are re-read on every poll, so these checks follow the new session.
            assertZkWatcherCount(controller, EXPECTED_CONTROLLER_WATCHERS, "Controller should have 8 + 5*n zk-watchers after session expiry.");
            assertZkWatcherCount(participant, EXPECTED_PARTICIPANT_WATCHERS, "Participant should have 1 zk-watcher after session expiry.");

            int controllerHandlerCountAfter = controller.getHandlers().size();
            TestHelper.printHandlers(controller, controller.getHandlers());
            Assert.assertTrue(controllerHandlerCountAfter <= controllerHandlerCount, "controller callback handlers should not increase after controller session expiry (expected no more than " + controllerHandlerCount + ", found " + controllerHandlerCountAfter + ")");
            int participantHandlerCountAfter = participant.getHandlers().size();
            TestHelper.printHandlers(participant, participant.getHandlers());
            Assert.assertEquals(participantHandlerCountAfter, participantHandlerCount, "participant callback handlers should not increase after controller session expiry");
        } finally {
            stopAll(controller, participants);
        }
        System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
    }

    /**
     * Registers a user current-state listener on the first participant, expires that
     * participant's session twice, and checks after each expiry which handlers, zk-listeners
     * and zk-watches remain.
     */
    @Test
    public void testRemoveUserCbHandlerOnPathRemoval() throws Exception {
        // Kept inline on purpose: the helper may derive the name from the calling frame.
        final String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        final int n = 3;
        System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(clusterName, _zkaddr, START_PORT, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, n, 2, "MasterSlave", true);

        final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
        controller.syncStart();
        final MockParticipant[] participants = new MockParticipant[n];
        try {
            for (int i = 0; i < n; i++) {
                participants[i] = startParticipant(clusterName, i);
                if (i == 0) {
                    // The user listener is attached to participant 0 before the others start.
                    MockParticipant first = participants[0];
                    first.addCurrentStateChangeListener(new CurrentStateChangeListener() {
                        @Override
                        public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) {
                            // Intentionally empty: only the listener registration matters here.
                        }
                    }, first.getInstanceName(), first.getSessionId());
                }
            }
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));

            MockParticipant participant = participants[0];
            String oldSessionId = participant.getSessionId();
            PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);

            // --- state before any expiry ---
            Assert.assertEquals(participant.getHandlers().size(), 2, "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
            Map<String, Set<IZkDataListener>> dataListeners = ZkTestHelper.getZkDataListener(participant.getZkClient());
            Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(participant.getZkClient());
            Assert.assertEquals(dataListeners.size(), 1, "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
            String currentStatePath = builderPath(keyBuilder.currentState(participant.getInstanceName(), oldSessionId, "TestDB0"));
            Assert.assertEquals(dataListeners.get(currentStatePath).size(), 1, "Should have 1 data-listeners on path: " + currentStatePath);
            Assert.assertEquals(childListeners.size(), 2, "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
            String currentStatesPath = builderPath(keyBuilder.currentStates(participant.getInstanceName(), oldSessionId));
            Assert.assertEquals(childListeners.get(currentStatesPath).size(), 1, "Should have 1 child-listener on path: " + currentStatesPath);
            String messagesPath = builderPath(keyBuilder.messages(participant.getInstanceName()));
            Assert.assertEquals(childListeners.get(messagesPath).size(), 1, "Should have 1 child-listener on path: " + messagesPath);
            String controllerPath = builderPath(keyBuilder.controller());
            Assert.assertNull(childListeners.get(controllerPath), "Should have no child-listener on path: " + controllerPath);
            Map<String, List<String>> watches = ZkTestHelper.getZkWatch(participant.getZkClient());
            Assert.assertEquals(watches.get("dataWatches").size(), 3, "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB0, MESSAGES");
            Assert.assertEquals(watches.get("childWatches").size(), 2, "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");

            // --- first expiry ---
            System.out.println("Expire participant: " + participant.getInstanceName() + ", session: " + participant.getSessionId());
            ZkTestHelper.expireSession(participant.getZkClient());
            System.out.println(participant.getInstanceName() + " oldSessionId: " + oldSessionId + ", newSessionId: " + participant.getSessionId());
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));

            Assert.assertEquals(participant.getHandlers().size(), 1, "Should have 1 handlers: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
            dataListeners = ZkTestHelper.getZkDataListener(participant.getZkClient());
            childListeners = ZkTestHelper.getZkChildListener(participant.getZkClient());
            Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
            Assert.assertEquals(childListeners.size(), 2, "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). MESSAGES has 1 child-listener. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
            Assert.assertEquals(childListeners.get(currentStatesPath).size(), 0, "Should have no child-listener on path: " + currentStatesPath);
            Assert.assertEquals(childListeners.get(messagesPath).size(), 1, "Should have 1 child-listener on path: " + messagesPath);
            Assert.assertNull(childListeners.get(controllerPath), "Should have no child-listener on path: " + controllerPath);
            watches = ZkTestHelper.getZkWatch(participant.getZkClient());
            Assert.assertEquals(watches.get("dataWatches").size(), 1, "Should have 1 data-watches: MESSAGES");
            Assert.assertEquals(watches.get("childWatches").size(), 1, "Should have 1 child-watches: MESSAGES");
            Assert.assertEquals(watches.get("existWatches").size(), 2, "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");

            // --- second expiry ---
            System.out.println("Expire participant: " + participant.getInstanceName() + ", session: " + participant.getSessionId());
            ZkTestHelper.expireSession(participant.getZkClient());
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, clusterName)));
            watches = ZkTestHelper.getZkWatch(participant.getZkClient());
            Assert.assertEquals(watches.get("dataWatches").size(), 1, "Should have 1 data-watches: MESSAGES");
            Assert.assertEquals(watches.get("childWatches").size(), 1, "Should have 1 child-watches: MESSAGES");
            Assert.assertEquals(watches.get("existWatches").size(), 0, "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
        } finally {
            stopAll(controller, participants);
        }
        System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
    }

    /** Returns the path of the given property key. */
    private static String builderPath(PropertyKey key) {
        return key.getPath();
    }

    /**
     * Creates and starts the participant named {@code "localhost_" + (START_PORT + index)}.
     * The participant is returned only after {@code syncStart()} has completed, so callers
     * never store a participant whose start failed.
     */
    private static MockParticipant startParticipant(String clusterName, int index) throws Exception {
        MockParticipant participant = new MockParticipant(_zkaddr, clusterName, "localhost_" + (START_PORT + index));
        participant.syncStart();
        return participant;
    }

    /**
     * Stops the controller and then every non-null participant, in the same order the tests
     * used on their success path. Participants are still stopped when stopping the controller
     * throws.
     */
    private static void stopAll(MockController controller, MockParticipant[] participants) throws Exception {
        try {
            controller.syncStop();
        } finally {
            for (MockParticipant participant : participants) {
                // Slots stay null when a participant failed to start.
                if (participant != null) {
                    participant.syncStop();
                }
            }
        }
    }

    /**
     * Returns whether the session {@code "0x" + sessionId} currently has exactly
     * {@code expected} listeners according to {@code ZkTestHelper.getListenersBySession}.
     * A session with no entry (e.g. a new session whose watches are not registered yet) yields
     * {@code false} instead of a NullPointerException, so a polling caller can retry.
     */
    private static boolean hasZkWatcherCount(String sessionId, int expected) throws Exception {
        // Fully qualified so the file's import list does not need to change.
        java.util.Collection<?> listeners = ZkTestHelper.getListenersBySession(_zkaddr).get("0x" + sessionId);
        return listeners != null && listeners.size() == expected;
    }

    /**
     * Asserts that the controller's current session reaches {@code expected} zk-watchers.
     * The session id is re-read on every check; {@code TestHelper.verify} presumably polls
     * until the timeout (NOTE(review): confirm against TestHelper).
     */
    private static void assertZkWatcherCount(final MockController controller, final int expected, String message) throws Exception {
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
            @Override
            public boolean verify() throws Exception {
                return hasZkWatcherCount(controller.getSessionId(), expected);
            }
        }, WATCHER_VERIFY_TIMEOUT_MS), message);
    }

    /**
     * Asserts that the participant's current session reaches {@code expected} zk-watchers.
     * The session id is re-read on every check; {@code TestHelper.verify} presumably polls
     * until the timeout (NOTE(review): confirm against TestHelper).
     */
    private static void assertZkWatcherCount(final MockParticipant participant, final int expected, String message) throws Exception {
        Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
            @Override
            public boolean verify() throws Exception {
                return hasZkWatcherCount(participant.getSessionId(), expected);
            }
        }, WATCHER_VERIFY_TIMEOUT_MS), message);
    }
}
