package org.apache.gobblin.cluster;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import javax.annotation.Nullable;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/gobblin/cluster/TestSingleTask.class */
public class TestSingleTask {
    private static final Logger log = LoggerFactory.getLogger(TestSingleTask.class);

    private InMemorySingleTaskRunner createInMemoryTaskRunner() throws IOException {
        File createTempDir = Files.createTempDir();
        Path path = Paths.get(createTempDir.getAbsolutePath(), "clusterConf");
        new ConfigUtils(new FileUtils()).saveConfigToFile(ConfigFactory.empty().withValue("gobblin.cluster.appWorkDir", ConfigValueFactory.fromAnyRef(createTempDir.toString())), path);
        return new InMemorySingleTaskRunner(path.toString(), "testJob", Paths.get(createTempDir.getAbsolutePath(), "_workunits/store/workunit.wu").toString());
    }

    @Test
    public void testSingleTaskRerunAfterFailure() throws Exception {
        InMemorySingleTaskRunner createInMemoryTaskRunner = createInMemoryTaskRunner();
        try {
            createInMemoryTaskRunner.run(true);
        } catch (Exception e) {
            createInMemoryTaskRunner.run();
        }
        Assert.assertTrue(true);
    }

    @Test
    public void testTaskCancelBeforeRunFailure() throws Exception {
        InMemorySingleTaskRunner createInMemoryTaskRunner = createInMemoryTaskRunner();
        createInMemoryTaskRunner.initClusterSingleTask(false);
        try {
            createInMemoryTaskRunner.task.cancel();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof IllegalStateException);
            Assert.assertTrue(e.getMessage().contains("Failed to initialize"));
        }
    }

    @Test
    public void testNormalSequence() throws Exception {
        InMemorySingleTaskRunner createInMemoryTaskRunner = createInMemoryTaskRunner();
        createInMemoryTaskRunner.startServices();
        createInMemoryTaskRunner.initClusterSingleTask(false);
        final SingleTask singleTask = createInMemoryTaskRunner.task;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        final FutureTask futureTask = new FutureTask(() -> {
            try {
                singleTask.cancel();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "cancelled");
        newFixedThreadPool.submit(new FutureTask(() -> {
            try {
                singleTask.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "completed"));
        AssertWithBackoff.create().timeoutMs(2000L).backoffFactor(1.0d).assertTrue(new Predicate<Void>() { // from class: org.apache.gobblin.cluster.TestSingleTask.1
            public boolean apply(@Nullable Void r3) {
                return singleTask._taskAttempt != null;
            }
        }, "wait until task attempt available");
        newFixedThreadPool.submit(futureTask);
        AssertWithBackoff.create().timeoutMs(2000L).backoffFactor(1.0d).assertTrue(new Predicate<Void>() { // from class: org.apache.gobblin.cluster.TestSingleTask.2
            public boolean apply(@Nullable Void r3) {
                return futureTask.isDone();
            }
        }, "wait until task attempt available");
        Assert.assertEquals(futureTask.get(), "cancelled");
    }

    @Test
    public void testTaskCancelBeforeRun() throws Exception {
        InMemorySingleTaskRunner createInMemoryTaskRunner = createInMemoryTaskRunner();
        createInMemoryTaskRunner.setInjectedConfig(ConfigFactory.parseMap(ImmutableMap.of("maxRetryBlockedOnTaskAttemptInit", Integer.MAX_VALUE)));
        createInMemoryTaskRunner.startServices();
        createInMemoryTaskRunner.initClusterSingleTask(false);
        SingleTask singleTask = createInMemoryTaskRunner.task;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        FutureTask futureTask = new FutureTask(() -> {
            try {
                singleTask.cancel();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "cancelled");
        newFixedThreadPool.submit(futureTask);
        final FutureTask futureTask2 = new FutureTask(() -> {
            try {
                singleTask.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "completed");
        newFixedThreadPool.submit(futureTask2);
        AssertWithBackoff.create().backoffFactor(1.0d).maxSleepMs(1000L).timeoutMs(500000L).assertTrue(new Predicate<Void>() { // from class: org.apache.gobblin.cluster.TestSingleTask.3
            public boolean apply(@Nullable Void r3) {
                return futureTask2.isDone();
            }
        }, "waiting for future to complete");
        Assert.assertEquals(futureTask2.get(), "completed");
        Assert.assertTrue(futureTask.isDone());
        Assert.assertEquals(futureTask.get(), "cancelled");
    }
}
