package org.apache.gobblin.cluster;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.curator.test.TestingServer;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
import org.apache.gobblin.util.eventbus.EventBusFactory;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"gobblin.cluster"})
/* loaded from: input_file:org/apache/gobblin/cluster/GobblinTaskRunnerTest.class */
public class GobblinTaskRunnerTest {
    /** Logger used by this test class. */
    public static final Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class);
    /** Job id passed to the sleeping job; the timestamp suffix is taken once, when this class is initialized. */
    private static final String JOB_ID = "job_taskRunnerTestJob_" + System.currentTimeMillis();
    /** Path handed to the sleeping job and later polled by {@code testTaskAssignmentAfterHelixConnectionRetry}. */
    private static final String TASK_STATE_FILE = "/tmp/" + GobblinTaskRunnerTest.class.getSimpleName() + "/taskState/_RUNNING";
    /** Name of the Hadoop property that {@code setUp} injects (as "gobblin.cluster.hadoop.inject.prop") and {@code testBuildFileSystemConfig} reads back. */
    public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop";
    /** In-process ZooKeeper server; created in {@code setUp} and closed in {@code tearDown}. */
    private TestingServer testingZKServer;
    /** Task runner built from the unmodified test config. */
    private GobblinTaskRunner gobblinTaskRunner;
    /** Task runner built with "gobblin.cluster.container.exitOnHealthCheckFailure" set to true. */
    private GobblinTaskRunner gobblinTaskRunnerHealthCheck;
    /** Task runner bound to {@link #corruptHelixInstance}. */
    private GobblinTaskRunner corruptGobblinTaskRunner;
    /** Task runner built with Kafka metric reporting enabled and "gobblin.task.isMetricReportingFailureFatal" set to true. */
    private GobblinTaskRunner gobblinTaskRunnerFailedReporter;
    /** Cluster manager whose Helix manager is connected at the end of {@code setUp}. */
    private GobblinClusterManager gobblinClusterManager;
    /** Helix cluster name read from "gobblin.cluster.helix.cluster.name" in the test config. */
    private String clusterName;
    /** Helix instance name for which {@code testConnectHelixManagerWithRetry} creates a partial instance structure. */
    private String corruptHelixInstance;
    /** Integration suite created only by {@code testTaskAssignmentAfterHelixConnectionRetry}; stays null if that test does not run. */
    private TaskAssignmentAfterConnectionRetry suite;

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinTaskRunnerTest$TaskAssignmentAfterConnectionRetry.class */
    /**
     * Integration suite that, after the regular Helix cluster setup, additionally creates a
     * partial instance structure for {@link IntegrationBasicSuite#WORKER_INSTANCE_0}.
     */
    public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite {
        TaskAssignmentAfterConnectionRetry(Config config) {
            super(config);
        }

        /**
         * Creates the Helix cluster via the parent suite, then builds a participant
         * {@link HelixManager} for worker instance 0 and passes it to
         * {@code ClusterIntegrationTestUtils.createPartialInstanceStructure}.
         *
         * <p>NOTE(review): the decompiler reported that this override's access modifier was changed
         * from {@code protected}; it is kept {@code public} here to match the file as-is.
         * Presumably the partial structure makes the worker's first Helix connection attempt fail
         * so that the retry path is exercised; confirm against ClusterIntegrationTestUtils.
         *
         * @throws Exception if the parent cluster setup or the partial instance creation fails
         */
        @Override
        public void createHelixCluster() throws Exception {
            super.createHelixCluster();

            final String helixClusterName = super.getManagerConfig().getString("gobblin.cluster.helix.cluster.name");
            final String zkConnectString = super.getManagerConfig().getString("gobblin.cluster.zk.connection.string");

            HelixManager workerParticipant = HelixManagerFactory.getZKHelixManager(
                helixClusterName, IntegrationBasicSuite.WORKER_INSTANCE_0, InstanceType.PARTICIPANT, zkConnectString);
            ClusterIntegrationTestUtils.createPartialInstanceStructure(workerParticipant, zkConnectString);
        }
    }

    /**
     * Starts an in-process ZooKeeper server, loads {@code GobblinTaskRunnerTest.conf} from the
     * classpath, creates the Gobblin Helix cluster, and builds the task runners and cluster
     * manager used by the tests. Only the cluster manager's Helix manager is connected here.
     *
     * @throws Exception if the ZooKeeper server, the Helix cluster or any fixture cannot be created
     */
    @BeforeClass
    public void setUp() throws Exception {
        // NOTE(review): -1 presumably asks Curator's TestingServer to pick a free port; confirm.
        this.testingZKServer = new TestingServer(-1);
        LOG.info("Testing ZK Server listening on: " + this.testingZKServer.getConnectString());

        String confResourceName = GobblinTaskRunnerTest.class.getSimpleName() + ".conf";
        URL confUrl = GobblinTaskRunnerTest.class.getClassLoader().getResource(confResourceName);
        // Report the resource name: the URL itself is null exactly when this assertion fails,
        // so concatenating it would always print "Could not find resource null".
        Assert.assertNotNull(confUrl, "Could not find resource " + confResourceName);

        Config baseConfig = ConfigFactory.parseURL(confUrl)
            .withValue("gobblin.cluster.zk.connection.string", ConfigValueFactory.fromAnyRef(this.testingZKServer.getConnectString()))
            .withValue("gobblin.cluster.hadoop.inject.prop", ConfigValueFactory.fromAnyRef("value"))
            .withValue("gobblin.cluster.hadoop.inject.fs.file.impl.disable.cache", ConfigValueFactory.fromAnyRef("true"))
            .resolve();

        String zkConnectString = baseConfig.getString("gobblin.cluster.zk.connection.string");
        this.clusterName = baseConfig.getString("gobblin.cluster.helix.cluster.name");
        HelixUtils.createGobblinHelixCluster(zkConnectString, this.clusterName);

        // Runner built from the unmodified base config.
        this.gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME, "1", "1", baseConfig, Optional.absent());

        // Runner configured to exit when a container health check fails.
        Config healthCheckConfig = baseConfig.withValue("gobblin.cluster.container.exitOnHealthCheckFailure", ConfigValueFactory.fromAnyRef(true));
        this.gobblinTaskRunnerHealthCheck = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, HelixUtils.getHelixInstanceName("HealthCheckHelixInstance", 0), "1", "1", healthCheckConfig, Optional.absent());

        // Runner with Kafka metric reporting enabled and reporter failures marked fatal.
        Config failedReporterConfig = baseConfig
            .withValue("metrics.enabled", ConfigValueFactory.fromAnyRef(true))
            .withValue("metrics.reporting.kafka.enabled", ConfigValueFactory.fromAnyRef(true))
            .withValue("metrics.reporting.kafka.topic.metrics", ConfigValueFactory.fromAnyRef("metricTopic"))
            .withValue("gobblin.task.isMetricReportingFailureFatal", ConfigValueFactory.fromAnyRef(true));
        this.gobblinTaskRunnerFailedReporter = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, HelixUtils.getHelixInstanceName("MetricReporterFailureInstance", 0), "1", "2", failedReporterConfig, Optional.absent());

        // Runner bound to the instance name that testConnectHelixManagerWithRetry sets up partially.
        this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
        this.corruptGobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, this.corruptHelixInstance, "1", "1", baseConfig, Optional.absent());

        this.gobblinClusterManager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, "1", baseConfig, Optional.absent());
        this.gobblinClusterManager.connectHelixManager();
    }

    /**
     * Starts the task runner on a background thread, waits until it reports started, sends a
     * shutdown request through the cluster manager, and waits until the runner reports stopped.
     *
     * @throws Exception if connecting to Helix fails or either wait times out
     */
    @Test
    public void testSendReceiveShutdownMessage() throws Exception {
        this.gobblinTaskRunner.connectHelixManager();

        ExecutorService runnerExecutor = Executors.newSingleThreadExecutor();
        try {
            // start() runs on a background thread so the test thread can poll the runner state.
            runnerExecutor.submit(() -> {
                this.gobblinTaskRunner.start();
            });

            Logger logger = LoggerFactory.getLogger("testSendReceiveShutdownMessage");
            AssertWithBackoff.create().logger(logger).timeoutMs(20000L)
                .assertTrue(ignored -> this.gobblinTaskRunner.isStarted(), "gobblinTaskRunner started");

            this.gobblinClusterManager.sendShutdownRequest();

            AssertWithBackoff.create().logger(logger).timeoutMs(20000L)
                .assertTrue(ignored -> this.gobblinTaskRunner.isStopped(), "gobblinTaskRunner stopped");
        } finally {
            // Release the worker thread. shutdown() does not interrupt the submitted start() call;
            // it only lets the thread terminate once that call has returned.
            runnerExecutor.shutdown();
        }
    }

    /**
     * Starting the runner that was configured with fatal metric-reporting failures is expected
     * to throw a {@link RuntimeException} whose message matches
     * "Could not create one or more reporters".
     */
    @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = ".*Could not create one or more reporters.*")
    public void testStartUpFailsDueToMetricReporterFailure() {
        final GobblinTaskRunner failingRunner = this.gobblinTaskRunnerFailedReporter;
        failingRunner.start();
    }

    /**
     * Checks that the Hadoop property injected in {@code setUp} is visible in the configuration
     * of the task runner's file system.
     */
    @Test
    public void testBuildFileSystemConfig() {
        final String injectedValue = this.gobblinTaskRunner.getFs().getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME);
        Assert.assertEquals(injectedValue, "value");
    }

    /**
     * Creates a partial instance structure for the corrupt Helix instance, verifies that a plain
     * {@code connectHelixManager()} fails with {@link HelixException}, and then verifies that
     * {@code connectHelixManagerWithRetry()} completes without throwing.
     */
    @Test
    public void testConnectHelixManagerWithRetry() {
        String zkConnectString = this.testingZKServer.getConnectString();
        HelixManager corruptInstanceManager = HelixManagerFactory.getZKHelixManager(
            this.clusterName, this.corruptHelixInstance, InstanceType.PARTICIPANT, zkConnectString);
        ClusterIntegrationTestUtils.createPartialInstanceStructure(corruptInstanceManager, zkConnectString);

        try {
            this.corruptGobblinTaskRunner.connectHelixManager();
            // Assert.fail throws AssertionError, which the catch (Exception) below does not intercept.
            Assert.fail("Unexpected success in connecting to HelixManager");
        } catch (Exception e) {
            // assertEquals reports the actual exception class when the check fails.
            Assert.assertEquals(e.getClass(), HelixException.class, "Unexpected exception type from connectHelixManager()");
        }

        // Passing criterion: the retrying variant returns normally.
        this.corruptGobblinTaskRunner.connectHelixManagerWithRetry();
    }

    /**
     * Starts the {@link TaskAssignmentAfterConnectionRetry} suite with a sleeping job, then uses a
     * spectator {@link HelixManager} to wait until the job's task has started and the task state
     * file indicates it is running.
     *
     * @throws Exception if the cluster cannot be started or either wait times out
     */
    @Test(groups = {"disabledOnCI"})
    public void testTaskAssignmentAfterHelixConnectionRetry() throws Exception {
        // Assigned before startCluster() so tearDown can shut the suite down even if startup fails.
        this.suite = new TaskAssignmentAfterConnectionRetry(ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE));
        this.suite.startCluster();

        String helixClusterName = this.suite.getManagerConfig().getString("gobblin.cluster.helix.cluster.name");
        String zkConnectString = this.suite.getManagerConfig().getString("gobblin.cluster.zk.connection.string");
        HelixManager spectator = HelixManagerFactory.getZKHelixManager(helixClusterName, "TestManager", InstanceType.SPECTATOR, zkConnectString);
        spectator.connect();
        try {
            AssertWithBackoff.create().maxSleepMs(1000L).backoffFactor(1.0d)
                .assertTrue(ClusterIntegrationTest.isTaskStarted(spectator, JOB_ID), "Waiting for the job to start...");
            AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(2000L).backoffFactor(1.0d)
                .assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE), "Waiting for the task to enter running state");
        } finally {
            // Disconnect even when an assertion above fails, so the spectator connection is not leaked.
            spectator.disconnect();
        }
    }

    /**
     * Starts the health-check task runner on a background thread, posts a
     * {@link ContainerHealthCheckFailureEvent} on the "ContainerHealthCheckEventBus", and waits
     * until the runner reports stopped. The test passes when {@code Future.get()} on the
     * background task throws an {@link ExecutionException} whose message matches
     * "ContainerHealthCheckException".
     *
     * @throws Exception the expected {@link ExecutionException}, or any setup or timeout failure
     */
    @Test(groups = {"disabledOnCI"}, dependsOnMethods = {"testSendReceiveShutdownMessage"}, expectedExceptions = {ExecutionException.class}, expectedExceptionsMessageRegExp = ".*ContainerHealthCheckException.*")
    public void testShutdownOnHealthCheckFailure() throws Exception {
        this.gobblinTaskRunnerHealthCheck.connectHelixManager();

        ExecutorService runnerExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> startResult = runnerExecutor.submit(() -> {
                this.gobblinTaskRunnerHealthCheck.start();
            });

            Logger logger = LoggerFactory.getLogger("testHandleContainerHealthCheckFailure");
            AssertWithBackoff.create().logger(logger).timeoutMs(20000L)
                .assertTrue(ignored -> this.gobblinTaskRunnerHealthCheck.isStarted(), "gobblinTaskRunner started");

            EventBusFactory.get("ContainerHealthCheckEventBus", SharedResourcesBrokerFactory.getImplicitBroker())
                .post(new ContainerHealthCheckFailureEvent(ConfigFactory.empty(), getClass().getName()));

            AssertWithBackoff.create().logger(logger).timeoutMs(30000L)
                .assertTrue(ignored -> this.gobblinTaskRunnerHealthCheck.isStopped(), "gobblinTaskRunner stopped");

            // Surfaces whatever start() threw on the background thread, wrapped in ExecutionException.
            startResult.get();
        } finally {
            // Release the worker thread; shutdown() does not interrupt a task that is still running.
            runnerExecutor.shutdown();
        }
    }

    /**
     * Disconnects the cluster manager and every task runner from Helix, shuts down the
     * integration suite if one was created, and finally closes the ZooKeeper test server.
     *
     * @throws IOException if a cleanup step reports an I/O failure
     * @throws InterruptedException if a cleanup step is interrupted
     */
    @AfterClass
    public void tearDown() throws IOException, InterruptedException {
        try {
            gobblinClusterManager.disconnectHelixManager();

            // Same order as the fixtures were originally disconnected.
            GobblinTaskRunner[] runners = {
                gobblinTaskRunner,
                corruptGobblinTaskRunner,
                gobblinTaskRunnerFailedReporter,
                gobblinTaskRunnerHealthCheck
            };
            for (GobblinTaskRunner runner : runners) {
                runner.disconnectHelixManager();
            }

            // The suite exists only when testTaskAssignmentAfterHelixConnectionRetry has run.
            if (suite != null) {
                suite.shutdownCluster();
            }
        } finally {
            testingZKServer.close();
        }
    }
}
