package org.apache.gobblin.cluster;

import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.test.TestingServer;
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Exercises {@link GobblinHelixJobScheduler} against a test ZooKeeper server and a Helix
 * controller connection created in {@link #setUp()}.
 *
 * <p>The test submits a new job configuration, waits for a workflow id to be reported for
 * that job name, then submits an updated configuration carrying a different job id and
 * waits for the reported workflow id to reflect it.
 */
@Test(groups = {"gobblin.cluster"})
public class GobblinHelixJobSchedulerTest {
    public static final Logger LOG = LoggerFactory.getLogger(GobblinHelixJobSchedulerTest.class);

    /** Upper bound, in milliseconds, on how long a single workflow-id wait may poll. */
    private static final long WORKFLOW_POLL_TIMEOUT_MS = 30000L;
    /** Pause, in milliseconds, between two consecutive workflow-id lookups. */
    private static final long WORKFLOW_POLL_INTERVAL_MS = 100L;

    private HelixManager helixManager;
    private FileSystem localFs;
    private Path appWorkDir;
    private Config baseConfig;
    private GobblinTaskRunner gobblinTaskRunner;
    private Thread thread;
    /** Collects every resource opened in {@link #setUp()} so {@link #tearDown()} can release them. */
    private final Closer closer = Closer.create();
    /** Job-id suffix used for the initial job submission. */
    private final String workflowIdSuffix1 = "_1504201348471";
    /** Job-id suffix used for the updated job submission. */
    private final String workflowIdSuffix2 = "_1504201348472";

    /**
     * Starts the test ZooKeeper server, loads the test configuration from the classpath,
     * creates the Helix cluster, connects a controller-type {@link HelixManager}, and starts
     * a {@link GobblinTaskRunner} on a background thread.
     *
     * @throws Exception if any part of the environment cannot be created
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestingServer zkServer = this.closer.register(new TestingServer(-1));
        LOG.info("Testing ZK Server listening on: {}", zkServer.getConnectString());

        String confResourceName = GobblinHelixJobSchedulerTest.class.getSimpleName() + ".conf";
        URL confUrl = GobblinHelixJobSchedulerTest.class.getClassLoader().getResource(confResourceName);
        // Report the name that was looked up; the URL itself is null whenever this assertion fires.
        Assert.assertNotNull(confUrl, "Could not find resource " + confResourceName);

        this.appWorkDir = new Path(GobblinHelixJobSchedulerTest.class.getSimpleName());

        // Source file the test job is configured to pull.
        File sourceJsonFile = new File(this.appWorkDir.toString(), "TestJob.json");
        TestHelper.createSourceJsonFile(sourceJsonFile);

        this.baseConfig = ConfigFactory.parseURL(confUrl)
            .withValue("gobblin.cluster.zk.connection.string",
                ConfigValueFactory.fromAnyRef(zkServer.getConnectString()))
            .withValue("source.filebased.files.to.pull",
                ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
            .withValue("state.store.jobStateInStateStore", ConfigValueFactory.fromAnyRef("true"))
            .resolve();

        String zkConnectionString = this.baseConfig.getString("gobblin.cluster.zk.connection.string");
        String helixClusterName = this.baseConfig.getString("gobblin.cluster.helix.cluster.name");
        HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName);

        this.helixManager = HelixManagerFactory.getZKHelixManager(
            helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER, zkConnectionString);
        this.closer.register(() -> {
            this.helixManager.disconnect();
        });
        this.helixManager.connect();

        this.localFs = FileSystem.getLocal(new Configuration());
        // Registered once: the exists() guard made a second identical registration a no-op.
        this.closer.register(() -> {
            if (this.localFs.exists(this.appWorkDir)) {
                this.localFs.delete(this.appWorkDir, true);
            }
        });

        this.gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME,
            TestHelper.TEST_HELIX_INSTANCE_NAME, "1", "1", this.baseConfig, Optional.of(this.appWorkDir));
        this.thread = new Thread(() -> {
            this.gobblinTaskRunner.start();
        });
        this.thread.start();
    }

    /**
     * Submits a new job configuration and then an updated one, checking after each step that
     * the workflow id reported for the job name ends with the suffix of the job id in effect.
     *
     * @throws Exception if scheduling or the workflow-id lookup fails
     */
    @Test
    public void testNewJobAndUpdate() throws Exception {
        Config jobCatalogConfig = ConfigFactory.empty().withValue("jobconf.fullyQualifiedPath",
            ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
        SchedulerService schedulerService = new SchedulerService(new Properties());
        NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(jobCatalogConfig);
        jobCatalog.startAsync();
        GobblinHelixJobScheduler jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(),
            this.helixManager, java.util.Optional.empty(), new EventBus(), this.appWorkDir, Lists.emptyList(),
            schedulerService, jobCatalog);

        Properties jobProperties =
            GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", this.workflowIdSuffix1);
        jobProperties.setProperty("gobblin.cluster.job.cancelRunningJobOnDelete", "true");
        String jobName = jobProperties.getProperty("job.name");

        NewJobConfigArrivalEvent newJobEvent = new NewJobConfigArrivalEvent(jobName, jobProperties);
        jobScheduler.handleNewJobConfigArrival(newJobEvent);

        // NOTE(review): the same Properties instance handed to the new-job event is mutated here,
        // before the first workflow id has been observed; statement order is kept as it was.
        jobProperties.setProperty("job.id", "job_" + jobProperties.getProperty("job.name") + this.workflowIdSuffix2);

        // setUp() already connected this manager; the call is retained from the original test.
        this.helixManager.connect();
        String initialWorkflowId = awaitWorkflowId(newJobEvent.getJobName(), this.workflowIdSuffix1);
        Assert.assertNotNull(initialWorkflowId,
            "No workflow id reported for job " + newJobEvent.getJobName() + " after the new-job event");
        Assert.assertTrue(initialWorkflowId.endsWith(this.workflowIdSuffix1),
            "Unexpected workflow id after the new-job event: " + initialWorkflowId);

        jobScheduler.handleUpdateJobConfigArrival(new UpdateJobConfigArrivalEvent(jobName, jobProperties));

        this.helixManager.connect();
        String updatedWorkflowId = awaitWorkflowId(newJobEvent.getJobName(), this.workflowIdSuffix2);
        Assert.assertNotNull(updatedWorkflowId,
            "No workflow id reported for job " + newJobEvent.getJobName() + " after the update event");
        Assert.assertTrue(updatedWorkflowId.endsWith(this.workflowIdSuffix2),
            "Unexpected workflow id after the update event: " + updatedWorkflowId);
    }

    /**
     * Polls the workflow id reported for {@code jobName} until it ends with
     * {@code expectedSuffix} or {@link #WORKFLOW_POLL_TIMEOUT_MS} elapses.
     *
     * @param jobName job name to look up
     * @param expectedSuffix suffix the workflow id is expected to end with
     * @return the matching workflow id; otherwise the last non-null id observed before the
     *     timeout, or {@code null} if none was ever reported
     * @throws Exception if the lookup fails or the polling sleep is interrupted
     */
    private String awaitWorkflowId(String jobName, String expectedSuffix) throws Exception {
        String lastSeenWorkflowId = null;
        long deadline = System.currentTimeMillis() + WORKFLOW_POLL_TIMEOUT_MS;
        while (System.currentTimeMillis() < deadline) {
            Map<String, String> workflowIds =
                HelixUtils.getWorkflowIdsFromJobNames(this.helixManager, Collections.singletonList(jobName));
            String workflowId = workflowIds.get(jobName);
            if (workflowId != null) {
                lastSeenWorkflowId = workflowId;
                if (workflowId.endsWith(expectedSuffix)) {
                    return workflowId;
                }
            }
            Thread.sleep(WORKFLOW_POLL_INTERVAL_MS);
        }
        return lastSeenWorkflowId;
    }

    /**
     * Stops the task runner, waits for its thread to finish, and closes every resource
     * registered with {@link #closer}; the closer is closed even if stopping fails.
     *
     * @throws IOException if closing a registered resource fails
     */
    @AfterClass
    public void tearDown() throws IOException {
        try {
            // Either field is still null if setUp() failed before assigning it.
            if (this.gobblinTaskRunner != null) {
                this.gobblinTaskRunner.stop();
            }
            if (this.thread != null) {
                this.thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.closer.close();
        }
    }
}
