package org.apache.gobblin.cluster;

import com.google.common.base.Predicate;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
import org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite;
import org.apache.gobblin.cluster.suite.IntegrationJobCancelSuite;
import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
import org.apache.gobblin.cluster.suite.IntegrationJobRestartViaSpecSuite;
import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ChainedPathZkSerializer;
import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:org/apache/gobblin/cluster/ClusterIntegrationTest.class */
public class ClusterIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(ClusterIntegrationTest.class);
    private IntegrationBasicSuite suite;
    private String zkConnectString;

    @Test
    public void testJobShouldComplete() throws Exception {
        this.suite = new IntegrationBasicSuite();
        runAndVerify();
    }

    private HelixManager getHelixManager() {
        Config managerConfig = this.suite.getManagerConfig();
        String string = managerConfig.getString("gobblin.cluster.helix.cluster.name");
        String string2 = ConfigUtils.getString(managerConfig, "gobblin.cluster.helixInstanceName", GobblinClusterManager.class.getSimpleName());
        this.zkConnectString = managerConfig.getString("gobblin.cluster.zk.connection.string");
        return HelixManagerFactory.getZKHelixManager(string, string2, InstanceType.CONTROLLER, this.zkConnectString);
    }

    @Test
    void testJobShouldGetCancelled() throws Exception {
        this.suite = new IntegrationJobCancelSuite(ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID, IntegrationJobCancelSuite.TASK_STATE_FILE).withValue("data.publisher.sleep.time.in.seconds", ConfigValueFactory.fromAnyRef(100)));
        HelixManager helixManager = getHelixManager();
        this.suite.startCluster();
        helixManager.connect();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        final FutureTask futureTask = new FutureTask(() -> {
            try {
                TaskDriver taskDriver = new TaskDriver(helixManager);
                AssertWithBackoff.create().maxSleepMs(1000L).backoffFactor(1.0d).assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
                AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(2000L).backoffFactor(1.0d).assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
                log.info("Stopping the job");
                taskDriver.stop(IntegrationJobCancelSuite.JOB_ID);
                this.suite.shutdownCluster();
            } catch (Exception e) {
                throw new RuntimeException("Failure in canceling tasks");
            }
        }, "cancelled");
        newSingleThreadExecutor.submit(futureTask);
        AssertWithBackoff.create().backoffFactor(1.0d).maxSleepMs(1000L).timeoutMs(500000L).assertTrue(new Predicate<Void>() { // from class: org.apache.gobblin.cluster.ClusterIntegrationTest.1
            public boolean apply(Void r3) {
                return futureTask.isDone();
            }
        }, "waiting for future to complete");
        Assert.assertEquals((String) futureTask.get(), "cancelled");
        this.suite.waitForAndVerifyOutputFiles();
    }

    @Test(enabled = false, dependsOnMethods = {"testJobShouldGetCancelled"}, groups = {"disabledOnCI"})
    public void testJobRestartViaSpec() throws Exception {
        this.suite = new IntegrationJobRestartViaSpecSuite(ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID, IntegrationJobCancelSuite.TASK_STATE_FILE));
        HelixManager helixManager = getHelixManager();
        IntegrationJobRestartViaSpecSuite integrationJobRestartViaSpecSuite = (IntegrationJobRestartViaSpecSuite) this.suite;
        integrationJobRestartViaSpecSuite.startCluster();
        helixManager.connect();
        AssertWithBackoff.create().timeoutMs(30000L).maxSleepMs(1000L).backoffFactor(1.0d).assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
        AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(2000L).backoffFactor(1.0d).assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), "Waiting for the task to enter running state");
        ZkClient zkClient = new ZkClient(this.zkConnectString);
        zkClient.setZkSerializer(ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build());
        String path = Paths.get("/", getHelixManager().getClusterName(), "CONFIGS", "RESOURCE", IntegrationJobCancelSuite.JOB_ID).toString();
        Assert.assertEquals(((ZNRecord) zkClient.readData(path)).getSimpleField("TargetState"), TargetState.START.name());
        integrationJobRestartViaSpecSuite.addJobSpec("HelloWorldTestJob", SpecExecutor.Verb.UPDATE.name());
        AssertWithBackoff.create().maxSleepMs(1000L).timeoutMs(12000L).backoffFactor(1.0d).assertTrue(r6 -> {
            ZNRecord zNRecord = (ZNRecord) zkClient.readData(path, true);
            String str = null;
            if (zNRecord != null) {
                str = zNRecord.getSimpleField("TargetState");
            }
            return zNRecord == null || str.equals(TargetState.STOP.name());
        }, "Waiting for Workflow TargetState to be STOP");
        this.suite.waitForAndVerifyOutputFiles();
        AssertWithBackoff.create().maxSleepMs(1000L).timeoutMs(120000L).backoffFactor(1.0d).assertTrue(r62 -> {
            ZNRecord zNRecord = (ZNRecord) zkClient.readData(path, true);
            if (zNRecord != null) {
                return zNRecord.getSimpleField("TargetState").equals(TargetState.START.name());
            }
            return false;
        }, "Waiting for Workflow TargetState to be START");
    }

    public static Predicate<Void> isTaskStarted(HelixManager helixManager, String str) {
        return r5 -> {
            return TaskDriver.getWorkflowContext(helixManager, str) != null;
        };
    }

    public static Predicate<Void> isTaskRunning(String str) {
        return r5 -> {
            return new File(str).exists();
        };
    }

    @Test
    public void testSeparateProcessMode() throws Exception {
        this.suite = new IntegrationSeparateProcessSuite();
        runAndVerify();
    }

    @Test
    public void testDedicatedManagerCluster() throws Exception {
        this.suite = new IntegrationDedicatedManagerClusterSuite();
        runAndVerify();
    }

    @Test(enabled = false)
    public void testDedicatedTaskDriverCluster() throws Exception {
        this.suite = new IntegrationDedicatedTaskDriverClusterSuite();
        runAndVerify();
    }

    @Test(enabled = false)
    public void testJobWithTag() throws Exception {
        this.suite = new IntegrationJobTagSuite();
        runAndVerify();
    }

    @Test
    public void testPlanningJobFactory() throws Exception {
        this.suite = new IntegrationJobFactorySuite();
        runAndVerify();
    }

    private void runAndVerify() throws Exception {
        this.suite.startCluster();
        this.suite.waitForAndVerifyOutputFiles();
        ensureJobLauncherFinished();
        this.suite.verifyMetricsCleaned();
        this.suite.shutdownCluster();
    }

    private void ensureJobLauncherFinished() throws Exception {
        AssertWithBackoff.create().logger(log).timeoutMs(120000L).maxSleepMs(100L).backoffFactor(1.5d).assertTrue(this::isJobLauncherFinished, "Waiting for job launcher completion");
    }

    protected boolean isJobLauncherFinished(Void r4) {
        Iterator<Map.Entry<Thread, StackTraceElement[]>> it = Thread.getAllStackTraces().entrySet().iterator();
        while (it.hasNext()) {
            for (StackTraceElement stackTraceElement : it.next().getValue()) {
                if (stackTraceElement.toString().contains(HelixRetriggeringJobCallable.class.getSimpleName())) {
                    return false;
                }
            }
        }
        return true;
    }

    @AfterMethod
    public void tearDown() throws IOException {
        this.suite.deleteWorkDir();
    }
}
