package org.apache.gobblin.cluster;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.curator.test.TestingServer;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.listeners.AbstractJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"gobblin.cluster"})
/* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.class */
public class GobblinHelixJobLauncherTest {
    public static final Logger LOG = LoggerFactory.getLogger(GobblinHelixJobLauncherTest.class);
    private HelixManager helixManager;
    private FileSystem localFs;
    private Path appWorkDir;
    private GobblinTaskRunner gobblinTaskRunner;
    private DatasetStateStore datasetStateStore;
    private Thread thread;
    private final Closer closer = Closer.create();
    private Config baseConfig;

    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixJobLauncherTest$SuspendJobListener.class */
    private static class SuspendJobListener extends AbstractJobListener {
        private AtomicInteger completes = new AtomicInteger();
        private CountDownLatch stg1;
        private CountDownLatch stg2;

        public SuspendJobListener(CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            this.stg1 = countDownLatch;
            this.stg2 = countDownLatch2;
        }

        public void onJobStart(JobContext jobContext) throws Exception {
            this.stg1.countDown();
            this.stg2.await();
        }

        public void onJobCompletion(JobContext jobContext) throws Exception {
            this.completes.addAndGet(1);
        }

        public AtomicInteger getCompletes() {
            return this.completes;
        }
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestingServer register = this.closer.register(new TestingServer(-1));
        LOG.info("Testing ZK Server listening on: " + register.getConnectString());
        URL resource = GobblinHelixJobLauncherTest.class.getClassLoader().getResource(GobblinHelixJobLauncherTest.class.getSimpleName() + ".conf");
        Assert.assertNotNull(resource, "Could not find resource " + resource);
        this.appWorkDir = new Path(GobblinHelixJobLauncherTest.class.getSimpleName());
        File file = new File(this.appWorkDir.toString(), "TestJob.json");
        TestHelper.createSourceJsonFile(file);
        this.baseConfig = ConfigFactory.parseURL(resource).withValue("gobblin.cluster.zk.connection.string", ConfigValueFactory.fromAnyRef(register.getConnectString())).withValue("source.filebased.files.to.pull", ConfigValueFactory.fromAnyRef(file.getAbsolutePath())).withValue("state.store.jobStateInStateStore", ConfigValueFactory.fromAnyRef("true")).resolve();
        String string = this.baseConfig.getString("gobblin.cluster.zk.connection.string");
        String string2 = this.baseConfig.getString("gobblin.cluster.helix.cluster.name");
        HelixUtils.createGobblinHelixCluster(string, string2);
        this.helixManager = HelixManagerFactory.getZKHelixManager(string2, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER, string);
        this.closer.register(new Closeable() { // from class: org.apache.gobblin.cluster.GobblinHelixJobLauncherTest.1
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                GobblinHelixJobLauncherTest.this.helixManager.disconnect();
            }
        });
        this.helixManager.connect();
        this.localFs = FileSystem.getLocal(new Configuration());
        this.closer.register(new Closeable() { // from class: org.apache.gobblin.cluster.GobblinHelixJobLauncherTest.2
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                if (GobblinHelixJobLauncherTest.this.localFs.exists(GobblinHelixJobLauncherTest.this.appWorkDir)) {
                    GobblinHelixJobLauncherTest.this.localFs.delete(GobblinHelixJobLauncherTest.this.appWorkDir, true);
                }
            }
        });
        this.gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME, "1", "1", this.baseConfig, Optional.of(this.appWorkDir));
        this.datasetStateStore = ((DatasetStateStore.Factory) new ClassAliasResolver(DatasetStateStore.Factory.class).resolveClass(ConfigUtils.getString(this.baseConfig, "state.store.type", "fs")).newInstance()).createStateStore(this.baseConfig);
        this.thread = new Thread(new Runnable() { // from class: org.apache.gobblin.cluster.GobblinHelixJobLauncherTest.3
            @Override // java.lang.Runnable
            public void run() {
                GobblinHelixJobLauncherTest.this.gobblinTaskRunner.start();
            }
        });
        this.thread.start();
    }

    private Properties generateJobProperties(Config config, String str, String str2) {
        Properties configToProperties = ConfigUtils.configToProperties(config);
        String str3 = configToProperties.getProperty("job.name") + str;
        configToProperties.setProperty("job.name", str3);
        configToProperties.setProperty("job.id", "job_" + str3 + str2);
        configToProperties.setProperty("writer.file.path", str3);
        configToProperties.setProperty("gobblin.cluster.workflow.expirySeconds", "5");
        return configToProperties;
    }

    private File getJobOutputFile(Properties properties) {
        return new File(properties.getProperty("data.publisher.final.dir"), properties.getProperty("writer.file.path") + File.separator + properties.getProperty("writer.file.name"));
    }

    public void testLaunchJob() throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Properties generateJobProperties = generateJobProperties(this.baseConfig, "1", "_1504201348470");
        this.closer.register(new GobblinHelixJobLauncher(generateJobProperties, this.helixManager, this.appWorkDir, ImmutableList.of(), concurrentHashMap, java.util.Optional.empty())).launchJob((JobListener) null);
        File jobOutputFile = getJobOutputFile(generateJobProperties);
        Assert.assertTrue(jobOutputFile.exists());
        TestHelper.assertGenericRecords(jobOutputFile, new Schema.Parser().parse(TestHelper.SOURCE_SCHEMA));
        List all = this.datasetStateStore.getAll(generateJobProperties.getProperty("job.name"), "current.jst");
        Assert.assertEquals(all.size(), 1);
        JobState.DatasetState datasetState = (JobState.DatasetState) all.get(0);
        Assert.assertEquals(datasetState.getCompletedTasks(), 1);
        Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
        Assert.assertEquals(datasetState.getTaskStates().size(), 1);
        Assert.assertEquals(((TaskState) datasetState.getTaskStates().get(0)).getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
    }

    public void testLaunchMultipleJobs() throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        GobblinHelixJobLauncher register = this.closer.register(new GobblinHelixJobLauncher(generateJobProperties(this.baseConfig, "2", "_1504201348471"), this.helixManager, this.appWorkDir, ImmutableList.of(), concurrentHashMap, java.util.Optional.empty()));
        GobblinHelixJobLauncher register2 = this.closer.register(new GobblinHelixJobLauncher(generateJobProperties(this.baseConfig, "2", "_1504201348472"), this.helixManager, this.appWorkDir, ImmutableList.of(), concurrentHashMap, java.util.Optional.empty()));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        SuspendJobListener suspendJobListener = new SuspendJobListener(countDownLatch, countDownLatch2);
        new Thread(() -> {
            try {
                register.launchJob(suspendJobListener);
                countDownLatch3.countDown();
            } catch (JobException e) {
            }
        }).start();
        countDownLatch.await();
        register2.launchJob(suspendJobListener);
        countDownLatch2.countDown();
        countDownLatch3.await();
        Assert.assertEquals(suspendJobListener.getCompletes().get() == 1, true);
    }

    @Test(enabled = false, dependsOnMethods = {"testLaunchJob", "testLaunchMultipleJobs"})
    public void testJobCleanup() throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Properties generateJobProperties = generateJobProperties(this.baseConfig, "3", "_1504201348473");
        GobblinHelixJobLauncher gobblinHelixJobLauncher = new GobblinHelixJobLauncher(generateJobProperties, this.helixManager, this.appWorkDir, ImmutableList.of(), concurrentHashMap, java.util.Optional.empty());
        Properties generateJobProperties2 = generateJobProperties(this.baseConfig, "33", "_1504201348474");
        GobblinHelixJobLauncher gobblinHelixJobLauncher2 = new GobblinHelixJobLauncher(generateJobProperties2, this.helixManager, this.appWorkDir, ImmutableList.of(), concurrentHashMap, java.util.Optional.empty());
        gobblinHelixJobLauncher.launchJob((JobListener) null);
        gobblinHelixJobLauncher2.launchJob((JobListener) null);
        TaskDriver taskDriver = new TaskDriver(this.helixManager);
        String property = generateJobProperties.getProperty("job.id");
        String property2 = generateJobProperties2.getProperty("job.id");
        org.apache.helix.task.JobContext jobContext = taskDriver.getJobContext(property);
        org.apache.helix.task.JobContext jobContext2 = taskDriver.getJobContext(property2);
        waitForWorkFlowStartup(taskDriver, property);
        waitForWorkFlowStartup(taskDriver, property2);
        Assert.assertNotNull(jobContext);
        Assert.assertNotNull(jobContext2);
        gobblinHelixJobLauncher.close();
        waitForWorkFlowCleanup(taskDriver, property);
        Assert.assertNull(taskDriver.getJobContext(property));
        Assert.assertNull(taskDriver.getWorkflowConfig(property));
        Assert.assertNull(taskDriver.getWorkflowContext(property));
        Assert.assertNotNull(taskDriver.getWorkflowConfig(property2));
        gobblinHelixJobLauncher2.close();
        waitForWorkFlowCleanup(taskDriver, property2);
        Assert.assertNull(taskDriver.getWorkflowConfig(property2));
        File file = new File(this.appWorkDir + File.separator + "_workunits" + File.separator + property);
        File file2 = new File(this.appWorkDir + File.separator + "_taskstates" + File.separator + property);
        Assert.assertFalse(file.exists());
        Assert.assertFalse(file2.exists());
        Assert.assertFalse(new File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, property).toString()).exists());
    }

    @AfterClass
    public void tearDown() throws IOException {
        try {
            this.gobblinTaskRunner.stop();
            this.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.closer.close();
        }
    }

    private void waitForWorkFlowCleanup(TaskDriver taskDriver, String str) {
        for (int i = 0; i < 60 && taskDriver.getWorkflowConfig(str) != null; i++) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    }

    private void waitForWorkFlowStartup(TaskDriver taskDriver, String str) {
        for (int i = 0; i < 5 && taskDriver.getWorkflowConfig(str) == null; i++) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    }
}
