/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase;

import java.io.Closeable;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for the {@link DistributedLock} instances produced by
 * {@link ZookeeperDistributedLock.Factory}.
 *
 * <p>Each test works under its own sub-path of {@code ZK_PFX}; everything below that prefix is
 * purged once all tests have run.
 */
public class ITZookeeperDistributedLockTest
extends HBaseMetadataTestCase {
    private static final Logger logger = LoggerFactory.getLogger(ITZookeeperDistributedLockTest.class);

    /** Root lock path of this run; the random suffix makes it unlikely that two runs share paths. */
    private static final String ZK_PFX = "/test/ZookeeperDistributedLockTest/" + new Random().nextInt(10000000);

    /** Lock factory shared by all tests; created in {@link #setup()}. */
    static ZookeeperDistributedLock.Factory factory;

    /**
     * Creates the test metadata and the lock factory used by every test.
     *
     * @throws Exception if the test metadata or the factory cannot be set up
     */
    @BeforeClass
    public static void setup() throws Exception {
        ITZookeeperDistributedLockTest.staticCreateTestMetadata();
        factory = new ZookeeperDistributedLock.Factory();
    }

    /**
     * Cleans up the test metadata and purges every lock created under {@code ZK_PFX}.
     *
     * @throws Exception if cleanup or purging fails
     */
    @AfterClass
    public static void after() throws Exception {
        try {
            ITZookeeperDistributedLockTest.staticCleanupTestMetadata();
        } finally {
            // Purge even when metadata cleanup failed. The factory is null when setup() itself
            // failed early; skipping then avoids an NPE that would hide the original error.
            if (factory != null) {
                factory.lockForCurrentProcess().purgeLocks(ZK_PFX);
            }
        }
    }

    /** A single client can lock a path repeatedly, sees itself as owner, and frees it with one unlock. */
    @Test
    public void testBasic() {
        DistributedLock l = factory.lockForCurrentThread();
        String path = ZK_PFX + "/testBasic";

        Assert.assertFalse(l.isLocked(path));

        // Locking a path this client already holds must keep succeeding.
        Assert.assertTrue(l.lock(path));
        Assert.assertTrue(l.lock(path));
        Assert.assertTrue(l.lock(path));

        Assert.assertEquals(l.getClient(), l.peekLock(path));
        Assert.assertTrue(l.isLocked(path));
        Assert.assertTrue(l.isLockedByMe(path));

        l.unlock(path);
        Assert.assertFalse(l.isLocked(path));
    }

    /** A second client can neither take nor release a lock that another client holds. */
    @Test
    public void testErrorCases() {
        DistributedLock c = factory.lockForClient("client1");
        DistributedLock d = factory.lockForClient("client2");
        String path = ZK_PFX + "/testErrorCases";

        Assert.assertFalse(c.isLocked(path));
        Assert.assertNull(d.peekLock(path));

        Assert.assertTrue(c.lock(path));
        Assert.assertFalse(d.lock(path));
        Assert.assertTrue(d.isLocked(path));
        Assert.assertEquals(c.getClient(), d.peekLock(path));

        try {
            d.unlock(path);
            Assert.fail("unlocking a path held by another client should throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // expected: client2 does not own the lock
        }

        c.unlock(path);
        Assert.assertFalse(d.isLocked(path));

        // Once client1 has released the path, client2 must be able to take it.
        Assert.assertTrue(d.lock(path));
        d.unlock(path);
    }

    /**
     * A client calling {@code lock(path, timeout)} obtains the lock once the current holder
     * releases it.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testLockTimeout() throws InterruptedException {
        DistributedLock c = factory.lockForClient("client1");
        final DistributedLock d = factory.lockForClient("client2");
        final String path = ZK_PFX + "/testLockTimeout";

        Assert.assertFalse(c.isLocked(path));
        Assert.assertNull(d.peekLock(path));

        Assert.assertTrue(c.lock(path));

        final AtomicBoolean acquired = new AtomicBoolean(false);
        Thread waiter = new Thread() {
            @Override
            public void run() {
                acquired.set(d.lock(path, 15000L));
            }
        };
        waiter.start();
        c.unlock(path);

        // Wait for the lock attempt itself (bounded by the former fixed sleep of 20 s) rather
        // than always sleeping the full period, and check what the attempt reported.
        waiter.join(20000L);
        Assert.assertFalse("client2 lock attempt did not finish in time", waiter.isAlive());
        Assert.assertTrue("client2 should obtain the lock within its timeout", acquired.get());

        Assert.assertTrue(c.isLocked(path));
        Assert.assertEquals(d.getClient(), d.peekLock(path));
        d.unlock(path);
    }

    /**
     * Runs several clients that randomly lock and unlock a small set of paths while a watcher
     * counts the lock and unlock events it is told about; afterwards every observed lock must be
     * matched by an unlock and no path may remain locked.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws IOException if closing the watch fails
     */
    @Test
    public void testWatch() throws InterruptedException, IOException {
        String base = ZK_PFX + "/testWatch";

        int nLocks = 4;
        String[] lockPaths = new String[nLocks];
        for (int i = 0; i < nLocks; i++) {
            lockPaths[i] = base + "/" + i;
        }

        // Client names are numeric so that they can be parsed back into a score factor.
        int[] clientIds = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29};
        int nClients = clientIds.length;
        DistributedLock[] clients = new DistributedLock[nClients];
        for (int i = 0; i < nClients; i++) {
            clients[i] = factory.lockForClient("" + clientIds[i]);
        }

        DistributedLock lock = factory.lockForClient("");
        final AtomicInteger lockCounter = new AtomicInteger(0);
        final AtomicInteger unlockCounter = new AtomicInteger(0);
        final AtomicInteger scoreCounter = new AtomicInteger(0);

        // Keep a handle on the executor so its worker thread can be stopped when the test ends.
        ExecutorService watchExecutor = Executors.newFixedThreadPool(1);
        try {
            Closeable watch = lock.watchLocks(base, watchExecutor, new DistributedLock.Watcher() {

                @Override
                public void onLock(String lockPath, String client) {
                    lockCounter.incrementAndGet();
                    // score = (index of the lock path + 1) * numeric client id
                    int cut = lockPath.lastIndexOf("/");
                    int lockId = Integer.parseInt(lockPath.substring(cut + 1)) + 1;
                    int clientId = Integer.parseInt(client);
                    scoreCounter.addAndGet(lockId * clientId);
                }

                @Override
                public void onUnlock(String lockPath, String client) {
                    unlockCounter.incrementAndGet();
                }
            });
            try {
                ClientThread[] threads = new ClientThread[nClients];
                for (int i = 0; i < nClients; i++) {
                    threads[i] = new ClientThread(clients[i], lockPaths);
                    threads[i].start();
                }
                for (ClientThread thread : threads) {
                    thread.join();
                }

                // Give the watcher some time to receive the remaining events.
                Thread.sleep(3000L);
                Assert.assertEquals("every observed lock should be matched by an unlock",
                        0L, (long) (lockCounter.get() - unlockCounter.get()));

                int clientSideScore = 0;
                int clientSideLocks = 0;
                for (ClientThread thread : threads) {
                    clientSideScore += thread.scoreCounter;
                    clientSideLocks += thread.lockCounter;
                }
                // These totals are logged for diagnosis only; the test does not assert that the
                // client-side and watcher-side numbers match.
                logger.info("client side locks is {} and watcher locks is {}", clientSideLocks, lockCounter.get());
                logger.info("client side score is {} and watcher score is {}", clientSideScore, scoreCounter.get());
            } finally {
                // Close the watch even when an assertion above failed.
                watch.close();
            }

            for (String lockPath : lockPaths) {
                Assert.assertFalse(lock.isLocked(lockPath));
            }
        } finally {
            // Shut down only after the watch is closed so no event is submitted to a dead executor.
            watchExecutor.shutdown();
        }
    }

    /**
     * Worker used by {@code testWatch}: for roughly 15 seconds it repeatedly tries to lock one
     * randomly chosen path and to unlock another, recording its successful lock acquisitions.
     */
    static class ClientThread
    extends Thread {
        final DistributedLock client;
        final String[] lockPaths;
        final int nLocks;
        /** Number of locks this client acquired. */
        int lockCounter = 0;
        /** Sum of {@code (lock index + 1) * numeric client id} over all acquired locks. */
        int scoreCounter = 0;

        /**
         * @param client    lock handle whose client name must be parseable as an integer
         * @param lockPaths paths the worker locks and unlocks at random
         */
        ClientThread(DistributedLock client, String[] lockPaths) {
            this.client = client;
            this.lockPaths = lockPaths;
            this.nLocks = lockPaths.length;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            Random rand = new Random();
            boolean interrupted = false;

            while (System.currentTimeMillis() - start <= 15000L) {
                try {
                    Thread.sleep(rand.nextInt(200));
                } catch (InterruptedException e) {
                    // Stop the random workload, but still release held locks below.
                    logger.warn("client thread interrupted, stopping early", e);
                    interrupted = true;
                    break;
                }

                int lockIdx = rand.nextInt(this.nLocks);
                String toLock = this.lockPaths[lockIdx];
                if (!this.client.isLockedByMe(toLock) && this.client.lock(toLock)) {
                    ++this.lockCounter;
                    this.scoreCounter += (lockIdx + 1) * Integer.parseInt(this.client.getClient());
                }

                try {
                    lockIdx = rand.nextInt(this.nLocks);
                    this.client.unlock(this.lockPaths[lockIdx]);
                } catch (IllegalStateException ignored) {
                    // Best effort: the randomly chosen path need not be one this client can unlock.
                }
            }

            // Release whatever this client may still hold so the test ends with all paths free.
            for (String lockPath : this.lockPaths) {
                try {
                    this.client.unlock(lockPath);
                } catch (IllegalStateException ignored) {
                    // Best effort: paths this client cannot unlock are skipped.
                }
            }

            if (interrupted) {
                // Restore the flag only after cleanup so the unlock calls above are not disturbed.
                Thread.currentThread().interrupt();
            }
        }
    }
}

