package org.apache.kylin.common.lock.curator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.curator.test.TestingServer;
import org.apache.kylin.common.lock.DistributedLockFactoryTest;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TestUtils;
import org.apache.kylin.junit.annotation.MetadataInfo;
import org.apache.kylin.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.kylin.shaded.curator.org.apache.curator.framework.state.ConnectionState;
import org.apache.kylin.shaded.curator.org.apache.curator.framework.state.ConnectionStateListener;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.springframework.test.util.ReflectionTestUtils;

@Disabled("TODO: re-run to check.")
@MetadataInfo(onlyProps = true)
/* loaded from: input_file:org/apache/kylin/common/lock/curator/CuratorDistributedLockFactoryTest.class */
class CuratorDistributedLockFactoryTest extends DistributedLockFactoryTest {
    private TestingServer zkTestServer;
    private volatile boolean locked = false;
    private volatile boolean isInterrupted = false;

    CuratorDistributedLockFactoryTest() {
    }

    @BeforeEach
    public void setup() throws Exception {
        this.zkTestServer = new TestingServer(true);
    }

    @AfterEach
    public void after() throws Exception {
        this.zkTestServer.close();
    }

    @Test
    public void testBasic() throws Exception {
        String str = "/test/distributed_lock_factory_test/test_basic/" + RandomUtil.randomUUIDStr();
        TestUtils.getTestConfig().setProperty("kylin.env.zookeeper-connect-string", this.zkTestServer.getConnectString());
        CuratorDistributedLock lockForCurrentThread = TestUtils.getTestConfig().getDistributedLockFactory().getLockForCurrentThread(str);
        Assert.assertFalse(lockForCurrentThread.isAcquiredInThisThread());
        lockForCurrentThread.lock();
        Assert.assertTrue(lockForCurrentThread.isAcquiredInThisThread());
        lockForCurrentThread.unlock();
        Assert.assertFalse(lockForCurrentThread.isAcquiredInThisThread());
    }

    @RetryingTest(3)
    public void testInterruptWhenLost() throws Exception {
        String str = "/test/distributed_lock_factory_test/test_interrupt_lost/" + RandomUtil.randomUUIDStr();
        TestingServer testingServer = new TestingServer(true);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        TestUtils.getTestConfig().setProperty("kylin.env.zookeeper-connect-string", this.zkTestServer.getConnectString());
        TestUtils.getTestConfig().setProperty("kap.env.zookeeper-max-retries", "1");
        TestUtils.getTestConfig().setProperty("kap.env.zookeeper-base-sleep-time", "1000");
        newFixedThreadPool.submit(() -> {
            CuratorDistributedLock curatorDistributedLock = null;
            try {
                curatorDistributedLock = (CuratorDistributedLock) TestUtils.getTestConfig().getDistributedLockFactory().getLockForCurrentThread(str);
            } catch (Exception e) {
                e.printStackTrace();
            }
            curatorDistributedLock.lock();
            this.locked = true;
            try {
                Thread.sleep(20000L);
            } catch (InterruptedException e2) {
                this.isInterrupted = true;
            }
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.locked);
        });
        Assert.assertFalse(this.isInterrupted);
        this.locked = false;
        this.zkTestServer.stop();
        Awaitility.await().atMost(20L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.isInterrupted);
        });
        Assert.assertFalse(this.locked);
        TestUtils.getTestConfig().setProperty("kylin.env.zookeeper-connect-string", testingServer.getConnectString());
        newFixedThreadPool.submit(() -> {
            Lock lock = null;
            try {
                lock = TestUtils.getTestConfig().getDistributedLockFactory().getLockForCurrentThread(str);
            } catch (Exception e) {
                e.printStackTrace();
            }
            lock.lock();
            this.locked = true;
            lock.unlock();
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.locked);
        });
    }

    @Test
    public void testInterruptWhenSuspended() throws Exception {
        String str = "/test/distributed_lock_factory_test/test_interrupt_suspended/" + RandomUtil.randomUUIDStr();
        new TestingServer(true);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        TestUtils.getTestConfig().setProperty("kylin.env.zookeeper-connect-string", this.zkTestServer.getConnectString());
        TestUtils.getTestConfig().setProperty("kap.env.zookeeper-max-retries", "1");
        TestUtils.getTestConfig().setProperty("kap.env.zookeeper-base-sleep-time", "1000");
        CuratorDistributedLockFactory distributedLockFactory = TestUtils.getTestConfig().getDistributedLockFactory();
        CuratorDistributedLock lockForCurrentThread = distributedLockFactory.getLockForCurrentThread(str);
        newFixedThreadPool.submit(() -> {
            lockForCurrentThread.lock();
            this.locked = true;
            try {
                Thread.sleep(20000L);
            } catch (InterruptedException e) {
                this.isInterrupted = true;
            }
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.locked);
        });
        Assert.assertFalse(this.isInterrupted);
        ((ConnectionStateListener) ReflectionTestUtils.getField(distributedLockFactory, "listener")).stateChanged((CuratorFramework) ReflectionTestUtils.getField(distributedLockFactory, "client"), ConnectionState.SUSPENDED);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.isInterrupted);
        });
    }

    @Test
    void testConcurrence() throws Exception {
        TestUtils.getTestConfig().setProperty("kylin.env.zookeeper-connect-string", this.zkTestServer.getConnectString());
        super.testConcurrence("/test/distributed_lock_factory_test/test_concurrence/" + RandomUtil.randomUUIDStr(), 10, 10);
    }
}
