package co.cask.cdap.scheduler;

import co.cask.cdap.AppWithFrequentScheduledWorkflows;
import co.cask.cdap.AppWithMultipleSchedules;
import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.artifact.ArtifactSummary;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.schedule.TriggerInfo;
import co.cask.cdap.api.schedule.TriggeringScheduleInfo;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.AlreadyExistsException;
import co.cask.cdap.common.ConflictException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.ProfileConflictException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.id.Id;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.DefaultApplicationSpecification;
import co.cask.cdap.internal.app.program.MessagingProgramStateWriter;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleStatus;
import co.cask.cdap.internal.app.runtime.schedule.TriggeringScheduleInfoAdapter;
import co.cask.cdap.internal.app.runtime.schedule.queue.Job;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.internal.app.runtime.schedule.trigger.PartitionTrigger;
import co.cask.cdap.internal.app.runtime.schedule.trigger.TimeTrigger;
import co.cask.cdap.internal.app.services.http.AppFabricTestBase;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.ProtoTrigger;
import co.cask.cdap.proto.WorkflowTokenDetail;
import co.cask.cdap.proto.artifact.AppRequest;
import co.cask.cdap.proto.codec.WorkflowTokenDetailCodec;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.ScheduleId;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.WorkflowId;
import co.cask.cdap.proto.profile.Profile;
import co.cask.cdap.test.XSlowTests;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.tephra.RetryStrategies;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

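/**
 * Tests the {@link Scheduler} obtained from the AppFabric test injector: adding, listing and deleting
 * schedules, runs launched by partition notifications and program status events, and the interaction
 * between schedules and profiles.
 */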
public class CoreSchedulerServiceTest extends AppFabricTestBase {
    // Ids used by the schedule add/list/delete tests; everything lives in the "schedtest" namespace.
    private static final NamespaceId NS_ID = new NamespaceId("schedtest");
    private static final ApplicationId APP1_ID = NS_ID.app("app1");
    private static final ApplicationId APP2_ID = NS_ID.app("app2");
    // wf1 and wf11 belong to app1, wf2 belongs to app2.
    private static final WorkflowId PROG1_ID = APP1_ID.workflow("wf1");
    private static final WorkflowId PROG2_ID = APP2_ID.workflow("wf2");
    private static final WorkflowId PROG11_ID = APP1_ID.workflow("wf11");
    // Schedule ids matching the schedule names created in addListDeleteSchedules.
    private static final ScheduleId PSCHED1_ID = APP1_ID.schedule("psched1");
    private static final ScheduleId PSCHED2_ID = APP2_ID.schedule("psched2");
    private static final ScheduleId TSCHED1_ID = APP1_ID.schedule("tsched1");
    private static final ScheduleId TSCHED11_ID = APP1_ID.schedule("tsched11");
    // Datasets referenced by the partition triggers in addListDeleteSchedules.
    private static final DatasetId DS1_ID = NS_ID.dataset("pfs1");
    private static final DatasetId DS2_ID = NS_ID.dataset("pfs2");
    // Applications deployed into the default namespace by testRunScheduledJobs and testProgramEvents.
    private static final ApplicationId APP_ID = NamespaceId.DEFAULT.app(AppWithFrequentScheduledWorkflows.NAME, "1.0.0");
    private static final ApplicationId APP_MULT_ID = NamespaceId.DEFAULT.app(AppWithMultipleSchedules.NAME);
    // Workflows of APP_ID (AppWithFrequentScheduledWorkflows).
    private static final ProgramId WORKFLOW_1 = APP_ID.program(ProgramType.WORKFLOW, "SomeWorkflow");
    private static final ProgramId WORKFLOW_2 = APP_ID.program(ProgramType.WORKFLOW, "AnotherWorkflow");
    private static final ProgramId SCHEDULED_WORKFLOW_1 = APP_ID.program(ProgramType.WORKFLOW, AppWithFrequentScheduledWorkflows.SCHEDULED_WORKFLOW_1);
    private static final ProgramId SCHEDULED_WORKFLOW_2 = APP_ID.program(ProgramType.WORKFLOW, AppWithFrequentScheduledWorkflows.SCHEDULED_WORKFLOW_2);
    // Workflows of APP_MULT_ID (AppWithMultipleSchedules).
    private static final ProgramId SOME_WORKFLOW = APP_MULT_ID.program(ProgramType.WORKFLOW, "SomeWorkflow");
    private static final ProgramId ANOTHER_WORKFLOW = APP_MULT_ID.program(ProgramType.WORKFLOW, "AnotherWorkflow");
    private static final ProgramId TRIGGERED_WORKFLOW = APP_MULT_ID.program(ProgramType.WORKFLOW, "TriggeredWorkflow");

    // JUnit-managed temporary folder; not referenced elsewhere in this class.
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    // Gson configured for TriggeringScheduleInfo and WorkflowTokenDetail; also used to serialize notifications.
    private static final Gson GSON = TriggeringScheduleInfoAdapter.addTypeAdapters(new GsonBuilder()).registerTypeAdapter(WorkflowTokenDetail.class, new WorkflowTokenDetailCodec()).create();
    // Collaborators resolved from the injector in beforeClass().
    private static MessagingService messagingService;
    private static Store store;
    // Assigned by testRunScheduledJobs() before the helpers that publish to it are used.
    private static TopicId dataEventTopic;
    private static Scheduler scheduler;
    // Retrying transactional built in beforeClass(); used to read the job queue dataset.
    private static Transactional transactional;

    /**
     * Runs the base class setup, starts the scheduler when it is a Guava {@link Service}, and
     * resolves the messaging service, store and transactional shared by the tests.
     *
     * @throws Throwable if the base setup or the scheduler start-up fails
     */
    @BeforeClass
    public static void beforeClass() throws Throwable {
        AppFabricTestBase.beforeClass();
        scheduler = getInjector().getInstance(Scheduler.class);
        if (scheduler instanceof Service) {
            // The lifecycle call goes through the Service view established by the instanceof check.
            ((Service) scheduler).startAndWait();
        }
        messagingService = getInjector().getInstance(MessagingService.class);
        store = getInjector().getInstance(Store.class);

        DatasetFramework datasetFramework = getInjector().getInstance(DatasetFramework.class);
        MultiThreadDatasetCache datasetCache = new MultiThreadDatasetCache(
            new SystemDatasetInstantiator(datasetFramework), getTxClient(), NamespaceId.SYSTEM,
            Collections.emptyMap(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[0]);
        // Retry on transaction conflicts: 20 and 100L are passed straight to retryOnConflict.
        transactional = Transactions.createTransactionalWithRetry(
            Transactions.createTransactional(datasetCache, 30), RetryStrategies.retryOnConflict(20, 100L));
    }

    /**
     * Runs the base class teardown and stops the scheduler when it is a Guava {@link Service}.
     * The scheduler is stopped in a {@code finally} block so that a failing base teardown does not
     * leave it running.
     *
     * @throws Exception if the base teardown fails
     */
    @AfterClass
    public static void afterClass() throws Exception {
        try {
            AppFabricTestBase.afterClass();
        } finally {
            if (scheduler instanceof Service) {
                // The lifecycle call goes through the Service view established by the instanceof check.
                ((Service) scheduler).stopAndWait();
            }
        }
    }

    /**
     * Exercises adding, getting, listing and deleting schedules, including the failure paths:
     * a bulk delete containing a missing schedule and a bulk add containing an existing schedule
     * must fail and leave the listed schedules unchanged.
     *
     * @throws Exception if any scheduler call fails unexpectedly
     */
    @Test
    public void addListDeleteSchedules() throws Exception {
        // Nothing is registered for app1 / wf1 at the start.
        Assert.assertTrue(scheduler.listSchedules(APP1_ID).isEmpty());
        Assert.assertTrue(scheduler.listSchedules(PROG1_ID).isEmpty());

        // Add a single schedule and read it back by id, by app and by program.
        ProgramSchedule tsched1 = new ProgramSchedule("tsched1", "one time schedule", PROG1_ID, ImmutableMap.of("prop1", "nn"), new TimeTrigger("* * ? * 1"), ImmutableList.of());
        scheduler.addSchedule(tsched1);
        Assert.assertEquals(tsched1, scheduler.getSchedule(TSCHED1_ID));
        Assert.assertEquals(ImmutableList.of(tsched1), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(tsched1), scheduler.listSchedules(PROG1_ID));

        // Add three more schedules in one call.
        ProgramSchedule psched1 = new ProgramSchedule("psched1", "one partition schedule", PROG1_ID, ImmutableMap.of("prop3", "abc"), new PartitionTrigger(DS1_ID, 1), Collections.emptyList());
        ProgramSchedule tsched11 = new ProgramSchedule("tsched11", "two times schedule", PROG11_ID, ImmutableMap.of("prop2", "xx"), new TimeTrigger("* * ? * 1,2"), Collections.emptyList());
        ProgramSchedule psched2 = new ProgramSchedule("psched2", "two partition schedule", PROG2_ID, ImmutableMap.of("propper", "popper"), new PartitionTrigger(DS2_ID, 2), Collections.emptyList());
        scheduler.addSchedules(ImmutableList.of(psched1, tsched11, psched2));
        Assert.assertEquals(psched1, scheduler.getSchedule(PSCHED1_ID));
        Assert.assertEquals(tsched11, scheduler.getSchedule(TSCHED11_ID));
        Assert.assertEquals(psched2, scheduler.getSchedule(PSCHED2_ID));
        Assert.assertEquals(ImmutableList.of(psched1, tsched1), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(tsched11), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(psched1, tsched1, tsched11), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(APP2_ID));

        // Delete one schedule; the others must be unaffected.
        scheduler.deleteSchedule(TSCHED1_ID);
        verifyNotFound(scheduler, TSCHED1_ID);
        Assert.assertEquals(ImmutableList.of(psched1), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(tsched11), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(psched1, tsched11), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(APP2_ID));

        // A bulk delete that includes the already-deleted tsched1 must fail and delete nothing.
        try {
            scheduler.deleteSchedules(ImmutableList.of(TSCHED1_ID, TSCHED11_ID));
            Assert.fail("expected NotFoundException");
        } catch (NotFoundException ignored) {
            // expected: tsched1 no longer exists
        }
        Assert.assertEquals(ImmutableList.of(psched1), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(tsched11), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(psched1, tsched11), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(APP2_ID));

        // A bulk add that includes the still-existing tsched11 must fail and add nothing.
        try {
            scheduler.addSchedules(ImmutableList.of(tsched1, tsched11));
            Assert.fail("expected AlreadyExistsException");
        } catch (AlreadyExistsException ignored) {
            // expected: tsched11 already exists
        }
        Assert.assertEquals(ImmutableList.of(psched1), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(tsched11), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(psched1, tsched11), scheduler.listSchedules(APP1_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(APP2_ID));

        // Re-add tsched1, then delete every schedule of app1; app2 must keep its schedule.
        scheduler.addSchedule(tsched1);
        scheduler.deleteSchedules(APP1_ID);
        verifyNotFound(scheduler, TSCHED1_ID);
        verifyNotFound(scheduler, PSCHED1_ID);
        verifyNotFound(scheduler, TSCHED11_ID);
        Assert.assertEquals(ImmutableList.of(), scheduler.listSchedules(PROG1_ID));
        Assert.assertEquals(ImmutableList.of(), scheduler.listSchedules(PROG11_ID));
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(PROG2_ID));
        Assert.assertEquals(ImmutableList.of(), scheduler.listSchedules(APP1_ID));
        // Check the app-level listing for app2, as in every assertion group above.
        Assert.assertEquals(ImmutableList.of(psched2), scheduler.listSchedules(APP2_ID));
    }

    /**
     * Asserts that looking up the given schedule raises {@link NotFoundException}.
     *
     * @param scheduler2 the scheduler to query
     * @param scheduleId the id that is expected to be absent
     */
    private static void verifyNotFound(Scheduler scheduler2, ScheduleId scheduleId) {
        boolean lookupSucceeded;
        try {
            scheduler2.getSchedule(scheduleId);
            lookupSucceeded = true;
        } catch (NotFoundException expected) {
            // The schedule is absent, which is what the caller wants.
            lookupSucceeded = false;
        }
        if (lookupSucceeded) {
            Assert.fail("expected NotFoundException");
        }
    }

    /**
     * Deploys {@link AppWithFrequentScheduledWorkflows} and verifies that its schedules launch
     * workflows as expected: partition schedules react to data event notifications, disabled
     * partition schedules do not, time schedules record a single TIME trigger in the run's system
     * arguments, and pending jobs survive schedule disable/update/delete.
     *
     * @throws Exception if deployment, publishing or waiting fails
     */
    @Test
    @Category({XSlowTests.class})
    public void testRunScheduledJobs() throws Exception {
        CConfiguration cConf = getInjector().getInstance(CConfiguration.class);
        dataEventTopic = NamespaceId.SYSTEM.topic(cConf.get("data.event.topic"));

        // Deploy the app from a freshly added artifact.
        Id.Artifact artifactId = Id.Artifact.from(Id.Namespace.DEFAULT, "appwithschedules", "1.0.0");
        addAppArtifact(artifactId, AppWithFrequentScheduledWorkflows.class);
        deploy(APP_ID, new AppRequest<>(new ArtifactSummary(artifactId.getName(), artifactId.getVersion().getVersion())));

        enableSchedule(AppWithFrequentScheduledWorkflows.TEN_SECOND_SCHEDULE_1);
        enableSchedule(AppWithFrequentScheduledWorkflows.TEN_SECOND_SCHEDULE_2);
        enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_1);
        enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        // Each round of notifications is expected to complete one more run of both workflows.
        for (int round = 1; round <= 5; round++) {
            testNewPartition(round);
        }

        enableSchedule(AppWithFrequentScheduledWorkflows.COMPOSITE_SCHEDULE);
        int runsOfWorkflow1 = getRuns(WORKFLOW_1, ProgramRunStatus.ALL);
        int runsOfWorkflow2 = getRuns(WORKFLOW_2, ProgramRunStatus.ALL);

        // With the partition schedules disabled, new notifications must not create partition jobs.
        disableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_1);
        disableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME1);
        // Taken before the last publish so that the last message's timestamp is not earlier than it.
        long minPublishTime = System.currentTimeMillis();
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        waitUntilProcessed(dataEventTopic, minPublishTime);

        Tasks.waitFor(true, () -> getRuns(SCHEDULED_WORKFLOW_1, ProgramRunStatus.COMPLETED) > 0
            && getRuns(SCHEDULED_WORKFLOW_2, ProgramRunStatus.COMPLETED) > 0, 10L, TimeUnit.SECONDS);
        Assert.assertFalse(Iterables.any(getAllJobs(),
            job -> job.getSchedule().getTrigger() instanceof ProtoTrigger.PartitionTrigger));

        // The composite workflow has not run yet; two more notifications are published before waiting for it.
        WorkflowId compositeWorkflow = APP_ID.workflow(AppWithFrequentScheduledWorkflows.COMPOSITE_WORKFLOW);
        Assert.assertEquals(0L, getRuns(compositeWorkflow, ProgramRunStatus.ALL));
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        long minPublishTime2 = System.currentTimeMillis();
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        waitUntilProcessed(dataEventTopic, minPublishTime2);
        waitForCompleteRuns(1, compositeWorkflow);

        // Every run of the time-scheduled workflow must carry exactly one TIME trigger info.
        for (RunRecordMeta runRecord : store.getRuns(SCHEDULED_WORKFLOW_1, ProgramRunStatus.ALL, 0L, Long.MAX_VALUE, Integer.MAX_VALUE).values()) {
            Map<String, String> systemArgs = runRecord.getSystemArgs();
            Assert.assertNotNull(systemArgs);
            TriggeringScheduleInfo scheduleInfo = GSON.fromJson(systemArgs.get("triggeringScheduleInfo"), TriggeringScheduleInfo.class);
            Assert.assertEquals(AppWithFrequentScheduledWorkflows.TEN_SECOND_SCHEDULE_1, scheduleInfo.getName());
            List<TriggerInfo> triggerInfos = scheduleInfo.getTriggerInfos();
            Assert.assertEquals(1L, triggerInfos.size());
            Assert.assertEquals(TriggerInfo.Type.TIME, triggerInfos.get(0).getType());
        }

        // The disabled partition schedules must not have launched anything in the meantime.
        Assert.assertEquals(runsOfWorkflow1, getRuns(WORKFLOW_1, ProgramRunStatus.ALL));
        Assert.assertEquals(runsOfWorkflow2, getRuns(WORKFLOW_2, ProgramRunStatus.ALL));

        enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        testScheduleUpdate("disable");
        testScheduleUpdate("update");
        testScheduleUpdate("delete");
    }

    /**
     * Deploys {@link AppWithMultipleSchedules} and verifies program-status triggering: a failed run
     * and a killed run must not launch the triggered workflow, while a completed run of
     * {@code AnotherWorkflow} launches it once and passes the runtime argument and token value on.
     *
     * @throws Exception if deployment, publishing or waiting fails
     */
    @Test
    @Category({XSlowTests.class})
    public void testProgramEvents() throws Exception {
        deploy(AppWithMultipleSchedules.class, 200);
        CConfiguration cConf = getInjector().getInstance(CConfiguration.class);
        TopicId programEventTopic = NamespaceId.SYSTEM.topic(cConf.get("program.status.record.event.topic"));
        MessagingProgramStateWriter programStateWriter = new MessagingProgramStateWriter(cConf, messagingService);

        // Simulate a run of AnotherWorkflow that starts, runs and then fails.
        ProgramRunId anotherWorkflowRun = ANOTHER_WORKFLOW.run(RunIds.generate());
        DefaultApplicationSpecification appSpec = new DefaultApplicationSpecification(AppWithMultipleSchedules.NAME, "-SNAPSHOT", "desc", (String) null, ANOTHER_WORKFLOW.getNamespaceId().artifact("test", "1.0").toApiArtifactId(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
        ProgramDescriptor anotherDescriptor = new ProgramDescriptor(anotherWorkflowRun.getParent(), appSpec);
        BasicArguments systemArgs = new BasicArguments(ImmutableMap.of("skipProvisioning", Boolean.TRUE.toString()));
        programStateWriter.start(anotherWorkflowRun, new SimpleProgramOptions(anotherWorkflowRun.getParent(), systemArgs, new BasicArguments(), false), (String) null, anotherDescriptor);
        programStateWriter.running(anotherWorkflowRun, (String) null);
        // Milliseconds, matching the other callers of waitUntilProcessed in this class. A value in
        // seconds would be far smaller than any millisecond timestamp and make the wait trivial.
        long lastProcessed = System.currentTimeMillis();
        programStateWriter.error(anotherWorkflowRun, (Throwable) null);
        waitUntilProcessed(programEventTopic, lastProcessed);

        // Simulate a run of SomeWorkflow that starts, runs and then is killed.
        ProgramRunId someWorkflowRun = SOME_WORKFLOW.run(RunIds.generate());
        programStateWriter.start(someWorkflowRun, new SimpleProgramOptions(someWorkflowRun.getParent(), systemArgs, new BasicArguments()), (String) null, new ProgramDescriptor(someWorkflowRun.getParent(), appSpec));
        programStateWriter.running(someWorkflowRun, (String) null);
        lastProcessed = System.currentTimeMillis();
        programStateWriter.killed(someWorkflowRun);
        waitUntilProcessed(programEventTopic, lastProcessed);

        // Neither the failed nor the killed run may have launched the triggered workflow.
        Assert.assertEquals(0L, getRuns(TRIGGERED_WORKFLOW, ProgramRunStatus.ALL));

        // A real run of AnotherWorkflow is expected to launch the triggered workflow exactly once.
        scheduler.enableSchedule(APP_MULT_ID.schedule(AppWithMultipleSchedules.WORKFLOW_COMPLETED_SCHEDULE));
        startProgram(ANOTHER_WORKFLOW, (Map<String, String>) ImmutableMap.of(AppWithMultipleSchedules.ANOTHER_RUNTIME_ARG_KEY, AppWithMultipleSchedules.ANOTHER_RUNTIME_ARG_VALUE), 200);
        waitForCompleteRuns(1, TRIGGERED_WORKFLOW);
        assertProgramRuns(TRIGGERED_WORKFLOW, ProgramRunStatus.COMPLETED, 1);

        // The triggered run's workflow token must contain the two values handed over by the trigger.
        String triggeredRunId = getProgramRuns(TRIGGERED_WORKFLOW, ProgramRunStatus.COMPLETED).get(0).getPid();
        Map<String, List<WorkflowTokenDetail.NodeValueDetail>> tokenData = getWorkflowToken(TRIGGERED_WORKFLOW, triggeredRunId, null, null).getTokenData();
        Assert.assertEquals(2L, tokenData.size());
        Assert.assertEquals(AppWithMultipleSchedules.ANOTHER_RUNTIME_ARG_VALUE, tokenData.get(AppWithMultipleSchedules.TRIGGERED_RUNTIME_ARG_KEY).get(0).getValue());
        Assert.assertEquals(AppWithMultipleSchedules.ANOTHER_TOKEN_VALUE, tokenData.get(AppWithMultipleSchedules.TRIGGERED_TOKEN_KEY).get(0).getValue());
    }

    /**
     * Verifies that a schedule referencing a profile can be added while the profile is enabled,
     * and that adding it again after the profile has been disabled raises
     * {@link ProfileConflictException}.
     *
     * @throws Exception if a profile or scheduler call fails unexpectedly
     */
    @Test
    public void testAddScheduleWithDisabledProfile() throws Exception {
        final ProfileId profileId = NS_ID.profile("MyProfile");
        final Profile nativeCopy = new Profile("MyProfile", Profile.NATIVE.getLabel(), Profile.NATIVE.getDescription(), Profile.NATIVE.getScope(), Profile.NATIVE.getProvisioner());
        putProfile(profileId, nativeCopy, 200);

        // While the profile is enabled the schedule is accepted.
        final ProgramSchedule withProfile = new ProgramSchedule("tsched1", "one time schedule", PROG1_ID, ImmutableMap.of("prop1", "nn", "system.profile.name", "USER:MyProfile"), new TimeTrigger("* * ? * 1"), ImmutableList.of());
        scheduler.addSchedule(withProfile);
        Assert.assertEquals(Collections.singletonList(withProfile), scheduler.listSchedules(PROG1_ID));

        // Disable the profile and remove the schedule so it can be re-added.
        disableProfile(profileId, 200);
        scheduler.deleteSchedule(TSCHED1_ID);
        Assert.assertEquals(Collections.emptyList(), scheduler.listSchedules(PROG1_ID));

        boolean rejected = false;
        try {
            scheduler.addSchedule(withProfile);
        } catch (ProfileConflictException expected) {
            // The disabled profile makes the add fail, as intended.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail();
        }
        deleteProfile(profileId, 200);
    }

    /**
     * Verifies that a disabled profile cannot be deleted (HTTP 409) while a schedule still
     * references it, and can be deleted (HTTP 200) once that schedule is removed.
     *
     * @throws Exception if a profile or scheduler call fails unexpectedly
     */
    @Test
    public void testAddDeleteScheduleWithProfileProperty() throws Exception {
        final ProfileId profileId = NS_ID.profile("MyProfile");
        putProfile(profileId, Profile.NATIVE, 200);

        final ProgramSchedule withProfile = new ProgramSchedule("tsched1", "one time schedule", PROG1_ID, ImmutableMap.of("prop1", "nn", "system.profile.name", profileId.getScopedName()), new TimeTrigger("* * ? * 1"), ImmutableList.of());
        scheduler.addSchedule(withProfile);

        disableProfile(profileId, 200);
        // The schedule still references the profile, so the delete is expected to be rejected.
        deleteProfile(profileId, 409);

        scheduler.deleteSchedule(TSCHED1_ID);
        deleteProfile(profileId, 200);
    }

    /**
     * Fetches the workflow token of a run through the v3 REST path and deserializes it with
     * {@link #GSON}.
     *
     * @param programId the workflow whose run token is requested
     * @param str the run id
     * @param scope optional token scope appended as a query parameter
     * @param str2 optional token key appended as a query parameter
     * @return the deserialized workflow token detail
     * @throws Exception if the request or the deserialization fails
     */
    private WorkflowTokenDetail getWorkflowToken(ProgramId programId, String str, @Nullable WorkflowToken.Scope scope, @Nullable String str2) throws Exception {
        String tokenPath = String.format("apps/%s/workflows/%s/runs/%s/token", programId.getApplication(), programId.getProgram(), str);
        String tokenPathWithQuery = appendScopeAndKeyToUrl(tokenPath, scope, str2);
        String versionedPath = getVersionedAPIPath(tokenPathWithQuery, "v3", programId.getNamespace());
        return (WorkflowTokenDetail) readResponse(doGet(versionedPath), new TypeToken<WorkflowTokenDetail>() { }.getType(), GSON);
    }

    /**
     * Appends the optional {@code scope} and {@code key} query parameters to a URL.
     *
     * @param str the URL to extend
     * @param scope token scope; omitted from the query when {@code null}
     * @param str2 token key; omitted from the query when {@code null}
     * @return the URL, unchanged when both optional values are {@code null}
     */
    private String appendScopeAndKeyToUrl(String str, @Nullable WorkflowToken.Scope scope, String str2) {
        if (scope == null) {
            // Without a scope the key, if any, is the first query parameter.
            return str2 == null ? str : str + String.format("?key=%s", str2);
        }
        String withScope = str + String.format("?scope=%s", scope.name());
        return str2 == null ? withScope : withScope + String.format("&key=%s", str2);
    }

    /**
     * Verifies that a job waiting in PENDING_TRIGGER for the second partition schedule does not
     * launch the workflow early when the schedule is modified. One notification is published,
     * the schedule is changed according to {@code str}, a second notification is published, and
     * only after a third notification is one more completed run of {@link #WORKFLOW_2} awaited.
     *
     * @param str how to modify the schedule: {@code "disable"} (disable then enable),
     *            {@code "update"} (update with an extra property) or {@code "delete"}
     *            (delete, re-add with an extra property, enable); any other value fails the test
     * @throws Exception if publishing, waiting or a scheduler call fails
     */
    private void testScheduleUpdate(String str) throws Exception {
        int runsBefore = getRuns(WORKFLOW_2, ProgramRunStatus.ALL);
        ScheduleId scheduleId = APP_ID.schedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);

        // First notification: a pending job is expected, but no new run.
        long minPublishTime = System.currentTimeMillis();
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        waitUntilProcessed(dataEventTopic, minPublishTime);
        assertPendingTriggerJob(scheduleId);
        Assert.assertEquals(runsBefore, getRuns(WORKFLOW_2, ProgramRunStatus.ALL));

        if ("disable".equals(str)) {
            disableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
            enableSchedule(AppWithFrequentScheduledWorkflows.DATASET_PARTITION_SCHEDULE_2);
        } else {
            ProgramSchedule current = scheduler.getSchedule(scheduleId);
            // Same schedule with one extra property, keyed and valued by the update mode.
            // The explicit type witness keeps build() typed as a map of String to String.
            Map<String, String> extendedProperties = ImmutableMap.<String, String>builder().putAll(current.getProperties()).put(str, str).build();
            ProgramSchedule modified = new ProgramSchedule(current.getName(), current.getDescription(), current.getProgramId(), extendedProperties, current.getTrigger(), current.getConstraints());
            if ("update".equals(str)) {
                scheduler.updateSchedule(modified);
                Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, scheduler.getScheduleStatus(scheduleId));
            } else if ("delete".equals(str)) {
                scheduler.deleteSchedule(scheduleId);
                scheduler.addSchedule(modified);
                enableSchedule(scheduleId.getSchedule());
            } else {
                Assert.fail("invalid howToUpdate: " + str);
            }
        }

        // Second notification: again a pending job is expected, and still no new run.
        long minPublishTime2 = System.currentTimeMillis();
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        waitUntilProcessed(dataEventTopic, minPublishTime2);
        assertPendingTriggerJob(scheduleId);
        Assert.assertEquals(runsBefore, getRuns(WORKFLOW_2, ProgramRunStatus.ALL));

        // Third notification: one more completed run is awaited.
        publishNotification(dataEventTopic, NamespaceId.DEFAULT, AppWithFrequentScheduledWorkflows.DATASET_NAME2);
        waitForCompleteRuns(runsBefore + 1, WORKFLOW_2);
    }

    /** Asserts that the job queue holds a PENDING_TRIGGER partition-trigger job for the given schedule. */
    private void assertPendingTriggerJob(ScheduleId scheduleId) {
        Assert.assertTrue("Expected a PENDING_TRIGGER job for " + scheduleId, Iterables.any(getAllJobs(),
            job -> (job.getSchedule().getTrigger() instanceof ProtoTrigger.PartitionTrigger)
                && scheduleId.equals(job.getJobKey().getScheduleId())
                && job.getState() == Job.State.PENDING_TRIGGER));
    }

    /**
     * Enables the named schedule of {@link #APP_ID} and asserts that its status is SCHEDULED.
     *
     * @param str the schedule name
     * @throws NotFoundException propagated from the scheduler
     * @throws ConflictException propagated from the scheduler
     */
    private void enableSchedule(String str) throws NotFoundException, ConflictException {
        final ScheduleId target = APP_ID.schedule(str);
        scheduler.enableSchedule(target);
        final ProgramScheduleStatus statusAfterEnable = scheduler.getScheduleStatus(target);
        Assert.assertEquals(ProgramScheduleStatus.SCHEDULED, statusAfterEnable);
    }

    /**
     * Disables the named schedule of {@link #APP_ID} and asserts that its status is SUSPENDED.
     *
     * @param str the schedule name
     * @throws NotFoundException propagated from the scheduler
     * @throws ConflictException propagated from the scheduler
     */
    private void disableSchedule(String str) throws NotFoundException, ConflictException {
        final ScheduleId target = APP_ID.schedule(str);
        scheduler.disableSchedule(target);
        final ProgramScheduleStatus statusAfterDisable = scheduler.getScheduleStatus(target);
        Assert.assertEquals(ProgramScheduleStatus.SUSPENDED, statusAfterDisable);
    }

    /**
     * Publishes one notification for the first dataset and two for the second, then waits until
     * both workflows report the expected number of completed runs.
     *
     * @param i the number of completed runs each workflow is expected to reach
     * @throws Exception if publishing or waiting fails
     */
    private void testNewPartition(int i) throws Exception {
        String[] datasetsToNotify = {
            AppWithFrequentScheduledWorkflows.DATASET_NAME1,
            AppWithFrequentScheduledWorkflows.DATASET_NAME2,
            AppWithFrequentScheduledWorkflows.DATASET_NAME2
        };
        for (String datasetName : datasetsToNotify) {
            publishNotification(dataEventTopic, NamespaceId.DEFAULT, datasetName);
        }
        for (ProgramId workflow : ImmutableList.of(WORKFLOW_1, WORKFLOW_2)) {
            waitForCompleteRuns(i, workflow);
        }
    }

    /**
     * Waits up to 30 seconds until the program has exactly the given number of COMPLETED runs.
     *
     * @param i the expected number of completed runs
     * @param programId the program whose runs are counted
     * @throws Exception if the expected count is not observed in time
     */
    private void waitForCompleteRuns(int i, ProgramId programId) throws Exception {
        Integer expectedRuns = i;
        Tasks.waitFor(expectedRuns, () -> getRuns(programId, ProgramRunStatus.COMPLETED), 30L, TimeUnit.SECONDS);
    }

    /**
     * Counts the run records the store returns for a program in the given status, over the
     * whole time range and without a practical limit.
     *
     * @param programId the program whose runs are counted
     * @param programRunStatus the status to filter by
     * @return the number of matching run records
     */
    private int getRuns(ProgramId programId, ProgramRunStatus programRunStatus) {
        Map<?, ?> matchingRuns = store.getRuns(programId, programRunStatus, 0L, Long.MAX_VALUE, Integer.MAX_VALUE);
        return matchingRuns.size();
    }

    /**
     * Publishes a partition notification for a dataset to the given topic. The notification
     * carries a single partition key with the int field {@code part1 = 1}.
     *
     * @param topicId the topic to publish to
     * @param namespaceId the namespace of the dataset
     * @param str the dataset name
     * @throws Exception if publishing fails
     */
    private void publishNotification(TopicId topicId, NamespaceId namespaceId, String str) throws Exception {
        PartitionKey partitionKey = PartitionKey.builder().addIntField("part1", 1).build();
        Notification notification = Notification.forPartitions(namespaceId.dataset(str), ImmutableList.of(partitionKey));
        String payload = GSON.toJson(notification);
        messagingService.publish(StoreRequestBuilder.of(topicId).addPayloads(payload).build());
    }

    /**
     * Reads, in a transaction, the subscriber state stored in the job queue dataset for a topic
     * and decodes it from its hex form into a {@link MessageId}.
     *
     * @param topicId the topic whose stored subscriber state is read
     * @return the decoded message id, or {@code null} when no state is stored for the topic
     */
    @Nullable
    private MessageId getLastMessageId(TopicId topicId) {
        return Transactionals.execute(transactional, datasetContext -> {
            // NOTE(review): the decompiled call chain does not show the concrete dataset type returned
            // by getDataset(); confirm it against the original source.
            String hexState = datasetContext.getDataset(Schedulers.JOB_QUEUE_DATASET_ID.getNamespace(), Schedulers.JOB_QUEUE_DATASET_ID.getDataset()).retrieveSubscriberState(topicId.getTopic());
            return hexState != null ? new MessageId(Bytes.fromHexString(hexState)) : null;
        });
    }

    /**
     * Waits up to 5 seconds until the subscriber state stored for the topic refers to a message
     * whose publish timestamp is at least {@code j}.
     *
     * @param topicId the topic whose stored subscriber state is polled
     * @param j the minimum publish timestamp, compared directly with
     *          {@link MessageId#getPublishTimestamp()}
     * @throws Exception if the condition is not met in time
     */
    private void waitUntilProcessed(TopicId topicId, long j) throws Exception {
        Tasks.waitFor(true, () -> {
            MessageId processed = getLastMessageId(topicId);
            if (processed == null) {
                // No state has been stored for the topic yet.
                return false;
            }
            return processed.getPublishTimestamp() >= j;
        }, 5L, TimeUnit.SECONDS);
    }

    /**
     * Reads, in a transaction, every job from a full scan of the job queue dataset.
     *
     * @return all jobs returned by the scan, copied into a list before the iterator is closed
     */
    private List<Job> getAllJobs() {
        return Transactionals.execute(transactional, datasetContext -> {
            // NOTE(review): the decompiled call chain does not show the concrete dataset type returned
            // by getDataset(); confirm it against the original source.
            // try-with-resources closes the scan iterator on every path and keeps a close failure
            // as a suppressed exception instead of hiding the original one.
            try (CloseableIterator<Job> jobs = datasetContext.getDataset(Schedulers.JOB_QUEUE_DATASET_ID.getNamespace(), Schedulers.JOB_QUEUE_DATASET_ID.getDataset()).fullScan()) {
                return Lists.newArrayList(jobs);
            }
        });
    }
}
