package org.apache.helix.integration.manager;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/manager/TestParticipantManager.class */
public class TestParticipantManager extends ZkIntegrationTestBase {
    private static Logger LOG = Logger.getLogger(TestParticipantManager.class);

    /* loaded from: input_file:org/apache/helix/integration/manager/TestParticipantManager$SessionExpiryTransition.class */
    class SessionExpiryTransition extends MockTransition {
        private final AtomicBoolean _done = new AtomicBoolean();
        private final CountDownLatch _startCountdown;
        private final CountDownLatch _endCountdown;

        public SessionExpiryTransition(CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            this._startCountdown = countDownLatch;
            this._endCountdown = countDownLatch2;
        }

        @Override // org.apache.helix.mock.participant.MockTransition
        public void doTransition(Message message, NotificationContext notificationContext) throws InterruptedException {
            String tgtName = message.getTgtName();
            String partitionName = message.getPartitionName();
            if (tgtName.equals("localhost_12918") && partitionName.equals("TestDB0_0") && !this._done.getAndSet(true)) {
                this._startCountdown.countDown();
                this._endCountdown.await();
            }
        }
    }

    @Test
    public void simpleIntegrationTest() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", "TestDB", 1, 4, 1, 1, "MasterSlave", true);
        ZKHelixManager zKHelixManager = new ZKHelixManager(str, "localhost_12918", InstanceType.PARTICIPANT, ZkIntegrationTestBase.ZK_ADDR);
        zKHelixManager.getStateMachineEngine().registerStateModelFactory("MasterSlave", new MockMSModelFactory());
        zKHelixManager.connect();
        ZKHelixManager zKHelixManager2 = new ZKHelixManager(str, "controller_0", InstanceType.CONTROLLER, ZkIntegrationTestBase.ZK_ADDR);
        zKHelixManager2.connect();
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        zKHelixManager2.disconnect();
        zKHelixManager.disconnect();
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        Assert.assertNull(zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
        Assert.assertNull(zKHelixDataAccessor.getProperty(keyBuilder.controllerLeader()));
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void simpleSessionExpiryTest() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[1];
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", "TestDB", 1, 1, 1, 1, "MasterSlave", true);
        ClusterControllerManager clusterControllerManager = new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, str, "controller_0");
        clusterControllerManager.syncStart();
        for (int i = 0; i < 1; i++) {
            mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, "localhost_" + (12918 + i));
            mockParticipantManagerArr[i].syncStart();
        }
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        String sessionId = mockParticipantManagerArr[0].getSessionId();
        ZkTestHelper.expireSession(mockParticipantManagerArr[0].getZkClient());
        TimeUnit.MILLISECONDS.sleep(100L);
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        Assert.assertNotSame(mockParticipantManagerArr[0].getSessionId(), sessionId);
        clusterControllerManager.syncStop();
        for (int i2 = 0; i2 < 1; i2++) {
            mockParticipantManagerArr[i2].syncStop();
        }
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testSessionExpiryInTransition() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        System.out.println("START " + str + " at " + new Date(System.currentTimeMillis()));
        MockParticipantManager[] mockParticipantManagerArr = new MockParticipantManager[1];
        TestHelper.setupCluster(str, ZkIntegrationTestBase.ZK_ADDR, 12918, "localhost", "TestDB", 1, 1, 1, 1, "MasterSlave", true);
        ClusterControllerManager clusterControllerManager = new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, str, "controller_0");
        clusterControllerManager.syncStart();
        for (int i = 0; i < 1; i++) {
            mockParticipantManagerArr[i] = new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, str, "localhost_" + (12918 + i));
            mockParticipantManagerArr[i].setTransition(new SessionExpiryTransition(countDownLatch, countDownLatch2));
            mockParticipantManagerArr[i].syncStart();
        }
        countDownLatch.await();
        String sessionId = mockParticipantManagerArr[0].getSessionId();
        System.out.println("oldSessionId: " + sessionId);
        ZkTestHelper.expireSession(mockParticipantManagerArr[0].getZkClient());
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZkIntegrationTestBase.ZK_ADDR, str)));
        Assert.assertNotSame(mockParticipantManagerArr[0].getSessionId(), sessionId);
        ZNRecord zNRecord = (ZNRecord) _gZkClient.readData(PropertyPathConfig.getPath(PropertyType.ERRORS, str, new String[]{"localhost_12918", sessionId, "TestDB0", "TestDB0_0"}));
        Assert.assertNotNull(zNRecord, "InterruptedException should happen in old session since task is being cancelled during handleNewSession");
        Assert.assertTrue(new String(new ZNRecordSerializer().serialize(zNRecord)).indexOf("InterruptedException") != -1);
        clusterControllerManager.syncStop();
        for (int i2 = 0; i2 < 1; i2++) {
            mockParticipantManagerArr[i2].syncStop();
        }
        System.out.println("END " + str + " at " + new Date(System.currentTimeMillis()));
    }
}
