package co.cask.cdap.internal.app.runtime.schedule.queue;

import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.module.DatasetDefinitionRegistry;
import co.cask.cdap.api.dataset.module.DatasetModule;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.namespace.InMemoryNamespaceClient;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.data.runtime.DynamicTransactionExecutorFactory;
import co.cask.cdap.data.runtime.SystemDatasetRuntimeModule;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetDefinitionRegistryFactory;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DefaultDatasetDefinitionRegistry;
import co.cask.cdap.data2.dataset2.InMemoryDatasetFramework;
import co.cask.cdap.internal.app.AppFabricDatasetModule;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleMeta;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleStatus;
import co.cask.cdap.internal.app.runtime.schedule.queue.Job;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.internal.app.runtime.schedule.trigger.PartitionTrigger;
import co.cask.cdap.internal.app.runtime.schedule.trigger.TimeTrigger;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.WorkflowId;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/queue/JobQueueDatasetTest.class */
public class JobQueueDatasetTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final NamespaceId TEST_NS = new NamespaceId("jobQueueTest");
    private static final ApplicationId APP_ID = TEST_NS.app("app1");
    private static final WorkflowId WORKFLOW_ID = APP_ID.workflow("wf1");
    private static final DatasetId DATASET_ID = TEST_NS.dataset("pfs1");
    private static final ProgramSchedule SCHED1 = new ProgramSchedule("SCHED1", "one partition schedule", WORKFLOW_ID, ImmutableMap.of("prop3", "abc"), new PartitionTrigger(DATASET_ID, 1), ImmutableList.of());
    private static final ProgramSchedule SCHED2 = new ProgramSchedule("SCHED2", "time schedule", WORKFLOW_ID, ImmutableMap.of("prop3", "abc"), new TimeTrigger("* * * * *"), ImmutableList.of());
    private static final Job SCHED1_JOB = new SimpleJob(SCHED1, System.currentTimeMillis(), Lists.newArrayList(), Job.State.PENDING_TRIGGER, 0);
    private static final Job SCHED2_JOB = new SimpleJob(SCHED2, System.currentTimeMillis(), Lists.newArrayList(), Job.State.PENDING_TRIGGER, 0);
    private static TransactionManager txManager;
    private static TransactionExecutor txExecutor;
    private static JobQueueDataset jobQueue;

    @BeforeClass
    public static void beforeClass() throws IOException, DatasetManagementException {
        CConfiguration create = CConfiguration.create();
        create.set("local.data.dir", TEMP_FOLDER.newFolder().getAbsolutePath());
        txManager = new TransactionManager(new Configuration());
        txManager.startAndWait();
        DatasetFramework datasetFramework = (DatasetFramework) Guice.createInjector(new Module[]{new ConfigModule(create), new LocationRuntimeModule().getInMemoryModules(), new SystemDatasetRuntimeModule().getInMemoryModules(), new AbstractModule() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.1
            protected void configure() {
                MapBinder.newMapBinder(binder(), String.class, DatasetModule.class, Constants.Dataset.Manager.DefaultDatasetModules.class).addBinding("app-fabric").toInstance(new AppFabricDatasetModule());
                install(new FactoryModuleBuilder().implement(DatasetDefinitionRegistry.class, DefaultDatasetDefinitionRegistry.class).build(DatasetDefinitionRegistryFactory.class));
                bind(DatasetFramework.class).to(InMemoryDatasetFramework.class);
                bind(NamespaceQueryAdmin.class).to(InMemoryNamespaceClient.class).in(Scopes.SINGLETON);
            }
        }}).getInstance(DatasetFramework.class);
        DatasetsUtil.createIfNotExists(datasetFramework, Schedulers.JOB_QUEUE_DATASET_ID, JobQueueDataset.class.getName(), DatasetProperties.EMPTY);
        jobQueue = datasetFramework.getDataset(Schedulers.JOB_QUEUE_DATASET_ID, new HashMap(), (ClassLoader) null);
        txExecutor = new DynamicTransactionExecutorFactory(new InMemoryTxSystemClient(txManager)).createExecutor(Collections.singleton(jobQueue));
    }

    @AfterClass
    public static void afterClass() {
        txManager.stopAndWait();
    }

    @After
    public void tearDown() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.2
            public void apply() throws Exception {
                Iterator it = JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue, true).iterator();
                while (it.hasNext()) {
                    JobQueueDatasetTest.jobQueue.deleteJob((Job) it.next());
                }
            }
        });
    }

    @Test
    public void testMessageId() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.3
            public void apply() throws Exception {
                Assert.assertNull(JobQueueDatasetTest.jobQueue.retrieveSubscriberState("topic1"));
                JobQueueDatasetTest.jobQueue.persistSubscriberState("topic1", "messageIdToPut");
                Assert.assertEquals("messageIdToPut", JobQueueDatasetTest.jobQueue.retrieveSubscriberState("topic1"));
                Assert.assertNull(JobQueueDatasetTest.jobQueue.retrieveSubscriberState("topic2"));
            }
        });
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.4
            public void apply() throws Exception {
                Assert.assertEquals("messageIdToPut", JobQueueDatasetTest.jobQueue.retrieveSubscriberState("topic1"));
                Assert.assertNull(JobQueueDatasetTest.jobQueue.retrieveSubscriberState("topic2"));
            }
        });
    }

    @Test
    public void testJobQueue() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.5
            public void apply() throws Exception {
                Assert.assertEquals(0L, JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue).size());
                Assert.assertEquals(0L, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED1.getScheduleId())).size());
                Assert.assertEquals(0L, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED2.getScheduleId())).size());
                JobQueueDatasetTest.jobQueue.put(JobQueueDatasetTest.SCHED1_JOB);
                Assert.assertEquals(JobQueueDatasetTest.SCHED1_JOB, JobQueueDatasetTest.jobQueue.getJob(JobQueueDatasetTest.SCHED1_JOB.getJobKey()));
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED1_JOB), JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue));
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED1_JOB), JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED1.getScheduleId())));
                Assert.assertEquals(0L, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED2.getScheduleId())).size());
                JobQueueDatasetTest.jobQueue.put(JobQueueDatasetTest.SCHED2_JOB);
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED1_JOB, JobQueueDatasetTest.SCHED2_JOB), JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue));
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED2_JOB), JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED2.getScheduleId())));
                JobQueueDatasetTest.jobQueue.markJobsForDeletion(JobQueueDatasetTest.SCHED1.getScheduleId(), System.currentTimeMillis());
            }
        });
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.6
            public void apply() throws Exception {
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED2_JOB), JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue));
                Assert.assertEquals(0L, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED1.getScheduleId())).size());
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED2_JOB), JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED2.getScheduleId())));
            }
        });
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.7
            public void apply() throws Exception {
                for (Job job : JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue, true)) {
                    if (job.isToBeDeleted()) {
                        JobQueueDatasetTest.jobQueue.deleteJob(job);
                    }
                }
            }
        });
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.8
            public void apply() throws Exception {
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED2_JOB), JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue, true));
                Assert.assertEquals(0L, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED1.getScheduleId())).size());
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED2_JOB), JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(JobQueueDatasetTest.SCHED2.getScheduleId())));
            }
        });
    }

    @Test
    public void testGetAllJobs() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.9
            public void apply() throws Exception {
                Assert.assertEquals(0L, JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue).size());
                JobQueueDatasetTest.jobQueue.persistSubscriberState("someTopic", "someMessageId");
                JobQueueDatasetTest.jobQueue.put(JobQueueDatasetTest.SCHED1_JOB);
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED1_JOB), JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue));
                Assert.assertEquals(ImmutableSet.of(JobQueueDatasetTest.SCHED1_JOB), JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.fullScan()));
            }
        });
    }

    @Test
    public void testJobQueueIteration() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.10
            public void apply() throws Exception {
                Assert.assertEquals(0L, JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue).size());
                HashMultimap create = HashMultimap.create();
                for (int i = 0; i < 100; i++) {
                    ProgramSchedule programSchedule = new ProgramSchedule("sched" + i, "one partition schedule", JobQueueDatasetTest.WORKFLOW_ID, ImmutableMap.of(), new PartitionTrigger(JobQueueDatasetTest.DATASET_ID, 1), ImmutableList.of());
                    SimpleJob simpleJob = new SimpleJob(programSchedule, 1494353984967L + i, ImmutableList.of(), Job.State.PENDING_TRIGGER, 0L);
                    create.put(Integer.valueOf(JobQueueDatasetTest.jobQueue.getPartition(programSchedule.getScheduleId())), simpleJob);
                    JobQueueDatasetTest.jobQueue.put(simpleJob);
                }
                HashSet hashSet = new HashSet(create.get(0));
                Assert.assertTrue(hashSet.size() > 1);
                Assert.assertEquals(create.size(), JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue).size());
                Assert.assertEquals(hashSet, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobs(0, (Job) null)));
                CloseableIterator jobs = JobQueueDatasetTest.jobQueue.getJobs(0, (Job) null);
                Throwable th = null;
                try {
                    try {
                        Assert.assertTrue(jobs.hasNext());
                        Job job = (Job) jobs.next();
                        if (jobs != null) {
                            if (0 != 0) {
                                try {
                                    jobs.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                jobs.close();
                            }
                        }
                        Set set = JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobs(0, job));
                        Assert.assertEquals(hashSet.size() - 1, set.size());
                        HashSet hashSet2 = new HashSet(set);
                        hashSet2.add(job);
                        Assert.assertEquals(hashSet, hashSet2);
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (jobs != null) {
                        if (th != null) {
                            try {
                                jobs.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            jobs.close();
                        }
                    }
                    throw th3;
                }
            }
        });
    }

    @Test
    public void testAddNotifications() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.11
            public void apply() throws Exception {
                Assert.assertEquals(0L, JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue, false).size());
                Notification forPartitions = Notification.forPartitions(JobQueueDatasetTest.DATASET_ID, ImmutableList.of());
                Assert.assertNull(JobQueueDatasetTest.jobQueue.getJob(JobQueueDatasetTest.SCHED1_JOB.getJobKey()));
                JobQueueDatasetTest.jobQueue.put(JobQueueDatasetTest.SCHED1_JOB);
                Assert.assertEquals(JobQueueDatasetTest.SCHED1_JOB, JobQueueDatasetTest.jobQueue.getJob(JobQueueDatasetTest.SCHED1_JOB.getJobKey()));
                JobQueueDatasetTest.jobQueue.addNotification(new ProgramScheduleRecord(JobQueueDatasetTest.SCHED1, new ProgramScheduleMeta(ProgramScheduleStatus.SCHEDULED, 0L)), forPartitions);
                Assert.assertEquals(ImmutableList.of(forPartitions), JobQueueDatasetTest.jobQueue.getJob(JobQueueDatasetTest.SCHED1_JOB.getJobKey()).getNotifications());
            }
        });
    }

    @Test
    public void testJobTimeout() throws Exception {
        txExecutor.execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.12
            public void apply() throws Exception {
                Assert.assertEquals(0L, JobQueueDatasetTest.this.getAllJobs(JobQueueDatasetTest.jobQueue, false).size());
                Notification forPartitions = Notification.forPartitions(JobQueueDatasetTest.DATASET_ID, ImmutableList.of());
                Assert.assertNull(JobQueueDatasetTest.jobQueue.getJob(JobQueueDatasetTest.SCHED1_JOB.getJobKey()));
                ProgramSchedule programSchedule = new ProgramSchedule("SCHED1", "one partition schedule", JobQueueDatasetTest.WORKFLOW_ID, ImmutableMap.of("prop3", "abc"), new PartitionTrigger(JobQueueDatasetTest.DATASET_ID, 1), ImmutableList.of());
                SimpleJob simpleJob = new SimpleJob(programSchedule, System.currentTimeMillis() - Schedulers.JOB_QUEUE_TIMEOUT_MILLIS, Lists.newArrayList(), Job.State.PENDING_TRIGGER, 0L);
                JobQueueDatasetTest.jobQueue.put(simpleJob);
                Assert.assertEquals(simpleJob, JobQueueDatasetTest.jobQueue.getJob(simpleJob.getJobKey()));
                Assert.assertEquals(1L, JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(programSchedule.getScheduleId()), true).size());
                JobQueueDatasetTest.jobQueue.addNotification(new ProgramScheduleRecord(JobQueueDatasetTest.SCHED1, new ProgramScheduleMeta(ProgramScheduleStatus.SCHEDULED, 0L)), forPartitions);
                ArrayList arrayList = new ArrayList(JobQueueDatasetTest.this.toSet(JobQueueDatasetTest.jobQueue.getJobsForSchedule(programSchedule.getScheduleId()), true));
                Collections.sort(arrayList, new Comparator<Job>() { // from class: co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDatasetTest.12.1
                    @Override // java.util.Comparator
                    public int compare(Job job, Job job2) {
                        return Long.valueOf(job.getCreationTime()).compareTo(Long.valueOf(job2.getCreationTime()));
                    }
                });
                Assert.assertEquals(2L, arrayList.size());
                Job job = (Job) arrayList.get(0);
                Assert.assertEquals(simpleJob.getCreationTime(), job.getCreationTime());
                Assert.assertEquals(0L, job.getNotifications().size());
                Assert.assertTrue(job.isToBeDeleted());
                Job job2 = (Job) arrayList.get(1);
                Assert.assertNotEquals(simpleJob.getCreationTime(), job2.getCreationTime());
                Assert.assertEquals(1L, job2.getNotifications().size());
                Assert.assertEquals(forPartitions, job2.getNotifications().get(0));
                Assert.assertFalse(job2.isToBeDeleted());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<Job> getAllJobs(JobQueueDataset jobQueueDataset) {
        return getAllJobs(jobQueueDataset, false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<Job> getAllJobs(JobQueueDataset jobQueueDataset, boolean z) {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < jobQueueDataset.getNumPartitions(); i++) {
            CloseableIterator<Job> jobs = jobQueueDataset.getJobs(i, (Job) null);
            Throwable th = null;
            try {
                try {
                    hashSet.addAll(toSet(jobs, z));
                    if (jobs != null) {
                        if (0 != 0) {
                            try {
                                jobs.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            jobs.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (jobs != null) {
                        if (th != null) {
                            try {
                                jobs.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            jobs.close();
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<Job> toSet(CloseableIterator<Job> closeableIterator) {
        return toSet(closeableIterator, false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Set<Job> toSet(CloseableIterator<Job> closeableIterator, boolean z) {
        try {
            HashSet hashSet = new HashSet();
            while (closeableIterator.hasNext()) {
                Job job = (Job) closeableIterator.next();
                if (z || !job.isToBeDeleted()) {
                    hashSet.add(job);
                }
            }
            return hashSet;
        } finally {
            closeableIterator.close();
        }
    }
}
