package org.apache.gobblin.cluster;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.net.URL;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"gobblin.cluster"})
/* loaded from: input_file:org/apache/gobblin/cluster/GobblinClusterManagerTest.class */
public class GobblinClusterManagerTest implements HelixMessageTestBase {
    public static final Logger LOG = LoggerFactory.getLogger(GobblinClusterManagerTest.class);
    public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
    private TestingServer testingZKServer;
    private HelixManager helixManager;
    private GobblinClusterManager gobblinClusterManager;

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinClusterManagerTest$GetInstanceMessageNumFunc.class */
    static class GetInstanceMessageNumFunc implements Function<Void, Integer> {
        private final CuratorFramework curatorFramework;
        private final String testName;

        public GetInstanceMessageNumFunc(String str, CuratorFramework curatorFramework) {
            this.curatorFramework = curatorFramework;
            this.testName = str;
        }

        public Integer apply(Void r8) {
            try {
                return Integer.valueOf(((List) this.curatorFramework.getChildren().forPath(String.format("/%s/INSTANCES/%s/MESSAGES", this.testName, TestHelper.TEST_HELIX_INSTANCE_NAME))).size());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @BeforeClass
    public void setUp() throws Exception {
        this.testingZKServer = new TestingServer(-1);
        LOG.info("Testing ZK Server listening on: " + this.testingZKServer.getConnectString());
        URL resource = GobblinClusterManagerTest.class.getClassLoader().getResource(GobblinClusterManager.class.getSimpleName() + ".conf");
        Assert.assertNotNull(resource, "Could not find resource " + resource);
        Config resolve = ConfigFactory.parseURL(resource).withValue("gobblin.cluster.zk.connection.string", ConfigValueFactory.fromAnyRef(this.testingZKServer.getConnectString())).withValue("gobblin.cluster.helixTaskQuotaConfig", ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10")).withValue("gobblin.cluster.hadoop.inject.prop", ConfigValueFactory.fromAnyRef("value")).withValue("gobblin.cluster.hadoop.inject.fs.file.impl.disable.cache", ConfigValueFactory.fromAnyRef("true")).resolve();
        String string = resolve.getString("gobblin.cluster.zk.connection.string");
        HelixUtils.createGobblinHelixCluster(string, resolve.getString("gobblin.cluster.helix.cluster.name"));
        this.helixManager = HelixManagerFactory.getZKHelixManager(resolve.getString("gobblin.cluster.helix.cluster.name"), TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.PARTICIPANT, string);
        this.helixManager.connect();
        this.helixManager.getMessagingService().registerMessageHandlerFactory("SHUTDOWN", new TestShutdownMessageHandlerFactory(this));
        this.gobblinClusterManager = new GobblinClusterManager(GobblinClusterManagerTest.class.getSimpleName(), "1", resolve, Optional.absent());
        this.gobblinClusterManager.getEventBus().register(this.gobblinClusterManager);
        this.gobblinClusterManager.connectHelixManager();
    }

    @Test
    public void testQuotaConfig() throws Exception {
        this.gobblinClusterManager.configureHelixQuotaBasedTaskScheduling();
        ClusterConfig clusterConfig = this.gobblinClusterManager.multiManager.getJobClusterHelixManager().getConfigAccessor().getClusterConfig(GobblinClusterManagerTest.class.getSimpleName());
        Assert.assertEquals(clusterConfig.getTaskQuotaRatio("DEFAULT"), "1");
        Assert.assertEquals(clusterConfig.getTaskQuotaRatio("OTHER"), "10");
    }

    @Test
    public void testSendShutdownRequest() throws Exception {
        Logger logger = LoggerFactory.getLogger("testSendShutdownRequest");
        Closer create = Closer.create();
        try {
            CuratorFramework createZkClient = TestHelper.createZkClient(this.testingZKServer, create);
            GetInstanceMessageNumFunc getInstanceMessageNumFunc = new GetInstanceMessageNumFunc(GobblinClusterManagerTest.class.getSimpleName(), createZkClient);
            AssertWithBackoff timeoutMs = AssertWithBackoff.create().logger(logger).timeoutMs(30000L);
            this.gobblinClusterManager.sendShutdownRequest();
            Assert.assertEquals(((Stat) createZkClient.checkExists().forPath(String.format("/%s/INSTANCES/%s/MESSAGES", GobblinClusterManagerTest.class.getSimpleName(), TestHelper.TEST_HELIX_INSTANCE_NAME))).getVersion(), 0);
            timeoutMs.assertEquals(getInstanceMessageNumFunc, 1, "1 message queued");
            timeoutMs.assertEquals(getInstanceMessageNumFunc, 0, "all messages processed");
            create.close();
        } catch (Throwable th) {
            create.close();
            throw th;
        }
    }

    @Test(dependsOnMethods = {"testSendShutdownRequest"})
    public void testHandleClusterManagerShutdownRequest() throws Exception {
        Logger logger = LoggerFactory.getLogger("testHandleClusterManagerShutdownRequest");
        this.gobblinClusterManager.getEventBus().post(new ClusterManagerShutdownRequest());
        AssertWithBackoff.create().logger(logger).timeoutMs(20000L).assertTrue(new Predicate<Void>() { // from class: org.apache.gobblin.cluster.GobblinClusterManagerTest.1
            public boolean apply(Void r3) {
                return !GobblinClusterManagerTest.this.gobblinClusterManager.isHelixManagerConnected();
            }
        }, "Cluster Manager shutdown");
    }

    @Test
    public void testBuildFileSystemConfig() {
        Assert.assertEquals(this.gobblinClusterManager.getFs().getConf().get("prop"), "value");
    }

    @AfterClass
    public void tearDown() throws Exception {
        try {
            if (this.helixManager.isConnected()) {
                this.helixManager.disconnect();
            }
            this.gobblinClusterManager.disconnectHelixManager();
        } catch (Throwable th) {
            Assert.fail();
        } finally {
            this.testingZKServer.close();
        }
    }

    @Override // org.apache.gobblin.cluster.HelixMessageTestBase
    @Test(enabled = false)
    public void assertMessageReception(Message message) {
        Assert.assertEquals(message.getMsgType(), "SHUTDOWN");
        Assert.assertEquals(message.getMsgSubType(), HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
    }
}
