package org.apache.gobblin.cluster;

import com.google.common.eventbus.EventBus;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
import org.apache.gobblin.runtime.api.FsSpecProducer;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/gobblin/cluster/FsJobConfigurationManagerTest.class */
public class FsJobConfigurationManagerTest {
    /** Logger for this test class. */
    private static final Logger log = LoggerFactory.getLogger(FsJobConfigurationManagerTest.class);
    /** Job catalog that the manager under test is constructed with. */
    private MutableJobCatalog _jobCatalog;
    /** The manager under test. */
    private FsJobConfigurationManager jobConfigurationManager;
    /** File system used for the catalog and spec-consumer directories. */
    private FileSystem fs;
    /** Producer used by the tests to publish add/update/delete job specs. */
    private SpecProducer _specProducer;
    /** Event bus handed to the manager under test. */
    private EventBus eventBus;
    /** Directory configured as the job catalog location. */
    private String jobConfDir = "/tmp/" + getClass().getSimpleName() + "/jobCatalog";
    /** Directory configured as the spec consumer path. */
    private String fsSpecConsumerPathString = "/tmp/fsJobConfigManagerTest";
    /** URI string of the single job spec used throughout the tests. */
    private String jobSpecUriString = "testJobSpec";
    /** Number of {@link NewJobConfigArrivalEvent}s observed so far. */
    private int newJobConfigArrivalEventCount = 0;
    /** Number of {@link UpdateJobConfigArrivalEvent}s observed so far. */
    private int updateJobConfigArrivalEventCount = 0;
    /** Number of {@link DeleteJobConfigArrivalEvent}s observed so far. */
    private int deleteJobConfigArrivalEventCount = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.cluster.FsJobConfigurationManagerTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/cluster/FsJobConfigurationManagerTest$1.class */
    /**
     * Lookup table from {@link SpecExecutor.Verb} ordinal to a switch case index:
     * ADD maps to 1, DELETE to 2 and UPDATE to 3. The array is sized by
     * {@code Verb.values().length}, so any verb not assigned below keeps the
     * default value 0.
     *
     * NOTE(review): marked synthetic; this looks like the compiler-generated
     * enum switch map as recovered by the decompiler -- confirm before editing by hand.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$runtime$api$SpecExecutor$Verb = new int[SpecExecutor.Verb.values().length];

        static {
            // Each constant is registered in its own try block so that a missing
            // enum constant (NoSuchFieldError) only skips that one entry.
            try {
                $SwitchMap$org$apache$gobblin$runtime$api$SpecExecutor$Verb[SpecExecutor.Verb.ADD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the entry for ADD stays 0.
            }
            try {
                $SwitchMap$org$apache$gobblin$runtime$api$SpecExecutor$Verb[SpecExecutor.Verb.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the entry for DELETE stays 0.
            }
            try {
                $SwitchMap$org$apache$gobblin$runtime$api$SpecExecutor$Verb[SpecExecutor.Verb.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored: the entry for UPDATE stays 0.
            }
        }
    }

    /**
     * Creates the fixtures shared by the tests: a mocked event bus that counts
     * posted events by type, a local file system with the job catalog directory
     * in place, a started job catalog, the manager under test and a spec producer.
     *
     * @throws IOException if the local file system cannot be obtained or the
     *     catalog directory cannot be created
     */
    @BeforeClass
    public void setUp() throws IOException {
        // Mocked bus: tally each posted event by its concrete type and reject anything else.
        this.eventBus = Mockito.mock(EventBus.class);
        Mockito.doAnswer(invocation -> {
            Object event = invocation.getArguments()[0];
            if (event instanceof NewJobConfigArrivalEvent) {
                this.newJobConfigArrivalEventCount++;
            } else if (event instanceof DeleteJobConfigArrivalEvent) {
                this.deleteJobConfigArrivalEventCount++;
            } else if (event instanceof UpdateJobConfigArrivalEvent) {
                this.updateJobConfigArrivalEventCount++;
            } else {
                throw new IOException("Unexpected event type");
            }
            return null;
        }).when(this.eventBus).post(Mockito.any());

        // Local file system with the catalog directory guaranteed to exist.
        this.fs = FileSystem.getLocal(new Configuration(false));
        Path catalogDir = new Path(this.jobConfDir);
        if (!this.fs.exists(catalogDir)) {
            this.fs.mkdirs(catalogDir);
        }

        // Configuration shared by the catalog, the manager and the producer.
        Config config = ConfigFactory.empty()
                .withValue("jobconf.fullyQualifiedPath", ConfigValueFactory.fromAnyRef(this.jobConfDir))
                .withValue("gobblin.cluster.specConsumer.path", ConfigValueFactory.fromAnyRef(this.fsSpecConsumerPathString))
                .withValue("gobblin.cluster.job.spec.refresh.interval", ConfigValueFactory.fromAnyRef(1));

        this._jobCatalog = new NonObservingFSJobCatalog(config);
        this._jobCatalog.startAsync().awaitRunning();
        this.jobConfigurationManager = new FsJobConfigurationManager(this.eventBus, config, this._jobCatalog, this.fs);
        this._specProducer = new FsSpecProducer(this.fs, config);
    }

    /**
     * Builds a job spec and publishes it through the spec producer using the
     * requested verb.
     *
     * @param str job spec name; the URI is the name without its file extension
     * @param str2 version to stamp on the job spec
     * @param str3 name of the {@link SpecExecutor.Verb} to apply (ADD, UPDATE or DELETE)
     * @throws URISyntaxException if the spec name does not form a valid URI
     * @throws IOException if the verb is not one of ADD, UPDATE or DELETE
     * @throws IllegalArgumentException if {@code str3} is not the name of any verb
     */
    private void addJobSpec(String str, String str2, String str3) throws URISyntaxException, IOException {
        JobSpec jobSpec = JobSpec.builder(new URI(Files.getNameWithoutExtension(str)))
                .withConfig(ConfigFactory.empty())
                .withTemplate(new URI("FS:///"))
                .withVersion(str2)
                .withDescription("test")
                .build();

        // Switch on the enum itself instead of an ordinal-indexed lookup table.
        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(str3);
        switch (verb) {
            case ADD:
                this._specProducer.addSpec(jobSpec);
                break;
            case DELETE:
                // Deletion is keyed by URI only.
                this._specProducer.deleteSpec(jobSpec.getUri());
                break;
            case UPDATE:
                this._specProducer.updateSpec(jobSpec);
                break;
            default:
                throw new IOException("Unknown Spec Verb: " + str3);
        }
    }

    /**
     * Publishes an ADD, an UPDATE and a DELETE for the same job spec and, after
     * each fetch, checks the catalog contents, that the spec consumer path has
     * no visible files left, and the per-type event counts.
     *
     * The "spec not found" expectation is scoped to the lookup that follows the
     * DELETE. Declaring it on the whole method would skip every assertion after
     * that lookup and would also let an unexpected miss after ADD or UPDATE pass.
     *
     * @throws JobSpecNotFoundException if the spec is missing after ADD or UPDATE
     */
    @Test
    public void testFetchJobSpecs() throws ExecutionException, InterruptedException, URISyntaxException, JobSpecNotFoundException, IOException {
        URI jobSpecUri = new URI(this.jobSpecUriString);
        Path specConsumerPath = new Path(this.fsSpecConsumerPathString);

        // ADD: version 1 appears in the catalog and one "new" event is posted.
        addJobSpec(this.jobSpecUriString, "1", SpecExecutor.Verb.ADD.name());
        this.jobConfigurationManager.fetchJobSpecs();
        JobSpec addedSpec = this._jobCatalog.getJobSpec(jobSpecUri);
        Assert.assertNotNull(addedSpec);
        Assert.assertEquals(addedSpec.getVersion(), "1");
        Assert.assertEquals(addedSpec.getUri().getPath(), this.jobSpecUriString);
        Assert.assertEquals(this.fs.listStatus(specConsumerPath, new HiddenFilter()).length, 0);
        assertEventCounts(1, 0, 0);

        // UPDATE: the catalog now holds version 2 and one "update" event is posted.
        addJobSpec(this.jobSpecUriString, "2", SpecExecutor.Verb.UPDATE.name());
        this.jobConfigurationManager.fetchJobSpecs();
        JobSpec updatedSpec = this._jobCatalog.getJobSpec(jobSpecUri);
        Assert.assertNotNull(updatedSpec);
        Assert.assertEquals(updatedSpec.getVersion(), "2");
        Assert.assertEquals(this.fs.listStatus(specConsumerPath, new HiddenFilter()).length, 0);
        assertEventCounts(1, 1, 0);

        // DELETE: the spec is gone from the catalog and one "delete" event is posted.
        addJobSpec(this.jobSpecUriString, "2", SpecExecutor.Verb.DELETE.name());
        this.jobConfigurationManager.fetchJobSpecs();
        Assert.assertEquals(this.fs.listStatus(specConsumerPath, new HiddenFilter()).length, 0);
        try {
            this._jobCatalog.getJobSpec(jobSpecUri);
            Assert.fail("Expected JobSpecNotFoundException after the job spec was deleted");
        } catch (JobSpecNotFoundException expected) {
            // Expected: the deleted spec must no longer be resolvable.
        }
        assertEventCounts(1, 1, 1);
    }

    /** Asserts the number of new, update and delete events observed so far. */
    private void assertEventCounts(int expectedNew, int expectedUpdate, int expectedDelete) {
        Assert.assertEquals(this.newJobConfigArrivalEventCount, expectedNew);
        Assert.assertEquals(this.updateJobConfigArrivalEventCount, expectedUpdate);
        Assert.assertEquals(this.deleteJobConfigArrivalEventCount, expectedDelete);
    }

    /**
     * Stubs {@code fetchJobSpecs()} on a spy of the manager to always throw,
     * starts the spy, and checks that the method keeps being invoked over time
     * and that the service ends up neither FAILED nor TERMINATED.
     */
    @Test
    public void testException() throws Exception {
        FsJobConfigurationManager spiedManager = Mockito.spy(this.jobConfigurationManager);
        Mockito.doThrow(new ExecutionException(new IOException("Test exception")))
                .when(spiedManager)
                .fetchJobSpecs();

        spiedManager.startUp();

        // Wait, then record how many invocations the spy has seen so far.
        Thread.sleep(2000L);
        int invocationsSoFar = Mockito.mockingDetails(spiedManager).getInvocations().size();
        Mockito.verify(spiedManager, Mockito.atLeast(1)).fetchJobSpecs();

        // After a second wait there must be more fetch attempts than the recorded count.
        Thread.sleep(2000L);
        Mockito.verify(spiedManager, Mockito.atLeast(invocationsSoFar + 1)).fetchJobSpecs();

        // The repeated failures must not have pushed the service into a dead state.
        Assert.assertFalse(spiedManager.state().equals(Service.State.FAILED)
                || spiedManager.state().equals(Service.State.TERMINATED));
    }

    /**
     * Recursively deletes the spec consumer directory and then the job catalog
     * directory, skipping either one if it does not exist.
     *
     * @throws IOException if an existence check or deletion fails
     */
    @AfterClass
    public void tearDown() throws IOException {
        String[] directories = {this.fsSpecConsumerPathString, this.jobConfDir};
        for (String directory : directories) {
            Path target = new Path(directory);
            if (this.fs.exists(target)) {
                this.fs.delete(target, true);
            }
        }
    }
}
