package org.apache.gobblin.cluster;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.avro.Schema;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
import org.apache.gobblin.example.simplejson.SimpleJsonSource;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
import org.apache.gobblin.util.eventbus.EventBusFactory;
import org.apache.gobblin.util.retry.RetryerFactory;
import org.apache.gobblin.writer.AvroDataWriterBuilder;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.WriterOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskResult;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"gobblin.cluster"})
/* loaded from: input_file:org/apache/gobblin/cluster/GobblinHelixTaskTest.class */
public class GobblinHelixTaskTest {
    private TaskExecutor taskExecutor;
    private GobblinHelixTaskStateTracker taskStateTracker;
    private GobblinHelixTask gobblinHelixTask;
    private HelixManager helixManager;
    private FileSystem localFs;
    private Path appWorkDir;
    private Path taskOutputDir;
    private CountDownLatch countDownLatchForFailInTaskCreation;

    @BeforeClass
    public void setUp() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setInt("taskexecutor.threadpool.size", 1);
        this.taskExecutor = new TaskExecutor(configuration);
        this.helixManager = (HelixManager) Mockito.mock(HelixManager.class);
        Mockito.when(this.helixManager.getInstanceName()).thenReturn(GobblinHelixTaskTest.class.getSimpleName());
        Properties properties = new Properties();
        properties.setProperty("helixTaskTracker.isNewTaskRegFailureFatal", "true");
        this.taskStateTracker = new GobblinHelixTaskStateTracker(properties);
        this.localFs = FileSystem.getLocal(configuration);
        this.appWorkDir = new Path(GobblinHelixTaskTest.class.getSimpleName());
        this.taskOutputDir = new Path(this.appWorkDir, "output");
    }

    @Test
    public void testPrepareTask() throws IOException, InterruptedException {
        EventBusFactory.get("ContainerHealthCheckEventBus", SharedResourcesBrokerFactory.getImplicitBroker()).register(this);
        this.countDownLatchForFailInTaskCreation = new CountDownLatch(1);
        Path path = new Path(this.appWorkDir, TestHelper.TEST_JOB_ID + ".job.state");
        JobState jobState = new JobState();
        jobState.setJobName(TestHelper.TEST_JOB_NAME);
        jobState.setJobId(TestHelper.TEST_JOB_ID);
        SerializationUtils.serializeState(this.localFs, path, jobState);
        WorkUnit createEmpty = WorkUnit.createEmpty();
        prepareWorkUnit(createEmpty);
        File file = new File(this.appWorkDir.toString(), "TestJob.json");
        TestHelper.createSourceJsonFile(file);
        createEmpty.setProp("source.file", file.getAbsolutePath());
        Path path2 = new Path(this.appWorkDir, "_workunits");
        FsStateStore fsStateStore = new FsStateStore(this.localFs, path2.toString(), WorkUnit.class);
        Path path3 = new Path(new Path(path2, TestHelper.TEST_JOB_ID), "TestJob.wu");
        fsStateStore.put(TestHelper.TEST_JOB_ID, "TestJob.wu", createEmpty);
        Assert.assertTrue(this.localFs.exists(path3));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("gobblin.cluster.work.unit.file.path", path3.toString());
        newHashMap.put("job.name", TestHelper.TEST_JOB_NAME);
        newHashMap.put("job.id", TestHelper.TEST_JOB_ID);
        newHashMap.put("task.key", Long.toString(Id.parse(TestHelper.TEST_JOB_ID).getSequence().longValue()));
        TaskConfig taskConfig = new TaskConfig("", newHashMap, true);
        TaskCallbackContext taskCallbackContext = (TaskCallbackContext) Mockito.mock(TaskCallbackContext.class);
        Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
        Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
        TaskDriver createTaskDriverWithMockedAttributes = createTaskDriverWithMockedAttributes(taskCallbackContext, taskConfig);
        TaskRunnerSuiteBase.Builder builder = new TaskRunnerSuiteBase.Builder(ConfigFactory.empty().withValue("retry_type", ConfigValueFactory.fromAnyRef(RetryerFactory.RetryType.FIXED_ATTEMPT.name())).withValue("retry_times", ConfigValueFactory.fromAnyRef(2)));
        TaskRunnerSuiteBase build = builder.setInstanceName("TestInstance").setApplicationName(TestHelper.TEST_APPLICATION_NAME).setAppWorkPath(this.appWorkDir).setContainerMetrics(Optional.absent()).setFileSystem(this.localFs).setJobHelixManager(this.helixManager).setApplicationId("TestApplication-1").build();
        this.gobblinHelixTask = new GobblinHelixTaskFactory(builder, build.metricContext, this.taskStateTracker, ConfigFactory.empty(), Optional.of(createTaskDriverWithMockedAttributes)).createNewTask(taskCallbackContext);
        TaskRunnerSuiteBase.Builder builder2 = (TaskRunnerSuiteBase.Builder) Mockito.spy(builder);
        Mockito.when(builder2.getFs()).thenThrow(new Throwable[]{new RuntimeException("failure on purpose")});
        try {
            new GobblinHelixTaskFactory(builder2, build.metricContext, this.taskStateTracker, ConfigFactory.empty(), Optional.of(createTaskDriverWithMockedAttributes)).createNewTask(taskCallbackContext).run();
            Assert.fail();
        } catch (Throwable th) {
        }
    }

    @Subscribe
    public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent containerHealthCheckFailureEvent) {
        this.countDownLatchForFailInTaskCreation.countDown();
    }

    private TaskDriver createTaskDriverWithMockedAttributes(TaskCallbackContext taskCallbackContext, TaskConfig taskConfig) {
        String join = Joiner.on("_").join(TestHelper.TEST_JOB_ID, TestHelper.TEST_JOB_ID, new Object[0]);
        JobConfig jobConfig = (JobConfig) Mockito.mock(JobConfig.class);
        Mockito.when(jobConfig.getJobId()).thenReturn(join);
        Mockito.when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
        JobContext jobContext = (JobContext) Mockito.mock(JobContext.class);
        Mockito.when(jobContext.getTaskIdPartitionMap()).thenReturn(ImmutableMap.of(taskConfig.getId(), 0));
        TaskDriver taskDriver = (TaskDriver) Mockito.mock(TaskDriver.class);
        Mockito.when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(jobContext);
        return taskDriver;
    }

    @Test(dependsOnMethods = {"testPrepareTask"})
    public void testRun() throws IOException {
        TaskResult run = this.gobblinHelixTask.run();
        System.out.println(run.getInfo());
        Assert.assertEquals(run.getStatus(), TaskResult.Status.COMPLETED);
        File file = new File(this.taskOutputDir.toString(), TestHelper.REL_WRITER_FILE_PATH + File.separator + TestHelper.WRITER_FILE_NAME);
        Assert.assertTrue(file.exists());
        TestHelper.assertGenericRecords(file, new Schema.Parser().parse(TestHelper.SOURCE_SCHEMA));
    }

    @AfterClass
    public void tearDown() throws IOException {
        try {
            if (this.localFs.exists(this.appWorkDir)) {
                this.localFs.delete(this.appWorkDir, true);
            }
        } finally {
            this.taskExecutor.stopAsync().awaitTerminated();
            this.taskStateTracker.stopAsync().awaitTerminated();
        }
    }

    private void prepareWorkUnit(WorkUnit workUnit) {
        workUnit.setProp("task.id", TestHelper.TEST_TASK_ID);
        workUnit.setProp("task.key", Long.toString(Id.parse(TestHelper.TEST_TASK_ID).getSequence().longValue()));
        workUnit.setProp("source.class", SimpleJsonSource.class.getName());
        workUnit.setProp("converter.classes", SimpleJsonConverter.class.getName());
        workUnit.setProp("writer.output.format", WriterOutputFormat.AVRO.toString());
        workUnit.setProp("writer.destination.type", Destination.DestinationType.HDFS.toString());
        workUnit.setProp("writer.staging.dir", this.appWorkDir.toString() + "/staging");
        workUnit.setProp("writer.output.dir", this.taskOutputDir.toString());
        workUnit.setProp("writer.file.name", TestHelper.WRITER_FILE_NAME);
        workUnit.setProp("writer.file.path", TestHelper.REL_WRITER_FILE_PATH);
        workUnit.setProp("writer.builder.class", AvroDataWriterBuilder.class.getName());
        workUnit.setProp("source.schema", TestHelper.SOURCE_SCHEMA);
    }
}
