package org.apache.helix.manager.zk;

import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/manager/zk/TestHandleSession.class */
public class TestHandleSession extends ZkTestBase {
    private static final String _className = TestHelper.getTestClassName();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/manager/zk/TestHandleSession$BlockingHandleNewSessionZkHelixManager.class */
    public static class BlockingHandleNewSessionZkHelixManager extends ZKHelixManager {
        private final Semaphore newSessionHandlingCount;
        private long handleNewSessionStartTime;
        private long handleNewSessionEndTime;

        public BlockingHandleNewSessionZkHelixManager(String str, String str2, InstanceType instanceType, String str3) {
            super(str, str2, instanceType, str3);
            this.newSessionHandlingCount = new Semaphore(1);
            this.handleNewSessionStartTime = 0L;
            this.handleNewSessionEndTime = 0L;
        }

        public void handleNewSession(String str) throws Exception {
            this.newSessionHandlingCount.acquire();
            this.handleNewSessionStartTime = System.currentTimeMillis();
            super.handleNewSession(str);
            this.handleNewSessionEndTime = System.currentTimeMillis();
        }

        void proceedNewSessionHandling() {
            this.handleNewSessionStartTime = 0L;
            this.handleNewSessionEndTime = 0L;
            this.newSessionHandlingCount.release();
        }

        List<CallbackHandler> getHandlers() {
            return this._handlers;
        }

        RealmAwareZkClient getZkClient() {
            return this._zkclient;
        }

        long getHandleNewSessionStartTime() {
            return this.handleNewSessionStartTime;
        }

        long getHandleNewSessionEndTime() {
            return this.handleNewSessionEndTime;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/manager/zk/TestHandleSession$BlockingResetHandlersZkHelixManager.class */
    public static class BlockingResetHandlersZkHelixManager extends ZKHelixManager {
        private final Semaphore resetHandlersSemaphore;
        private long resetHandlersStartTime;

        public BlockingResetHandlersZkHelixManager(String str, String str2, InstanceType instanceType, String str3) {
            super(str, str2, instanceType, str3);
            this.resetHandlersSemaphore = new Semaphore(1);
            this.resetHandlersStartTime = 0L;
        }

        void resetHandlers(boolean z) {
            this.resetHandlersStartTime = System.currentTimeMillis();
            if (!z) {
                try {
                    this.resetHandlersSemaphore.tryAcquire(20L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                }
            }
            super.resetHandlers(z);
        }

        void proceedResetHandlers() {
            this.resetHandlersSemaphore.release();
        }

        List<CallbackHandler> getHandlers() {
            return this._handlers;
        }

        RealmAwareZkClient getZkClient() {
            return this._zkclient;
        }

        long getResetHandlersStartTime() {
            return this.resetHandlersStartTime;
        }
    }

    /* loaded from: input_file:org/apache/helix/manager/zk/TestHandleSession$MockLiveInstanceChangeListener.class */
    /**
     * Live-instance listener that, for every reported instance whose name is in the expected set,
     * registers a no-op current-state listener on the given manager. Registering listeners from
     * inside a callback makes the manager's handler list change while callbacks are running.
     */
    class MockLiveInstanceChangeListener implements LiveInstanceChangeListener {
        private final HelixManager _manager;
        private final Set<String> _expectedLiveInstances;

        public MockLiveInstanceChangeListener(HelixManager helixManager, Set<String> set) {
            this._expectedLiveInstances = set;
            this._manager = helixManager;
        }

        /**
         * Ignores FINALIZE notifications; otherwise adds one current-state listener per expected
         * live instance.
         *
         * @throws HelixException wrapping any exception raised while adding a listener
         */
        public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
            if (notificationContext.getType() == NotificationContext.Type.FINALIZE) {
                return;
            }
            for (LiveInstance candidate : list) {
                if (!this._expectedLiveInstances.contains(candidate.getInstanceName())) {
                    continue;
                }
                try {
                    // Empty callback: only the act of registering matters here.
                    this._manager.addCurrentStateChangeListener((name, states, context) -> {
                    }, candidate.getInstanceName(), candidate.getEphemeralOwner());
                } catch (Exception cause) {
                    throw new HelixException("Unexpected exception in the test method.", cause);
                }
            }
        }
    }

    /**
     * Expires a participant's ZooKeeper session three times and checks after each expiry that the
     * participant reports a real (non-"0") session id that sorts after the previous one.
     */
    @Test
    public void testHandleNewSession() throws Exception {
        String clusterName = _className + "_" + TestHelper.getTestMethodName();
        System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
        TestHelper.setupCluster(clusterName, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 10, 5, 3, "MasterSlave", true);
        MockParticipantManager participant = new MockParticipantManager(ZkTestBase.ZK_ADDR, clusterName, "localhost_12918");
        participant.syncStart();
        try {
            String lastSessionId = participant.getSessionId();
            for (int round = 0; round < 3; round++) {
                ZkTestHelper.expireSession(participant.getZkClient());
                String newSessionId = participant.getSessionId();
                // Check the "not yet assigned" marker first; otherwise the ordering assertion
                // below would fail with a misleading message and this one could never fire.
                Assert.assertFalse("0".equals(newSessionId), "Hit race condition in zhclient.handleNewSession(). sessionId is not returned yet.");
                Assert.assertTrue(newSessionId.compareTo(lastSessionId) > 0, "Session id should be increased after expiry");
                lastSessionId = newSessionId;
            }
        } finally {
            // Always stop the participant and remove the cluster, even when an assertion failed.
            System.out.println("Disconnecting ...");
            try {
                participant.syncStop();
            } finally {
                deleteCluster(clusterName);
            }
        }
        System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
    }

    /**
     * Verifies that a controller whose new-session handling is held back does not report itself
     * as leader until that handling is allowed to run, and that its callback handlers are all
     * ready afterwards.
     */
    @Test(dependsOnMethods = {"testHandleNewSession"})
    public void testAcquireLeadershipOnNewSession() throws Exception {
        String clusterName = "CLUSTER_" + getShortClassName() + "_testAcquireLeadershipOnNewSession";
        ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        TestHelper.setupEmptyCluster(_gZkClient, clusterName);
        BlockingHandleNewSessionZkHelixManager manager = new BlockingHandleNewSessionZkHelixManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZkTestBase.ZK_ADDR);
        new DistributedLeaderElection(manager, new GenericHelixController(), Collections.emptyList());
        manager.connect();
        try {
            // Wait until controller_0 owns the leader node with its current session.
            Assert.assertTrue(TestHelper.verify(() -> {
                LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
                return leader != null && "controller_0".equals(leader.getInstanceName()) && manager.getSessionId().equals(leader.getEphemeralOwner());
            }, 1000L));
            String originalSessionId = manager.getSessionId();
            long originalCreationTime = accessor.getProperty(keyBuilder.controllerLeader()).getStat().getCreationTime();
            int originalHandlerCount = manager.getHandlers().size();

            // Hold the ZK event lock while the leader node is removed and the session expired.
            // The lock is released in finally so a failed assertion cannot leave it held.
            manager._zkclient.getEventLock().lockInterruptibly();
            try {
                accessor.removeProperty(keyBuilder.controllerLeader());
                ZkTestHelper.asyncExpireSession(manager._zkclient);
                Assert.assertTrue(TestHelper.verify(() -> {
                    return !manager._zkclient.getConnection().getZookeeperState().isAlive();
                }, 3000L));
            } finally {
                manager._zkclient.getEventLock().unlock();
            }

            // The ZK client moves on to a new session ...
            Assert.assertTrue(TestHelper.verify(() -> {
                try {
                    return !Long.toHexString(manager._zkclient.getSessionId()).equals(originalSessionId);
                } catch (HelixException e) {
                    return false;
                }
            }, 2000L));
            // ... while the manager, whose handleNewSession is still blocked, keeps the old id.
            Assert.assertEquals(manager.getSessionId(), originalSessionId);
            // The leader node must reappear as a newly created node carrying the original session id.
            Assert.assertTrue(TestHelper.verify(() -> {
                LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
                return leader != null && leader.getStat().getCreationTime() != originalCreationTime && leader.getSessionId().equals(originalSessionId);
            }, 2000L));
            Assert.assertFalse(manager.isLeader());

            // Let the pending new-session handling run; leadership should then be acquired.
            manager.proceedNewSessionHandling();
            Assert.assertTrue(TestHelper.verify(() -> {
                return manager.isLeader();
            }, 1000L));
            Assert.assertTrue(TestHelper.verify(() -> {
                return manager.getHandlers().size() == originalHandlerCount;
            }, 3000L));
            Assert.assertTrue(manager.getHandlers().stream().allMatch(handler -> {
                return handler.isReady();
            }));
        } finally {
            // Clean up even when an assertion above failed.
            try {
                manager.disconnect();
            } finally {
                TestHelper.dropCluster(clusterName, _gZkClient);
            }
        }
    }

    /**
     * Expires the participant's session twice while new-session handling is blocked, then lets the
     * two pending handlings run and checks that the live-instance node ends up owned by the latest
     * session with all callback handlers restored and ready.
     */
    @Test(timeOut = 300000)
    public void testDiscardExpiredSessions() throws Exception {
        String clusterName = _className + "_" + TestHelper.getTestMethodName();
        ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(ZkTestBase.ZK_ADDR));
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        TestHelper.setupCluster(clusterName, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 10, 5, 3, "MasterSlave", true);
        BlockingHandleNewSessionZkHelixManager manager = new BlockingHandleNewSessionZkHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZkTestBase.ZK_ADDR);
        manager.connect();
        try {
            String originalSessionId = manager.getSessionId();
            LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
            // Null-check before the first dereference so a missing node fails as an assertion,
            // not as a NullPointerException.
            Assert.assertNotNull(liveInstance);
            long originalCreationTime = liveInstance.getStat().getCreationTime();
            Assert.assertEquals(liveInstance.getEphemeralOwner(), originalSessionId);
            int originalHandlerCount = manager.getHandlers().size();
            long originalStartTime = manager.getHandleNewSessionStartTime();

            for (int round = 0; round < 2; round++) {
                String previousZkSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
                // Hold the event lock only around the expiry. It is released exactly once in
                // finally; releasing it again from a later failure path would attempt to unlock a
                // lock this thread no longer holds and hide the real assertion error.
                manager.getZkClient().getEventLock().lockInterruptibly();
                try {
                    ZkTestHelper.asyncExpireSession(manager.getZkClient());
                    Assert.assertTrue(TestHelper.verify(() -> {
                        return !manager.getZkClient().getConnection().getZookeeperState().isAlive();
                    }, 3000L));
                } finally {
                    manager.getZkClient().getEventLock().unlock();
                }
                // The ZK client must report a fresh, non-"0" session id.
                Assert.assertTrue(TestHelper.verify(() -> {
                    try {
                        String currentZkSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
                        return !"0".equals(currentZkSessionId) && !currentZkSessionId.equals(previousZkSessionId);
                    } catch (HelixException e) {
                        return false;
                    }
                }, 2000L));
                // Handling is still blocked, so its recorded start time must not have moved.
                Assert.assertEquals(manager.getHandleNewSessionStartTime(), originalStartTime);
            }

            for (int round = 0; round < 2; round++) {
                Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
                long previousEndTime = manager.getHandleNewSessionEndTime();
                manager.proceedNewSessionHandling();
                Assert.assertTrue(TestHelper.verify(() -> {
                    return manager.getHandleNewSessionEndTime() > previousEndTime;
                }, 2000L));
            }

            String latestZkSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
            // The live-instance node must be a newly created node owned by the latest session.
            Assert.assertTrue(TestHelper.verify(() -> {
                LiveInstance current = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
                return current != null && current.getStat().getCreationTime() != originalCreationTime && current.getEphemeralOwner().equals(latestZkSessionId);
            }, 2000L));
            Assert.assertTrue(TestHelper.verify(() -> {
                return manager.getHandlers().size() == originalHandlerCount;
            }, 1000L));
            Assert.assertTrue(manager.getHandlers().stream().allMatch(handler -> {
                return handler.isReady();
            }));
        } finally {
            // Clean up even when an assertion above failed.
            try {
                manager.disconnect();
            } finally {
                deleteCluster(clusterName);
            }
        }
    }

    /**
     * Expires the participant's session a second time while the manager is held inside
     * {@code resetHandlers} for the first expiry, and checks that the live-instance node is
     * eventually re-created for the latest session with all callback handlers ready.
     *
     * <p>A helper thread drives the second expiry and releases the blocked resets; the main thread
     * and the helper hand over control through latches.
     */
    @Test
    public void testSessionExpiredWhenResetHandlers() throws Exception {
        String clusterName = _className + "_" + TestHelper.getTestMethodName();
        ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(ZkTestBase.ZK_ADDR));
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        TestHelper.setupCluster(clusterName, ZkTestBase.ZK_ADDR, 12918, "localhost", WorkflowGenerator.DEFAULT_TGT_DB, 1, 10, 5, 3, "MasterSlave", true);
        BlockingResetHandlersZkHelixManager manager = new BlockingResetHandlersZkHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZkTestBase.ZK_ADDR);
        manager.connect();
        String originalSessionId = manager.getSessionId();
        long originalResetStartTime = manager.getResetHandlersStartTime();
        LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
        Assert.assertNotNull(liveInstance);
        Assert.assertEquals(liveInstance.getEphemeralOwner(), originalSessionId);
        int originalHandlerCount = manager.getHandlers().size();
        long originalCreationTime = liveInstance.getStat().getCreationTime();

        // mainThreadBlocker: helper -> main, "second expiry is in place".
        // helperThreadBlocker: main -> helper, "go on and release the second reset".
        // helperThreadDone: helper -> main, "helper has finished". A separate latch is required
        // because mainThreadBlocker is already at zero after its first use, so awaiting it a
        // second time would return immediately.
        CountDownLatch mainThreadBlocker = new CountDownLatch(1);
        CountDownLatch helperThreadBlocker = new CountDownLatch(1);
        CountDownLatch helperThreadDone = new CountDownLatch(1);

        new Thread(() -> {
            try {
                // Wait (best effort) until the manager has picked up the first new session and
                // entered resetHandlers for it.
                TestHelper.verify(() -> {
                    return !manager.getSessionId().equals(originalSessionId) && manager.getResetHandlersStartTime() > originalResetStartTime;
                }, 3000L);
                String secondSessionId = manager.getSessionId();
                long secondResetStartTime = manager.getResetHandlersStartTime();

                // Expire the session again while holding the event lock.
                manager.getZkClient().getEventLock().lockInterruptibly();
                try {
                    ZkTestHelper.asyncExpireSession(manager.getZkClient());
                    TestHelper.verify(() -> {
                        return !ZKUtil.toHexSessionId(manager.getZkClient().getSessionId()).equals(secondSessionId);
                    }, 3000L);
                } catch (Exception ignored) {
                    // Best effort: the main thread's assertions decide whether the test passes.
                } finally {
                    manager.getZkClient().getEventLock().unlock();
                }

                // Release the first blocked reset and wait for the next one to start.
                manager.proceedResetHandlers();
                TestHelper.verify(() -> {
                    return !manager.getSessionId().equals(secondSessionId) && manager.getResetHandlersStartTime() > secondResetStartTime;
                }, 3000L);

                mainThreadBlocker.countDown();
                helperThreadBlocker.await();

                // Release the second blocked reset and wait for the live instance to come back.
                manager.proceedResetHandlers();
                String latestZkSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
                TestHelper.verify(() -> {
                    LiveInstance current = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
                    return current != null && current.getStat().getCreationTime() != originalCreationTime && current.getEphemeralOwner().equals(latestZkSessionId);
                }, 2000L);
            } catch (Exception ignored) {
                // Best effort: failures surface through the main thread's assertions.
            } finally {
                // Never leave the main thread waiting, whatever happened above.
                mainThreadBlocker.countDown();
                helperThreadDone.countDown();
            }
        }).start();

        try {
            // First expiry, performed while holding the event lock. The lock is released exactly
            // once in finally; a second unlock from a later failure path would attempt to release
            // a lock this thread no longer holds and hide the real assertion error.
            manager.getZkClient().getEventLock().lockInterruptibly();
            try {
                ZkTestHelper.asyncExpireSession(manager.getZkClient());
                Assert.assertTrue(TestHelper.verify(() -> {
                    return !manager.getZkClient().getConnection().getZookeeperState().isAlive();
                }, 3000L));
            } finally {
                manager.getZkClient().getEventLock().unlock();
            }

            mainThreadBlocker.await();
            // While the latest reset is still blocked no live-instance node may exist.
            Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
            helperThreadBlocker.countDown();
            Assert.assertTrue(helperThreadDone.await(30L, TimeUnit.SECONDS), "Helper thread did not finish in time.");

            String latestZkSessionId = ZKUtil.toHexSessionId(manager.getZkClient().getSessionId());
            Assert.assertTrue(TestHelper.verify(() -> {
                LiveInstance current = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));
                return current != null && current.getStat().getCreationTime() != originalCreationTime && current.getEphemeralOwner().equals(latestZkSessionId);
            }, 2000L));
            Assert.assertTrue(TestHelper.verify(() -> {
                return manager.getHandlers().size() == originalHandlerCount;
            }, 1000L));
            Assert.assertTrue(TestHelper.verify(() -> {
                return manager.getHandlers().stream().allMatch(handler -> {
                    return handler.isReady();
                });
            }, 3000L));
        } finally {
            // Release the helper if the main thread failed before doing so (no-op otherwise),
            // then clean up.
            helperThreadBlocker.countDown();
            try {
                manager.disconnect();
            } finally {
                deleteCluster(clusterName);
            }
        }
    }

    /**
     * Checks that callback handlers are all ready after a session expiry when live-instance
     * callbacks add further listeners around the time the manager handles the new session.
     */
    @Test
    public void testConcurrentInitCallbackHandlers() throws Exception {
        String clusterName = "CLUSTER_" + _className + "_" + TestHelper.getTestMethodName();
        TestHelper.setupEmptyCluster(_gZkClient, clusterName);
        try {
            BlockingHandleNewSessionZkHelixManager spectator = new BlockingHandleNewSessionZkHelixManager(clusterName, TestHelper.getTestMethodName() + "Spectator", InstanceType.SPECTATOR, _gZkClient.getServers());
            spectator.connect();
            try {
                // Each listener registers a current-state listener from inside its callback,
                // which modifies the manager's handler list.
                spectator.addLiveInstanceChangeListener(new MockLiveInstanceChangeListener(spectator, Collections.singleton("localhost_1")));
                spectator.addLiveInstanceChangeListener(new MockLiveInstanceChangeListener(spectator, Collections.singleton("localhost_2")));
                ZkTestHelper.asyncExpireSession(spectator.getZkClient());
                setupLiveInstances(clusterName, new int[]{1, 2});
                spectator.proceedNewSessionHandling();
                // proceedNewSessionHandling() zeroes the end time, so a non-zero value means a
                // new-session handling completed afterwards. Assert it instead of ignoring it.
                Assert.assertTrue(TestHelper.verify(() -> {
                    return spectator.getHandleNewSessionEndTime() != 0;
                }, 3000L), "New session handling did not finish in time.");
                for (CallbackHandler handler : spectator.getHandlers()) {
                    Assert.assertTrue(handler.isReady(), "CallbackHandler is not initialized as expected. It might be caused by a ConcurrentModificationException");
                }
            } finally {
                // The manager was connected above; always disconnect so the connection is not leaked.
                spectator.disconnect();
            }
        } finally {
            TestHelper.dropCluster(clusterName, _gZkClient);
        }
    }
}
