/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.job;

import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.Charset;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared fixture for distributed-scheduler tests.
 *
 * <p>{@link #setup()} exports the environment {@link KylinConfig} to two
 * properties files, builds one config instance from each, creates two
 * {@link ZookeeperDistributedLock} clients ({@code serverName1} and
 * {@code serverName2}) and initializes one {@link DistributedScheduler} per
 * config. {@link #after()} releases those resources again.
 */
public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
    static ExecutableManager execMgr;
    static DistributedScheduler scheduler1;
    static DistributedScheduler scheduler2;
    static ZookeeperDistributedLock jobLock1;
    static ZookeeperDistributedLock jobLock2;
    static KylinConfig kylinConfig1;
    static KylinConfig kylinConfig2;
    static CuratorFramework zkClient;
    static File localMetaDir;
    /** Metadata URL of the environment config as it was before {@link #setup()} overwrote it. */
    static String backup;
    static final String jobId1 = RandomUtil.randomUUID().toString();
    static final String jobId2 = RandomUtil.randomUUID().toString();
    static final String serverName1 = "serverName1";
    static final String serverName2 = "serverName2";
    static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties";
    static final String confDstPath2 = "target/kylin_metadata_dist_lock_test2/kylin.properties";
    private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class);

    /**
     * Creates the test metadata, the two configs, the ZooKeeper client, the two
     * job locks and the two schedulers.
     *
     * @throws Exception if any step of the fixture setup fails, including a
     *         scheduler reporting that it has not started
     */
    @BeforeClass
    public static void setup() throws Exception {
        staticCreateTestMetadata();

        new File(confDstPath1).getParentFile().mkdirs();
        new File(confDstPath2).getParentFile().mkdirs();

        // Point the environment config at a fresh temp directory and export it
        // twice, so both scheduler configs are built from identical properties.
        KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
        localMetaDir = Files.createTempDir();
        backup = srcConfig.getMetadataUrl().toString();
        srcConfig.setProperty("kylin.metadata.url", localMetaDir.getAbsolutePath());
        srcConfig.exportToFile(new File(confDstPath1));
        srcConfig.exportToFile(new File(confDstPath2));
        kylinConfig1 = KylinConfig.createInstanceFromUri(new File(confDstPath1).getAbsolutePath());
        kylinConfig2 = KylinConfig.createInstanceFromUri(new File(confDstPath2).getAbsolutePath());

        initZk();

        // Both locks come from one factory (built on kylinConfig1) and differ
        // only in the client name they are created for.
        ZookeeperDistributedLock.Factory factory = new ZookeeperDistributedLock.Factory(kylinConfig1);
        jobLock1 = (ZookeeperDistributedLock) factory.lockForClient(serverName1);
        jobLock2 = (ZookeeperDistributedLock) factory.lockForClient(serverName2);

        // Start from an empty job store.
        execMgr = ExecutableManager.getInstance(kylinConfig1);
        for (String jobId : execMgr.getAllJobIds()) {
            execMgr.deleteJob(jobId);
        }

        scheduler1 = DistributedScheduler.getInstance(kylinConfig1);
        scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock1);
        if (!scheduler1.hasStarted()) {
            throw new RuntimeException("scheduler1 not started");
        }

        scheduler2 = DistributedScheduler.getInstance(kylinConfig2);
        scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock2);
        if (!scheduler2.hasStarted()) {
            throw new RuntimeException("scheduler2 not started");
        }

        // NOTE(review): fixed 10 s pause, presumably to let both schedulers
        // settle before tests run - confirm it is still required.
        Thread.sleep(10000L);
    }

    /**
     * Releases everything {@link #setup()} created.
     *
     * <p>Every resource is null-checked because JUnit runs {@code @AfterClass}
     * even when {@code @BeforeClass} failed part-way; an unguarded dereference
     * here would hide the original setup failure and skip the rest of the
     * teardown. The metadata cleanup runs in a {@code finally} block so it
     * happens even if an earlier teardown step throws.
     *
     * @throws Exception if a teardown step fails
     */
    @AfterClass
    public static void after() throws Exception {
        try {
            if (jobLock1 != null) {
                jobLock1.purgeLocks("/job_engine/lock");
            }
            if (scheduler1 != null) {
                scheduler1.shutdown();
                scheduler1 = null;
            }
            if (scheduler2 != null) {
                scheduler2.shutdown();
                scheduler2 = null;
            }
            if (zkClient != null) {
                zkClient.close();
                zkClient = null;
            }
            if (localMetaDir != null) {
                FileUtils.deleteDirectory(localMetaDir);
            }
        } finally {
            System.clearProperty("kylin.metadata.url");
            staticCleanupTestMetadata();
        }
    }

    /**
     * Polls once per second until the job's status is one of SUCCEED, ERROR,
     * STOPPED or DISCARDED.
     *
     * @param jobId id of the job to poll
     * @throws IllegalStateException if the waiting thread is interrupted
     */
    void waitForJobFinish(String jobId) {
        while (true) {
            ExecutableState status = execMgr.getJob(jobId).getStatus();
            if (status == ExecutableState.SUCCEED
                    || status == ExecutableState.ERROR
                    || status == ExecutableState.STOPPED
                    || status == ExecutableState.DISCARDED) {
                return;
            }
            pause(1000L);
        }
    }

    /**
     * Polls until the job's status equals {@code state}.
     *
     * @param jobId id of the job to poll
     * @param state status to wait for
     * @param interval delay between polls, in milliseconds
     * @throws IllegalStateException if the waiting thread is interrupted
     */
    void waitForJobStatus(String jobId, ExecutableState state, long interval) {
        while (execMgr.getJob(jobId).getStatus() != state) {
            pause(interval);
        }
    }

    /**
     * Tries to take the scheduler lock for a segment with the given lock client.
     *
     * @param jobLock lock client to acquire with
     * @param segName segment name, mapped to a path by {@link DistributedScheduler#getLockPath}
     * @return the result of {@code jobLock.lock(...)} for that path
     */
    boolean lock(ZookeeperDistributedLock jobLock, String segName) {
        return jobLock.lock(DistributedScheduler.getLockPath(segName));
    }

    /** Creates the shared ZooKeeper client used by {@link #getServerName(String)}. */
    private static void initZk() {
        zkClient = ZKUtil.newZookeeperClient();
    }

    /**
     * Reads the data stored at the segment's lock path and decodes it as UTF-8.
     * Presumably this is the name of the server holding the lock - verify
     * against what the lock implementation writes.
     *
     * @param segName segment name, mapped to a path by {@link DistributedScheduler#getLockPath}
     * @return the decoded node data, or {@code null} if the ZooKeeper client is
     *         not started, the node does not exist, or the read failed (the
     *         failure is logged)
     */
    String getServerName(String segName) {
        String lockPath = DistributedScheduler.getLockPath(segName);
        if (zkClient.getState() != CuratorFrameworkState.STARTED) {
            return null;
        }
        try {
            if (zkClient.checkExists().forPath(lockPath) == null) {
                return null;
            }
            byte[] data = zkClient.getData().forPath(lockPath);
            return new String(data, Charset.forName("UTF-8"));
        } catch (Exception e) {
            logger.error("get the serverName failed", e);
            return null;
        }
    }

    /**
     * Sleeps for {@code millis} milliseconds. On interruption the interrupt
     * flag is restored and the wait is aborted, so a polling loop cannot spin
     * forever after the test thread has been asked to stop.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for job status", e);
        }
    }
}

