/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.hbase.util;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobLock} implementation that guards the job engine with a Curator
 * {@link InterProcessMutex} stored under {@value #ZOOKEEPER_LOCK_PATH}.
 *
 * <p>The ZooKeeper ensemble is taken from the current HBase configuration
 * ({@code hbase.zookeeper.quorum} and {@code hbase.zookeeper.property.clientPort}).
 * Each successful {@link #lock()} owns one {@link CuratorFramework} client, which is
 * closed again by {@link #unlock()}.
 *
 * <p>This class performs no synchronization of its own fields.
 */
public class ZookeeperJobLock
implements JobLock {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
    /** How long {@link #lock()} waits for the mutex before giving up. */
    private static final long ACQUIRE_TIMEOUT_SECONDS = 3L;
    private String scheduleID;
    private InterProcessMutex sharedLock;
    private CuratorFramework zkClient;

    /**
     * Tries to acquire the job-engine mutex, waiting at most
     * {@value #ACQUIRE_TIMEOUT_SECONDS} seconds.
     *
     * @return {@code true} if the mutex was acquired; {@code false} if it could not be
     *         acquired in time or the acquisition attempt failed
     * @throws IllegalArgumentException if no ZooKeeper quorum is configured
     */
    @Override
    public boolean lock() {
        this.scheduleID = this.schedulerId();
        String zkConnectString = this.getZKConnectString();
        logger.info("zk connection string:" + zkConnectString);
        logger.info("schedulerId:" + this.scheduleID);
        if (StringUtils.isEmpty(zkConnectString)) {
            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
        }
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // Work on locals and publish to the fields only once the lock is held, so a
        // failed attempt never replaces a live client with a closed one.
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
        boolean hasLock = false;
        try {
            client.start();
            InterProcessMutex mutex = new InterProcessMutex(client, this.scheduleID);
            try {
                hasLock = mutex.acquire(ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.warn("error acquire lock", e);
            }
            catch (Exception e) {
                logger.warn("error acquire lock", e);
            }
            if (hasLock) {
                CuratorFramework previous = this.zkClient;
                this.zkClient = client;
                this.sharedLock = mutex;
                if (previous != null) {
                    // A client left over from an earlier lock() call would otherwise leak.
                    previous.close();
                }
            }
        }
        finally {
            if (!hasLock) {
                // Covers both a failed acquire and an exception from start()/mutex setup.
                client.close();
            }
        }
        if (!hasLock) {
            logger.warn("fail to acquire lock, scheduler has not been started");
            return false;
        }
        return true;
    }

    /**
     * Releases the lock taken by {@link #lock()} and closes the ZooKeeper client.
     * Does nothing if no lock is currently held by this instance.
     *
     * @throws RuntimeException if removing the lock node fails
     */
    @Override
    public void unlock() {
        this.releaseLock();
    }

    /**
     * Builds the ZooKeeper connect string {@code host1:port,host2:port,...} from the
     * current HBase configuration.
     *
     * @return the connect string, or an empty string when no quorum host is configured
     */
    private String getZKConnectString() {
        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
        String serverList = conf.get("hbase.zookeeper.quorum");
        String port = conf.get("hbase.zookeeper.property.clientPort");
        if (StringUtils.isBlank(serverList)) {
            // Let lock() report the missing quorum instead of failing with an NPE here.
            return "";
        }
        StringBuilder connectString = new StringBuilder();
        for (String server : serverList.split(",")) {
            String host = server.trim();
            if (host.isEmpty()) {
                // Skip blank entries such as those produced by "a,,b".
                continue;
            }
            if (connectString.length() > 0) {
                connectString.append(',');
            }
            connectString.append(host).append(':').append(port);
        }
        return connectString.toString();
    }

    /**
     * Deletes the lock node (including children) if the client is still started, then
     * closes the client and clears the held references.
     */
    private void releaseLock() {
        CuratorFramework client = this.zkClient;
        if (client == null) {
            // unlock() without a successful lock(): nothing to release.
            return;
        }
        try {
            if (client.getState().equals(CuratorFrameworkState.STARTED) && client.checkExists().forPath(this.scheduleID) != null) {
                client.delete().guaranteed().deletingChildrenIfNeeded().forPath(this.scheduleID);
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.error("error release lock:" + this.scheduleID, e);
            throw new RuntimeException(e);
        }
        finally {
            // Always close the client, even when the delete failed, so it is not leaked.
            this.zkClient = null;
            this.sharedLock = null;
            client.close();
        }
    }

    /**
     * @return the ZooKeeper path of this deployment's lock: the shared lock root
     *         followed by the metadata URL prefix from the Kylin configuration
     */
    private String schedulerId() {
        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
    }
}

