package org.apache.activemq.artemis.quorum.zookeeper;

import com.google.common.base.Predicates;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedLockTest;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.UnavailableStateException;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingZooKeeperServer;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/quorum/zookeeper/CuratorDistributedLockTest.class */
public class CuratorDistributedLockTest extends DistributedLockTest {
    private static final int BASE_SERVER_PORT = 6666;
    private static final int CONNECTION_MS = 2000;
    private static final int SESSION_MS = 6000;
    private static final int SERVER_TICK_MS = 2000;
    private static final int RETRIES_MS = 100;
    private static final int RETRIES = 1;
    private static final int ZK_NODES;

    @Parameterized.Parameter
    public int zkNodes;

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private TestingCluster testingServer;
    private InstanceSpec[] clusterSpecs;
    private String connectString;

    @Parameterized.Parameters(name = "nodes={0}")
    public static Iterable<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{Integer.valueOf(ZK_NODES)});
    }

    @Override // org.apache.activemq.artemis.quorum.DistributedLockTest
    public void setupEnv() throws Throwable {
        this.clusterSpecs = new InstanceSpec[this.zkNodes];
        for (int i = 0; i < this.zkNodes; i += RETRIES) {
            this.clusterSpecs[i] = new InstanceSpec(this.tmpFolder.newFolder(), BASE_SERVER_PORT + i, -1, -1, true, -1, 2000, -1);
        }
        this.testingServer = new TestingCluster(this.clusterSpecs);
        this.testingServer.start();
        Wait.waitFor(this::ensembleHasLeader);
        this.connectString = this.testingServer.getConnectString();
        super.setupEnv();
    }

    @Override // org.apache.activemq.artemis.quorum.DistributedLockTest
    public void tearDownEnv() throws Throwable {
        super.tearDownEnv();
        this.testingServer.close();
    }

    @Override // org.apache.activemq.artemis.quorum.DistributedLockTest
    protected void configureManager(Map<String, String> map) {
        map.put("connect-string", this.connectString);
        map.put("session-ms", Integer.toString(SESSION_MS));
        map.put("connection-ms", Integer.toString(2000));
        map.put("retries", Integer.toString(RETRIES));
        map.put("retries-ms", Integer.toString(RETRIES_MS));
    }

    @Override // org.apache.activemq.artemis.quorum.DistributedLockTest
    protected String managerClassName() {
        return CuratorDistributedPrimitiveManager.class.getName();
    }

    @Test(expected = RuntimeException.class)
    public void cannotCreateManagerWithNotValidParameterNames() {
        createManagedDistributeManager(map -> {
        });
    }

    @Test
    public void canAcquireLocksFromDifferentNamespace() throws ExecutionException, InterruptedException, TimeoutException, UnavailableStateException {
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager(map -> {
        });
        createManagedDistributeManager.start();
        DistributedPrimitiveManager createManagedDistributeManager2 = createManagedDistributeManager(map2 -> {
        });
        createManagedDistributeManager2.start();
        Assert.assertTrue(createManagedDistributeManager.getDistributedLock("a").tryLock());
        Assert.assertTrue(createManagedDistributeManager2.getDistributedLock("a").tryLock());
    }

    @Test
    public void cannotStartManagerWithDisconnectedServer() throws IOException, ExecutionException, InterruptedException {
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        this.testingServer.close();
        Assert.assertFalse(createManagedDistributeManager.start(1L, TimeUnit.SECONDS));
    }

    @Test(expected = UnavailableStateException.class)
    public void cannotAcquireLockWithDisconnectedServer() throws IOException, ExecutionException, InterruptedException, TimeoutException, UnavailableStateException {
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        CountDownLatch countDownLatch = new CountDownLatch(RETRIES);
        countDownLatch.getClass();
        distributedLock.addListener(countDownLatch::countDown);
        this.testingServer.close();
        Assert.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        distributedLock.tryLock();
    }

    @Test(expected = UnavailableStateException.class)
    public void cannotTryLockWithDisconnectedServer() throws IOException, ExecutionException, InterruptedException, TimeoutException, UnavailableStateException {
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        this.testingServer.close();
        distributedLock.tryLock();
    }

    @Test(expected = UnavailableStateException.class)
    public void cannotCheckLockStatusWithDisconnectedServer() throws IOException, ExecutionException, InterruptedException, TimeoutException, UnavailableStateException {
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        Assert.assertFalse(distributedLock.isHeldByCaller());
        Assert.assertTrue(distributedLock.tryLock());
        this.testingServer.close();
        distributedLock.isHeldByCaller();
    }

    @Test(expected = UnavailableStateException.class)
    public void looseLockAfterServerStop() throws ExecutionException, InterruptedException, TimeoutException, UnavailableStateException, IOException {
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        Assert.assertTrue(distributedLock.tryLock());
        Assert.assertTrue(distributedLock.isHeldByCaller());
        CountDownLatch countDownLatch = new CountDownLatch(RETRIES);
        countDownLatch.getClass();
        distributedLock.addListener(countDownLatch::countDown);
        Assert.assertEquals(1L, countDownLatch.getCount());
        this.testingServer.close();
        Assert.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        distributedLock.isHeldByCaller();
    }

    @Test
    public void canAcquireLockOnMajorityRestart() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        Assert.assertTrue(distributedLock.tryLock());
        Assert.assertTrue(distributedLock.isHeldByCaller());
        CountDownLatch countDownLatch = new CountDownLatch(RETRIES);
        countDownLatch.getClass();
        distributedLock.addListener(countDownLatch::countDown);
        Assert.assertEquals(1L, countDownLatch.getCount());
        this.testingServer.stop();
        countDownLatch.await();
        createManagedDistributeManager.stop();
        restartMajorityNodes(true);
        DistributedPrimitiveManager createManagedDistributeManager2 = createManagedDistributeManager();
        createManagedDistributeManager2.start();
        TimeUnit.MILLISECONDS.sleep(8000L);
        Assert.assertTrue(createManagedDistributeManager2.getDistributedLock("a").tryLock());
    }

    @Test
    public void cannotStartManagerWithoutQuorum() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        stopMajorityNotLeaderNodes(true);
        Assert.assertFalse(createManagedDistributeManager.start(2L, TimeUnit.SECONDS));
        Assert.assertFalse(createManagedDistributeManager.isStarted());
    }

    @Test(expected = UnavailableStateException.class)
    public void cannotAcquireLockWithoutQuorum() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        stopMajorityNotLeaderNodes(true);
        createManagedDistributeManager.getDistributedLock("a").tryLock();
    }

    @Test
    public void cannotCheckLockWithoutQuorum() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        stopMajorityNotLeaderNodes(true);
        try {
            Assert.assertFalse(createManagedDistributeManager.getDistributedLock("a").isHeldByCaller());
        } catch (UnavailableStateException e) {
        }
    }

    @Test
    public void canGetLockWithoutQuorum() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        stopMajorityNotLeaderNodes(true);
        Assert.assertNotNull(createManagedDistributeManager.getDistributedLock("a"));
    }

    @Test
    public void notifiedAsUnavailableWhileLoosingQuorum() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        CountDownLatch countDownLatch = new CountDownLatch(RETRIES);
        countDownLatch.getClass();
        distributedLock.addListener(countDownLatch::countDown);
        stopMajorityNotLeaderNodes(true);
        Assert.assertTrue(countDownLatch.await(8000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void beNotifiedOnce() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        atomicInteger.getClass();
        createManagedDistributeManager.addUnavailableManagerListener(atomicInteger::incrementAndGet);
        atomicInteger2.getClass();
        distributedLock.addListener(atomicInteger2::incrementAndGet);
        stopMajorityNotLeaderNodes(true);
        TimeUnit.MILLISECONDS.sleep(10000L);
        Assert.assertEquals(1L, atomicInteger2.get());
        Assert.assertEquals(1L, atomicInteger.get());
    }

    @Test
    public void beNotifiedOfUnavailabilityWhileBlockedOnTimedLock() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        atomicInteger.getClass();
        createManagedDistributeManager.addUnavailableManagerListener(atomicInteger::incrementAndGet);
        atomicInteger2.getClass();
        distributedLock.addListener(atomicInteger2::incrementAndGet);
        DistributedPrimitiveManager createManagedDistributeManager2 = createManagedDistributeManager();
        createManagedDistributeManager2.start();
        Assert.assertTrue(createManagedDistributeManager2.getDistributedLock("a").tryLock());
        CountDownLatch countDownLatch = new CountDownLatch(RETRIES);
        AtomicReference atomicReference = new AtomicReference(null);
        new Thread(() -> {
            countDownLatch.countDown();
            try {
                distributedLock.tryLock(Long.MAX_VALUE, TimeUnit.DAYS);
                atomicReference.set(false);
            } catch (UnavailableStateException e) {
                atomicReference.set(true);
            } catch (InterruptedException e2) {
                atomicReference.set(false);
            }
        }).start();
        Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
        TimeUnit.SECONDS.sleep(1L);
        stopMajorityNotLeaderNodes(true);
        TimeUnit.MILLISECONDS.sleep(8000L);
        Wait.waitFor(() -> {
            return atomicInteger2.get() > 0;
        }, 2000L);
        Assert.assertEquals(1L, atomicInteger.get());
        Assert.assertEquals(Boolean.TRUE, atomicReference.get());
    }

    @Test
    public void beNotifiedOfAlreadyUnavailableManagerAfterAddingListener() throws Exception {
        Assume.assumeThat(Integer.valueOf(this.zkNodes), Matchers.greaterThan(Integer.valueOf(RETRIES)));
        DistributedPrimitiveManager createManagedDistributeManager = createManagedDistributeManager();
        createManagedDistributeManager.start();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        DistributedPrimitiveManager.UnavailableManagerListener unavailableManagerListener = () -> {
            atomicBoolean.set(true);
        };
        createManagedDistributeManager.addUnavailableManagerListener(unavailableManagerListener);
        Assert.assertFalse(atomicBoolean.get());
        stopMajorityNotLeaderNodes(true);
        atomicBoolean.getClass();
        Wait.waitFor(atomicBoolean::get);
        createManagedDistributeManager.removeUnavailableManagerListener(unavailableManagerListener);
        AtomicInteger atomicInteger = new AtomicInteger();
        atomicInteger.getClass();
        createManagedDistributeManager.addUnavailableManagerListener(atomicInteger::incrementAndGet);
        Assert.assertEquals(1L, atomicInteger.get());
        atomicInteger.set(0);
        DistributedLock distributedLock = createManagedDistributeManager.getDistributedLock("a");
        Throwable th = null;
        try {
            try {
                atomicInteger.getClass();
                distributedLock.addListener(atomicInteger::incrementAndGet);
                Assert.assertEquals(1L, atomicInteger.get());
                if (distributedLock != null) {
                    if (0 == 0) {
                        distributedLock.close();
                        return;
                    }
                    try {
                        distributedLock.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (distributedLock != null) {
                if (th != null) {
                    try {
                        distributedLock.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    distributedLock.close();
                }
            }
            throw th4;
        }
    }

    private boolean ensembleHasLeader() {
        return this.testingServer.getServers().stream().filter(CuratorDistributedLockTest::isLeader).count() != 0;
    }

    private static boolean isLeader(TestingZooKeeperServer testingZooKeeperServer) {
        if (testingZooKeeperServer.getInstanceSpecs().size() == RETRIES) {
            return true;
        }
        return testingZooKeeperServer.getQuorumPeer().getId() == testingZooKeeperServer.getQuorumPeer().getLeaderId();
    }

    private void stopMajorityNotLeaderNodes(boolean z) throws Exception {
        List list = (List) this.testingServer.getServers().stream().filter(Predicates.not(CuratorDistributedLockTest::isLeader)).collect(Collectors.toList());
        int i = (this.zkNodes / 2) + RETRIES;
        for (int i2 = 0; i2 < i; i2 += RETRIES) {
            ((TestingZooKeeperServer) list.get(z ? (list.size() - RETRIES) - i2 : i2)).stop();
        }
    }

    private void restartMajorityNodes(boolean z) throws Exception {
        int i = (this.zkNodes / 2) + RETRIES;
        for (int i2 = 0; i2 < i; i2 += RETRIES) {
            int i3 = z ? (this.zkNodes - RETRIES) - i2 : i2;
            if (!this.testingServer.restartServer(this.clusterSpecs[i3])) {
                throw new IllegalStateException("errored while restarting " + this.clusterSpecs[i3]);
            }
        }
    }

    static {
        ZK_NODES = Boolean.getBoolean("fast-tests") ? RETRIES : 3;
    }
}
