package org.apache.helix.integration.manager;

import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterStateVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.class */
public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase {
    private static Logger LOG = LoggerFactory.getLogger(TestConsecutiveZkSessionExpiry.class);

    /* loaded from: input_file:org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry$PreConnectTestCallback.class */
    class PreConnectTestCallback implements PreConnectCallback {
        final String instanceName;
        final CountDownLatch startCountDown;
        final CountDownLatch endCountDown;
        int count = 0;

        public PreConnectTestCallback(String str, CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            this.instanceName = str;
            this.startCountDown = countDownLatch;
            this.endCountDown = countDownLatch2;
        }

        public void onPreConnect() {
            TestConsecutiveZkSessionExpiry.LOG.info("handleNewSession for instance: " + this.instanceName + ", count: " + this.count);
            int i = this.count;
            this.count = i + 1;
            if (i == 1) {
                this.startCountDown.countDown();
                TestConsecutiveZkSessionExpiry.LOG.info("wait session expiry to happen");
                try {
                    this.endCountDown.await();
                } catch (Exception e) {
                    TestConsecutiveZkSessionExpiry.LOG.error("interrupted in waiting", e);
                }
            }
        }
    }

    @Test
    public void testParticipant() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 32, 2, 2, "MasterSlave", true);
        ClusterControllerManager clusterControllerManager = new ClusterControllerManager(ZkTestBase.ZK_ADDR, str, "controller");
        clusterControllerManager.syncStart();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[2];
        for (int i = 0; i < 2; i++) {
            String str2 = "localhost_" + (12918 + i);
            mockParticipantManagerArr[i] = new MockParticipantManager(ZkTestBase.ZK_ADDR, str, str2);
            if (i == 0) {
                mockParticipantManagerArr[i].addPreConnectCallback(new PreConnectTestCallback(str2, countDownLatch, countDownLatch2));
            }
            mockParticipantManagerArr[i].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkTestBase.ZK_ADDR, str)));
        LOG.info("1st Expiring participant session...");
        String sessionId = mockParticipantManagerArr[0].getSessionId();
        ZkTestHelper.asyncExpireSession(mockParticipantManagerArr[0].getZkClient());
        LOG.info("Expried participant session. oldSessionId: " + sessionId + ", newSessionId: " + mockParticipantManagerArr[0].getSessionId());
        countDownLatch.await();
        LOG.info("2nd Expiring participant session...");
        String sessionId2 = mockParticipantManagerArr[0].getSessionId();
        ZkTestHelper.asyncExpireSession(mockParticipantManagerArr[0].getZkClient());
        LOG.info("Expried participant session. oldSessionId: " + sessionId2 + ", newSessionId: " + mockParticipantManagerArr[0].getSessionId());
        countDownLatch2.countDown();
        Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkTestBase.ZK_ADDR, str)));
        clusterControllerManager.syncStop();
        for (int i2 = 0; i2 < 2; i2++) {
            mockParticipantManagerArr[i2].syncStop();
        }
        deleteCluster(str);
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testDistributedController() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 4, 2, 2, "MasterSlave", true);
        ClusterDistributedController[] clusterDistributedControllerArr = new ClusterDistributedController[2];
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        for (int i = 0; i < 2; i++) {
            String str2 = "localhost_" + (12918 + i);
            clusterDistributedControllerArr[i] = new ClusterDistributedController(ZkTestBase.ZK_ADDR, str, str2);
            clusterDistributedControllerArr[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", new MockMSModelFactory());
            if (i == 0) {
                clusterDistributedControllerArr[i].addPreConnectCallback(new PreConnectTestCallback(str2, countDownLatch, countDownLatch2));
            }
            clusterDistributedControllerArr[i].connect();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkTestBase.ZK_ADDR, str)));
        LOG.info("1st Expiring distributedController session...");
        String sessionId = clusterDistributedControllerArr[0].getSessionId();
        ZkTestHelper.asyncExpireSession(clusterDistributedControllerArr[0].getZkClient());
        LOG.info("Expried distributedController session. oldSessionId: " + sessionId + ", newSessionId: " + clusterDistributedControllerArr[0].getSessionId());
        countDownLatch.await();
        LOG.info("2nd Expiring distributedController session...");
        String sessionId2 = clusterDistributedControllerArr[0].getSessionId();
        ZkTestHelper.asyncExpireSession(clusterDistributedControllerArr[0].getZkClient());
        LOG.info("Expried distributedController session. oldSessionId: " + sessionId2 + ", newSessionId: " + clusterDistributedControllerArr[0].getSessionId());
        countDownLatch2.countDown();
        Assert.assertTrue(ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkTestBase.ZK_ADDR, str)));
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        Assert.assertNotNull(pollForProperty(LiveInstance.class, zKHelixDataAccessor, keyBuilder.liveInstance("localhost_12918"), true));
        LiveInstance pollForProperty = pollForProperty(LiveInstance.class, zKHelixDataAccessor, keyBuilder.controllerLeader(), true);
        Assert.assertNotNull(pollForProperty);
        Assert.assertEquals(pollForProperty.getId(), "localhost_12919");
        LOG.debug("handlers: " + TestHelper.printHandlers(clusterDistributedControllerArr[0]));
        List<CallbackHandler> handlers = clusterDistributedControllerArr[0].getHandlers();
        Assert.assertEquals(handlers.size(), 1, "Distributed controller should have 1 handler (message) after lose leadership, but was " + handlers.size());
        clusterDistributedControllerArr[0].disconnect();
        clusterDistributedControllerArr[1].disconnect();
        Assert.assertNull(pollForProperty(LiveInstance.class, zKHelixDataAccessor, keyBuilder.liveInstance("localhost_12918"), false));
        Assert.assertNull(pollForProperty(LiveInstance.class, zKHelixDataAccessor, keyBuilder.liveInstance("localhost_12919"), false));
        Assert.assertNull(pollForProperty(LiveInstance.class, zKHelixDataAccessor, keyBuilder.controllerLeader(), false));
        deleteCluster(str);
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }
}
