package org.apache.kylin.job.impl.curator;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.impl.curator.CuratorScheduler;
import org.apache.kylin.shaded.com.google.common.base.Function;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/job/impl/curator/CuratorSchedulerTest.class */
public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
    private static final Logger logger = LoggerFactory.getLogger(CuratorSchedulerTest.class);
    private TestingServer zkTestServer;
    protected ExecutableManager jobService;

    @Before
    public void setup() throws Exception {
        this.zkTestServer = new TestingServer();
        this.zkTestServer.start();
        System.setProperty("kylin.env.zookeeper-connect-string", this.zkTestServer.getConnectString());
        System.setProperty("kylin.server.mode", "query");
        createTestMetadata(new String[0]);
    }

    @After
    public void after() throws Exception {
        this.zkTestServer.close();
        cleanupTestMetadata();
        System.clearProperty("kylin.env.zookeeper-connect-string");
        System.clearProperty("kylin.server.host-address");
        System.clearProperty("kylin.server.cluster-servers");
        System.clearProperty("kylin.server.mode");
    }

    @Test
    public void test() throws Exception {
        String connectString = this.zkTestServer.getConnectString();
        ServiceDiscovery serviceDiscovery = null;
        CuratorFramework curatorFramework = null;
        try {
            CuratorScheduler.JsonInstanceSerializer jsonInstanceSerializer = new CuratorScheduler.JsonInstanceSerializer(LinkedHashMap.class);
            curatorFramework = ZKUtil.newZookeeperClient(connectString, new ExponentialBackoffRetry(3000, 3));
            serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorFramework).basePath("/service").serializer(jsonInstanceSerializer).build();
            serviceDiscovery.start();
            ExampleServer exampleServer = new ExampleServer("localhost:1111");
            ExampleServer exampleServer2 = new ExampleServer("localhost:2222");
            Collection queryForNames = serviceDiscovery.queryForNames();
            Assert.assertTrue(queryForNames.size() == 1);
            Assert.assertTrue("kylin".equals(queryForNames.iterator().next()));
            Collection queryForInstances = serviceDiscovery.queryForInstances("kylin");
            Assert.assertTrue(queryForInstances.size() == 2);
            List transform = Lists.transform(Lists.newArrayList(queryForInstances), new Function<ServiceInstance<LinkedHashMap>, String>() { // from class: org.apache.kylin.job.impl.curator.CuratorSchedulerTest.1
                @Nullable
                public String apply(@Nullable ServiceInstance<LinkedHashMap> serviceInstance) {
                    return (String) ((LinkedHashMap) serviceInstance.getPayload()).get("description");
                }
            });
            Assert.assertTrue(transform.contains(exampleServer.getAddress() + ":query"));
            Assert.assertTrue(transform.contains(exampleServer2.getAddress() + ":query"));
            exampleServer.close();
            Collection queryForInstances2 = serviceDiscovery.queryForInstances("kylin");
            Assert.assertTrue(queryForInstances2.size() == 1);
            Assert.assertEquals(exampleServer2.getAddress() + ":query", ((LinkedHashMap) ((ServiceInstance) queryForInstances2.iterator().next()).getPayload()).get("description"));
            exampleServer2.close();
            Assert.assertTrue(serviceDiscovery.queryForInstances("kylin").size() == 0);
            CloseableUtils.closeQuietly(serviceDiscovery);
            CloseableUtils.closeQuietly(curatorFramework);
        } catch (Throwable th) {
            CloseableUtils.closeQuietly(serviceDiscovery);
            CloseableUtils.closeQuietly(curatorFramework);
            throw th;
        }
    }
}
