/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.util;

import java.io.Closeable;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.lock.DistributedLockFactory;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.storage.hbase.util.ZookeeperAclBuilder;
import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperDistributedLock
implements DistributedLock,
JobLock {
    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);
    final CuratorFramework curator;
    final String zkPathBase;
    final String client;
    final byte[] clientBytes;

    private ZookeeperDistributedLock(CuratorFramework curator, String zkPathBase, String client) {
        if (client == null) {
            throw new NullPointerException("client must not be null");
        }
        if (zkPathBase == null) {
            throw new NullPointerException("zkPathBase must not be null");
        }
        this.curator = curator;
        this.zkPathBase = zkPathBase;
        this.client = client;
        this.clientBytes = client.getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public String getClient() {
        return this.client;
    }

    @Override
    public boolean lock(String lockPath) {
        lockPath = this.norm(lockPath);
        logger.debug(this.client + " trying to lock " + lockPath);
        try {
            ((ACLBackgroundPathAndBytesable)this.curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(lockPath, this.clientBytes);
        }
        catch (KeeperException.NodeExistsException ex) {
            logger.debug(this.client + " see " + lockPath + " is already locked");
        }
        catch (Exception ex) {
            throw new RuntimeException("Error while " + this.client + " trying to lock " + lockPath, ex);
        }
        String lockOwner = this.peekLock(lockPath);
        if (this.client.equals(lockOwner)) {
            logger.info(this.client + " acquired lock at " + lockPath);
            return true;
        }
        logger.debug(this.client + " failed to acquire lock at " + lockPath + ", which is held by " + lockOwner);
        return false;
    }

    @Override
    public boolean lock(String lockPath, long timeout) {
        if (this.lock(lockPath = this.norm(lockPath))) {
            return true;
        }
        if (timeout <= 0L) {
            timeout = Long.MAX_VALUE;
        }
        logger.debug(this.client + " will wait for lock path " + lockPath);
        long waitStart = System.currentTimeMillis();
        Random random = new Random();
        long sleep = 10000L;
        while (System.currentTimeMillis() - waitStart <= timeout) {
            try {
                Thread.sleep((long)(1000.0 + (double)sleep * random.nextDouble()));
            }
            catch (InterruptedException e) {
                return false;
            }
            if (!this.lock(lockPath)) continue;
            logger.debug(this.client + " waited " + (System.currentTimeMillis() - waitStart) + " ms for lock path " + lockPath);
            return true;
        }
        return false;
    }

    @Override
    public String peekLock(String lockPath) {
        lockPath = this.norm(lockPath);
        try {
            byte[] bytes = (byte[])this.curator.getData().forPath(lockPath);
            return new String(bytes, Charset.forName("UTF-8"));
        }
        catch (KeeperException.NoNodeException ex) {
            return null;
        }
        catch (Exception ex) {
            throw new RuntimeException("Error while peeking at " + lockPath, ex);
        }
    }

    @Override
    public boolean isLocked(String lockPath) {
        return this.peekLock(lockPath) != null;
    }

    @Override
    public boolean isLockedByMe(String lockPath) {
        return this.client.equals(this.peekLock(lockPath));
    }

    @Override
    public void unlock(String lockPath) {
        lockPath = this.norm(lockPath);
        logger.debug(this.client + " trying to unlock " + lockPath);
        String owner = this.peekLock(lockPath);
        if (owner == null) {
            throw new IllegalStateException(this.client + " cannot unlock path " + lockPath + " which is not locked currently");
        }
        if (!this.client.equals(owner)) {
            throw new IllegalStateException(this.client + " cannot unlock path " + lockPath + " which is locked by " + owner);
        }
        try {
            this.curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
            logger.info(this.client + " released lock at " + lockPath);
        }
        catch (Exception ex) {
            throw new RuntimeException("Error while " + this.client + " trying to unlock " + lockPath, ex);
        }
    }

    @Override
    public void purgeLocks(String lockPathRoot) {
        lockPathRoot = this.norm(lockPathRoot);
        try {
            this.curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot);
            logger.info(this.client + " purged all locks under " + lockPathRoot);
        }
        catch (Exception ex) {
            throw new RuntimeException("Error while " + this.client + " trying to purge " + lockPathRoot, ex);
        }
    }

    @Override
    public Closeable watchLocks(String lockPathRoot, Executor executor, final DistributedLock.Watcher watcher) {
        lockPathRoot = this.norm(lockPathRoot);
        PathChildrenCache cache = new PathChildrenCache(this.curator, lockPathRoot, true);
        try {
            cache.start();
            cache.getListenable().addListener((Object)new PathChildrenCacheListener(){

                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                        case CHILD_ADDED: {
                            watcher.onLock(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
                            break;
                        }
                        case CHILD_REMOVED: {
                            watcher.onUnlock(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
                            break;
                        }
                    }
                }
            }, executor);
        }
        catch (Exception ex) {
            logger.error("Error to watch lock path " + lockPathRoot, (Throwable)ex);
        }
        return cache;
    }

    private String norm(String lockPath) {
        if (!lockPath.startsWith(this.zkPathBase)) {
            lockPath = this.zkPathBase + (lockPath.startsWith("/") ? "" : "/") + lockPath;
        }
        return ZookeeperDistributedLock.fixSlash(lockPath);
    }

    private static String fixSlash(String path) {
        if (!path.startsWith("/")) {
            path = "/" + path;
        }
        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        int n = Integer.MAX_VALUE;
        while (n > path.length()) {
            n = path.length();
            path = path.replace("//", "/");
        }
        return path;
    }

    public boolean lockJobEngine() {
        String path = this.jobEngineLockPath();
        return this.lock(path, 3000L);
    }

    public void unlockJobEngine() {
        this.unlock(this.jobEngineLockPath());
    }

    private String jobEngineLockPath() {
        return "/job_engine/global_job_engine_lock";
    }

    public static class Factory
    extends DistributedLockFactory {
        private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
        final String zkPathBase;
        final CuratorFramework curator;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        private static CuratorFramework getZKClient(KylinConfig config) {
            CuratorFramework zkClient = (CuratorFramework)CACHE.get(config);
            if (zkClient != null) return zkClient;
            Class<ZookeeperDistributedLock> clazz = ZookeeperDistributedLock.class;
            synchronized (ZookeeperDistributedLock.class) {
                zkClient = (CuratorFramework)CACHE.get(config);
                if (zkClient != null) return zkClient;
                ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
                String zkConnectString = Factory.getZKConnectString(config);
                ZookeeperAclBuilder zookeeperAclBuilder = new ZookeeperAclBuilder().invoke();
                zkClient = zookeeperAclBuilder.setZKAclBuilder(CuratorFrameworkFactory.builder()).connectString(zkConnectString).sessionTimeoutMs(120000).connectionTimeoutMs(15000).retryPolicy((RetryPolicy)retryPolicy).build();
                zkClient.start();
                CACHE.put(config, zkClient);
                if (CACHE.size() <= 1) return zkClient;
                logger.warn("More than one singleton exist");
                // ** MonitorExit[var2_2] (shouldn't be in output)
                return zkClient;
            }
        }

        private static String getZKConnectString(KylinConfig config) {
            return ZookeeperUtil.getZKConnectString();
        }

        public Factory() {
            this(KylinConfig.getInstanceFromEnv());
        }

        public Factory(KylinConfig config) {
            this.curator = Factory.getZKClient(config);
            this.zkPathBase = ZookeeperDistributedLock.fixSlash(config.getZookeeperBasePath() + "/" + config.getMetadataUrlPrefix());
        }

        @Override
        public DistributedLock lockForClient(String client) {
            return new ZookeeperDistributedLock(this.curator, this.zkPathBase, client);
        }

        static {
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){

                @Override
                public void run() {
                    for (CuratorFramework curator : CACHE.values()) {
                        try {
                            curator.close();
                        }
                        catch (Exception ex) {
                            logger.error("Error at closing " + curator, (Throwable)ex);
                        }
                    }
                }
            }));
        }
    }
}

