package org.apache.helix.manager.zk;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/manager/zk/TestParticipantManager.class */
public class TestParticipantManager extends ZkTestBase {
    private static Logger LOG = LoggerFactory.getLogger(TestParticipantManager.class);

    /* loaded from: input_file:org/apache/helix/manager/zk/TestParticipantManager$BlockingPreConnectCallback.class */
    static class BlockingPreConnectCallback implements PreConnectCallback {
        private final String instanceName;
        private final CountDownLatch startCountDown;
        private final CountDownLatch endCountDown;
        private final Semaphore semaphore;
        private boolean canCreateLiveInstance;

        BlockingPreConnectCallback(String str, CountDownLatch countDownLatch, CountDownLatch countDownLatch2, Semaphore semaphore) {
            this.instanceName = str;
            this.startCountDown = countDownLatch;
            this.endCountDown = countDownLatch2;
            this.semaphore = semaphore;
        }

        public void onPreConnect() {
            TestParticipantManager.LOG.info("Handling new session for instance: {}", this.instanceName);
            this.semaphore.release();
            try {
                TestParticipantManager.LOG.info("Waiting session expiry to happen.");
                this.startCountDown.await();
                if (this.canCreateLiveInstance) {
                    TestParticipantManager.LOG.info("Waiting to continue creating live instance.");
                    this.endCountDown.await();
                }
            } catch (InterruptedException e) {
                TestParticipantManager.LOG.error("Interrupted in waiting", e);
            }
            this.canCreateLiveInstance = true;
        }
    }

    @Test
    public void testSessionExpiryCreateLiveInstance() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor(ZkTestBase.ZK_ADDR));
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        TestHelper.setupCluster(str, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 10, 5, 3, "MasterSlave", true);
        MockParticipantManager mockParticipantManager = new MockParticipantManager(ZkTestBase.ZK_ADDR, str, "localhost_12918");
        mockParticipantManager.syncStart();
        LiveInstance property = zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
        long creationTime = property.getStat().getCreationTime();
        String sessionId = mockParticipantManager.getSessionId();
        Assert.assertNotNull(property);
        Assert.assertEquals(property.getEphemeralOwner(), sessionId);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Semaphore semaphore = new Semaphore(0);
        mockParticipantManager.addPreConnectCallback(new BlockingPreConnectCallback("localhost_12918", countDownLatch, countDownLatch2, semaphore));
        ZkTestHelper.asyncExpireSession(mockParticipantManager.getZkClient());
        semaphore.acquire();
        Assert.assertFalse(sessionId.equals(mockParticipantManager.getSessionId()));
        Assert.assertNull(zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
        String sessionId2 = mockParticipantManager.getSessionId();
        ZkTestHelper.asyncExpireSession(mockParticipantManager.getZkClient());
        TestHelper.verify(() -> {
            return !ZKUtil.toHexSessionId(mockParticipantManager.getZkClient().getSessionId()).equals(sessionId2);
        }, TestHelper.WAIT_DURATION);
        String hexSessionId = ZKUtil.toHexSessionId(mockParticipantManager.getZkClient().getSessionId());
        Assert.assertFalse(sessionId2.equals(hexSessionId));
        countDownLatch.countDown();
        semaphore.acquire();
        Assert.assertNull(zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918")), "Live instance should not be created because zk session is expired!");
        countDownLatch2.countDown();
        TestHelper.verify(() -> {
            LiveInstance property2 = zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
            return (property2 == null || property2.getStat().getCreationTime() == creationTime || !property2.getEphemeralOwner().equals(hexSessionId)) ? false : true;
        }, TestHelper.WAIT_DURATION);
        mockParticipantManager.syncStop();
        deleteCluster(str);
    }

    @Test(dependsOnMethods = {"testSessionExpiryCreateLiveInstance"})
    public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        ZKHelixDataAccessor zKHelixDataAccessor = new ZKHelixDataAccessor(str, new ZkBaseDataAccessor.Builder().setZkAddress(ZkTestBase.ZK_ADDR).build());
        PropertyKey.Builder keyBuilder = zKHelixDataAccessor.keyBuilder();
        TestHelper.setupCluster(str, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 10, 5, 3, "MasterSlave", true);
        MockParticipantManager mockParticipantManager = new MockParticipantManager(ZkTestBase.ZK_ADDR, str, "localhost_12918");
        InstanceConfig property = zKHelixDataAccessor.getProperty(keyBuilder.instanceConfig("localhost_12918"));
        property.setTargetTaskThreadPoolSize(41);
        zKHelixDataAccessor.setProperty(keyBuilder.instanceConfig("localhost_12918"), property);
        mockParticipantManager.syncStart();
        LiveInstance property2 = zKHelixDataAccessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
        Assert.assertNotNull(property2);
        Assert.assertEquals(property2.getCurrentTaskThreadPoolSize(), 41);
        mockParticipantManager.syncStop();
        deleteCluster(str);
    }
}
