package co.cask.cdap.scheduler;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.AlreadyExistsException;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.ConflictException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.ProfileConflictException;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryOnStartFailureService;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.schedule.ProgramSchedule;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleStatus;
import co.cask.cdap.internal.app.runtime.schedule.SchedulerException;
import co.cask.cdap.internal.app.runtime.schedule.TimeSchedulerService;
import co.cask.cdap.internal.app.runtime.schedule.queue.Job;
import co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.ProgramScheduleStoreDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.internal.app.store.profile.ProfileDataset;
import co.cask.cdap.internal.profile.AdminEventPublisher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.context.MultiThreadMessagingContext;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProfileId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ScheduleId;
import co.cask.cdap.runtime.spi.profile.ProfileStatus;
import co.cask.cdap.security.impersonation.Impersonator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.gson.Gson;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService.class */
public class CoreSchedulerService extends AbstractIdleService implements Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(CoreSchedulerService.class);
    private static final Gson GSON = new Gson();
    private final CountDownLatch startedLatch = new CountDownLatch(1);
    private final Transactional transactional;
    private final Service internalService;
    private final DatasetFramework datasetFramework;
    private final TimeSchedulerService timeSchedulerService;
    private final AdminEventPublisher adminEventPublisher;
    private final Store appMetaStore;
    private final Impersonator impersonator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService$StoreAndProfileTxRunnable.class */
    public interface StoreAndProfileTxRunnable<V, T extends Throwable> {
        V run(ProgramScheduleStoreDataset programScheduleStoreDataset, ProfileDataset profileDataset) throws Throwable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService$StoreAndQueueTxRunnable.class */
    public interface StoreAndQueueTxRunnable<V, T extends Throwable> {
        V run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) throws Throwable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService$StoreQueueAndProfileTxRunnable.class */
    public interface StoreQueueAndProfileTxRunnable<V, T extends Throwable> {
        V run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset, ProfileDataset profileDataset) throws Throwable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/scheduler/CoreSchedulerService$StoreTxRunnable.class */
    public interface StoreTxRunnable<V, T extends Throwable> {
        V run(ProgramScheduleStoreDataset programScheduleStoreDataset) throws Throwable;
    }

    @Inject
    CoreSchedulerService(TransactionSystemClient transactionSystemClient, DatasetFramework datasetFramework, TimeSchedulerService timeSchedulerService, ScheduleNotificationSubscriberService scheduleNotificationSubscriberService, ConstraintCheckerService constraintCheckerService, MessagingService messagingService, CConfiguration cConfiguration, Store store, Impersonator impersonator) {
        this.datasetFramework = datasetFramework;
        MultiThreadTransactionAware multiThreadMessagingContext = new MultiThreadMessagingContext(messagingService);
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), transactionSystemClient, Schedulers.STORE_DATASET_ID.getParent(), Collections.emptyMap(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[]{multiThreadMessagingContext})), RetryStrategies.retryOnConflict(10, 100L));
        this.timeSchedulerService = timeSchedulerService;
        this.appMetaStore = store;
        this.impersonator = impersonator;
        this.internalService = new RetryOnStartFailureService(() -> {
            return new AbstractIdleService() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.1
                protected void startUp() throws Exception {
                    if (!datasetFramework.hasInstance(Schedulers.STORE_DATASET_ID)) {
                        datasetFramework.addInstance(Schedulers.STORE_TYPE_NAME, Schedulers.STORE_DATASET_ID, DatasetProperties.EMPTY);
                    }
                    timeSchedulerService.startAndWait();
                    CoreSchedulerService.this.cleanupJobs();
                    constraintCheckerService.startAndWait();
                    scheduleNotificationSubscriberService.startAndWait();
                    CoreSchedulerService.this.startedLatch.countDown();
                    CoreSchedulerService.LOG.info("Started core scheduler service.");
                }

                protected void shutDown() throws Exception {
                    scheduleNotificationSubscriberService.stopAndWait();
                    constraintCheckerService.stopAndWait();
                    timeSchedulerService.stopAndWait();
                    CoreSchedulerService.LOG.info("Stopped core scheduler service.");
                }
            };
        }, co.cask.cdap.common.service.RetryStrategies.exponentialDelay(200L, 5000L, TimeUnit.MILLISECONDS));
        this.adminEventPublisher = new AdminEventPublisher(cConfiguration, multiThreadMessagingContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupJobs() {
        try {
            this.transactional.execute(datasetContext -> {
                JobQueueDataset jobQueue = Schedulers.getJobQueue(datasetContext, this.datasetFramework);
                CloseableIterator<Job> fullScan = jobQueue.fullScan();
                Throwable th = null;
                try {
                    try {
                        LOG.info("Cleaning up jobs in state {}.", Job.State.PENDING_LAUNCH);
                        while (fullScan.hasNext()) {
                            Job job = (Job) fullScan.next();
                            if (job.getState() == Job.State.PENDING_LAUNCH) {
                                LOG.warn("Removing job because it was left in state {} from a previous run of the scheduler: {} .", Job.State.PENDING_LAUNCH, job);
                                jobQueue.deleteJob(job);
                            }
                        }
                        if (fullScan != null) {
                            if (0 == 0) {
                                fullScan.close();
                                return;
                            }
                            try {
                                fullScan.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (fullScan != null) {
                        if (th != null) {
                            try {
                                fullScan.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            fullScan.close();
                        }
                    }
                    throw th4;
                }
            });
        } catch (TransactionFailureException e) {
            LOG.warn("Failed to cleanup jobs upon startup.", e);
        }
    }

    @VisibleForTesting
    public void waitUntilFunctional(long j, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
        if (!isRunning()) {
            throw new IllegalStateException(getClass().getSimpleName() + " is not running. Cannot wait for it to be functional.");
        }
        if (!this.startedLatch.await(j, timeUnit)) {
            throw new TimeoutException(getClass().getSimpleName() + " is not completely started after " + j + " " + timeUnit);
        }
    }

    private void checkStarted() {
        if (!Uninterruptibles.awaitUninterruptibly(this.startedLatch, 0L, TimeUnit.SECONDS)) {
            throw new ServiceUnavailableException("Core scheduler");
        }
    }

    protected void startUp() throws Exception {
        this.internalService.startAndWait();
    }

    protected void shutDown() throws Exception {
        this.internalService.stopAndWait();
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void addSchedule(ProgramSchedule programSchedule) throws ProfileConflictException, BadRequestException, NotFoundException, AlreadyExistsException {
        addSchedules(Collections.singleton(programSchedule));
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void addSchedules(Iterable<? extends ProgramSchedule> iterable) throws ProfileConflictException, BadRequestException, NotFoundException, AlreadyExistsException {
        checkStarted();
        for (ProgramSchedule programSchedule : iterable) {
            if (!programSchedule.getProgramId().getType().equals(ProgramType.WORKFLOW)) {
                throw new BadRequestException(String.format("Cannot schedule program %s of type %s: Only workflows can be scheduled", programSchedule.getProgramId().getProgram(), programSchedule.getProgramId().getType()));
            }
        }
        try {
            execute((programScheduleStoreDataset, profileDataset) -> {
                programScheduleStoreDataset.addSchedules(iterable);
                Iterator it = iterable.iterator();
                while (it.hasNext()) {
                    ProgramSchedule programSchedule2 = (ProgramSchedule) it.next();
                    if (programSchedule2.getProperties() != null) {
                        Optional<ProfileId> profileIdFromArgs = SystemArguments.getProfileIdFromArgs(programSchedule2.getProgramId().getNamespaceId(), programSchedule2.getProperties());
                        if (profileIdFromArgs.isPresent()) {
                            ProfileId profileId = profileIdFromArgs.get();
                            if (profileDataset.getProfile(profileId).getStatus() == ProfileStatus.DISABLED) {
                                throw new ProfileConflictException(String.format("Profile %s in namespace %s is disabled. It cannot be assigned to schedule %s", profileId.getProfile(), profileId.getNamespace(), programSchedule2.getName()), profileId);
                            }
                        }
                    }
                    try {
                        this.timeSchedulerService.addProgramSchedule(programSchedule2);
                    } catch (SchedulerException e) {
                        LOG.error("Exception occurs when adding schedule {}", programSchedule2, e);
                        throw new RuntimeException(e);
                    }
                }
                Iterator it2 = iterable.iterator();
                while (it2.hasNext()) {
                    ProgramSchedule programSchedule3 = (ProgramSchedule) it2.next();
                    ScheduleId scheduleId = programSchedule3.getScheduleId();
                    this.adminEventPublisher.publishScheduleCreation(scheduleId);
                    Optional<ProfileId> profileIdFromArgs2 = SystemArguments.getProfileIdFromArgs(scheduleId.getNamespaceId(), programSchedule3.getProperties());
                    if (profileIdFromArgs2.isPresent()) {
                        profileDataset.addProfileAssignment(profileIdFromArgs2.get(), scheduleId);
                    }
                }
                return null;
            }, Exception.class);
        } catch (Exception e) {
            Throwables.propagate(e);
        } catch (NotFoundException | ProfileConflictException | AlreadyExistsException e2) {
            throw e2;
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void updateSchedule(ProgramSchedule programSchedule) throws NotFoundException, BadRequestException, ProfileConflictException {
        checkStarted();
        ProgramScheduleStatus scheduleStatus = getScheduleStatus(programSchedule.getScheduleId());
        deleteSchedule(programSchedule.getScheduleId());
        try {
            addSchedule(programSchedule);
            if (ProgramScheduleStatus.SCHEDULED == scheduleStatus) {
                try {
                    enableSchedule(programSchedule.getScheduleId());
                } catch (ConflictException e) {
                    throw new IllegalStateException("Schedule '" + programSchedule.getScheduleId() + "' already enabled despite just being added.");
                }
            }
        } catch (AlreadyExistsException e2) {
            throw new IllegalStateException("Schedule '" + programSchedule.getScheduleId() + "' already exists despite just being deleted.");
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void enableSchedule(ScheduleId scheduleId) throws NotFoundException, ConflictException {
        checkStarted();
        try {
            execute(programScheduleStoreDataset -> {
                ProgramScheduleRecord scheduleRecord = programScheduleStoreDataset.getScheduleRecord(scheduleId);
                if (ProgramScheduleStatus.SUSPENDED != scheduleRecord.getMeta().getStatus()) {
                    throw new ConflictException("Schedule '" + scheduleId + "' is already enabled");
                }
                this.timeSchedulerService.resumeProgramSchedule(scheduleRecord.getSchedule());
                programScheduleStoreDataset.updateScheduleStatus(scheduleId, ProgramScheduleStatus.SCHEDULED);
                return null;
            }, Exception.class);
        } catch (NotFoundException | ConflictException e) {
            throw e;
        } catch (SchedulerException e2) {
            throw new RuntimeException("Exception occurs when enabling schedule " + scheduleId, e2);
        } catch (Exception e3) {
            throw Throwables.propagate(e3);
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void disableSchedule(ScheduleId scheduleId) throws NotFoundException, ConflictException {
        checkStarted();
        try {
            execute((programScheduleStoreDataset, jobQueueDataset) -> {
                ProgramScheduleRecord scheduleRecord = programScheduleStoreDataset.getScheduleRecord(scheduleId);
                if (ProgramScheduleStatus.SCHEDULED != scheduleRecord.getMeta().getStatus()) {
                    throw new ConflictException("Schedule '" + scheduleId + "' is already disabled");
                }
                this.timeSchedulerService.suspendProgramSchedule(scheduleRecord.getSchedule());
                programScheduleStoreDataset.updateScheduleStatus(scheduleId, ProgramScheduleStatus.SUSPENDED);
                jobQueueDataset.markJobsForDeletion(scheduleId, System.currentTimeMillis());
                return null;
            }, Exception.class);
        } catch (NotFoundException | ConflictException e) {
            throw e;
        } catch (SchedulerException e2) {
            throw new RuntimeException("Exception occurs when enabling schedule " + scheduleId, e2);
        } catch (Exception e3) {
            throw Throwables.propagate(e3);
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedule(ScheduleId scheduleId) throws NotFoundException {
        deleteSchedules(Collections.singleton(scheduleId));
    }

    private void deleteSchedulesInScheduler(List<ProgramSchedule> list) {
        for (ProgramSchedule programSchedule : list) {
            try {
                this.timeSchedulerService.deleteProgramSchedule(programSchedule);
            } catch (Exception e) {
                LOG.error("Exception occurs when deleting schedule {}", programSchedule, e);
                throw new RuntimeException(e);
            }
        }
    }

    private void deleteScheduleInScheduler(ProgramSchedule programSchedule) throws NotFoundException {
        try {
            this.timeSchedulerService.deleteProgramSchedule(programSchedule);
        } catch (SchedulerException e) {
            LOG.error("Exception occurs when deleting schedule {}", programSchedule, e);
            throw new RuntimeException(e);
        }
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedules(Iterable<? extends ScheduleId> iterable) throws NotFoundException {
        checkStarted();
        execute((programScheduleStoreDataset, jobQueueDataset, profileDataset) -> {
            long currentTimeMillis = System.currentTimeMillis();
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                ScheduleId scheduleId = (ScheduleId) it.next();
                ProgramSchedule schedule = programScheduleStoreDataset.getSchedule(scheduleId);
                deleteScheduleInScheduler(schedule);
                jobQueueDataset.markJobsForDeletion(scheduleId, currentTimeMillis);
                this.adminEventPublisher.publishScheduleDeletion(scheduleId, schedule);
                Optional<ProfileId> profileIdFromArgs = SystemArguments.getProfileIdFromArgs(scheduleId.getNamespaceId(), schedule.getProperties());
                if (profileIdFromArgs.isPresent()) {
                    try {
                        profileDataset.removeProfileAssignment(profileIdFromArgs.get(), scheduleId);
                    } catch (NotFoundException e) {
                        LOG.warn("Unable to find the profile {} when deleting schedule {}, skipping assignment deletion.", profileIdFromArgs.get(), scheduleId);
                    }
                }
            }
            programScheduleStoreDataset.deleteSchedules((Iterable<? extends ScheduleId>) iterable);
            return null;
        }, NotFoundException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedules(ApplicationId applicationId) {
        checkStarted();
        execute((programScheduleStoreDataset, jobQueueDataset, profileDataset) -> {
            long currentTimeMillis = System.currentTimeMillis();
            List<ProgramSchedule> listSchedules = programScheduleStoreDataset.listSchedules(applicationId);
            deleteSchedulesInScheduler(listSchedules);
            Iterator<ScheduleId> it = programScheduleStoreDataset.deleteSchedules(applicationId).iterator();
            while (it.hasNext()) {
                jobQueueDataset.markJobsForDeletion(it.next(), currentTimeMillis);
            }
            for (ProgramSchedule programSchedule : listSchedules) {
                ScheduleId scheduleId = programSchedule.getScheduleId();
                this.adminEventPublisher.publishScheduleDeletion(scheduleId, programSchedule);
                Optional<ProfileId> profileIdFromArgs = SystemArguments.getProfileIdFromArgs(scheduleId.getNamespaceId(), programSchedule.getProperties());
                if (profileIdFromArgs.isPresent()) {
                    try {
                        profileDataset.removeProfileAssignment(profileIdFromArgs.get(), scheduleId);
                    } catch (NotFoundException e) {
                        LOG.warn("Unable to find the profile {} when deleting schedule {}, skipping assignment deletion.", profileIdFromArgs.get(), scheduleId);
                    }
                }
            }
            return null;
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void deleteSchedules(ProgramId programId) {
        checkStarted();
        execute((programScheduleStoreDataset, jobQueueDataset, profileDataset) -> {
            long currentTimeMillis = System.currentTimeMillis();
            List<ProgramSchedule> listSchedules = programScheduleStoreDataset.listSchedules(programId);
            deleteSchedulesInScheduler(listSchedules);
            Iterator<ScheduleId> it = programScheduleStoreDataset.deleteSchedules(programId).iterator();
            while (it.hasNext()) {
                jobQueueDataset.markJobsForDeletion(it.next(), currentTimeMillis);
            }
            for (ProgramSchedule programSchedule : listSchedules) {
                ScheduleId scheduleId = programSchedule.getScheduleId();
                this.adminEventPublisher.publishScheduleDeletion(scheduleId, programSchedule);
                Optional<ProfileId> profileIdFromArgs = SystemArguments.getProfileIdFromArgs(scheduleId.getNamespaceId(), programSchedule.getProperties());
                if (profileIdFromArgs.isPresent()) {
                    try {
                        profileDataset.removeProfileAssignment(profileIdFromArgs.get(), scheduleId);
                    } catch (NotFoundException e) {
                        LOG.warn("Unable to find the profile {} when deleting schedule {}, skipping assignment deletion.", profileIdFromArgs.get(), scheduleId);
                    }
                }
            }
            return null;
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public void modifySchedulesTriggeredByDeletedProgram(final ProgramId programId) {
        checkStarted();
        execute(new StoreAndQueueTxRunnable<Void, RuntimeException>() { // from class: co.cask.cdap.scheduler.CoreSchedulerService.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // co.cask.cdap.scheduler.CoreSchedulerService.StoreAndQueueTxRunnable
            public Void run(ProgramScheduleStoreDataset programScheduleStoreDataset, JobQueueDataset jobQueueDataset) {
                programScheduleStoreDataset.modifySchedulesTriggeredByDeletedProgram(programId);
                return null;
            }
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public ProgramSchedule getSchedule(ScheduleId scheduleId) throws NotFoundException {
        checkStarted();
        return (ProgramSchedule) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.getSchedule(scheduleId);
        }, NotFoundException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public ProgramScheduleStatus getScheduleStatus(ScheduleId scheduleId) throws NotFoundException {
        checkStarted();
        return (ProgramScheduleStatus) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.getScheduleRecord(scheduleId).getMeta().getStatus();
        }, NotFoundException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramSchedule> listSchedules(ApplicationId applicationId) {
        checkStarted();
        return (List) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.listSchedules(applicationId);
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramSchedule> listSchedules(ProgramId programId) {
        checkStarted();
        return (List) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.listSchedules(programId);
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramSchedule> listSchedules(NamespaceId namespaceId, Predicate<ProgramSchedule> predicate) {
        checkStarted();
        return (List) execute(programScheduleStoreDataset -> {
            return (List) programScheduleStoreDataset.listSchedules(namespaceId, predicate).stream().map(this::getProgramScheduleWithUserAndArtifactId).collect(Collectors.toList());
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramScheduleRecord> listScheduleRecords(ApplicationId applicationId) {
        checkStarted();
        return (List) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.listScheduleRecords(applicationId);
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public List<ProgramScheduleRecord> listScheduleRecords(ProgramId programId) {
        checkStarted();
        return (List) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.listScheduleRecords(programId);
        }, RuntimeException.class);
    }

    @Override // co.cask.cdap.scheduler.Scheduler
    public Collection<ProgramScheduleRecord> findSchedules(String str) {
        checkStarted();
        return (Collection) execute(programScheduleStoreDataset -> {
            return programScheduleStoreDataset.findSchedules(str);
        }, RuntimeException.class);
    }

    private ProgramSchedule getProgramScheduleWithUserAndArtifactId(ProgramSchedule programSchedule) {
        HashMap hashMap = new HashMap();
        try {
            hashMap.put(ProgramOptionConstants.ARTIFACT_ID, GSON.toJson(this.appMetaStore.loadProgram(programSchedule.getProgramId()).getArtifactId().toApiArtifactId()));
            try {
                hashMap.put(ProgramOptionConstants.USER_ID, this.impersonator.getUGI(programSchedule.getProgramId()).getUserName());
                HashMap hashMap2 = new HashMap(programSchedule.getProperties());
                hashMap2.putAll(hashMap);
                return new ProgramSchedule(programSchedule.getName(), programSchedule.getDescription(), programSchedule.getProgramId(), hashMap2, programSchedule.getTrigger(), programSchedule.getConstraints(), programSchedule.getTimeoutMillis());
            } catch (IOException e) {
                LOG.error("Exception occurs when looking up user group information for program {} in schedule {}", new Object[]{programSchedule.getProgramId(), programSchedule, e});
                throw new RuntimeException(String.format("Exception occurs when looking up user group information for program %s in schedule %s", programSchedule.getProgramId(), programSchedule), e);
            }
        } catch (Exception e2) {
            LOG.error("Exception occurs when looking up program descriptor for program {} in schedule {}", new Object[]{programSchedule.getProgramId(), programSchedule, e2});
            throw new RuntimeException(String.format("Exception occurs when looking up program descriptor for program %s in schedule %s", programSchedule.getProgramId(), programSchedule), e2);
        }
    }

    private <V, T extends Exception> V execute(StoreTxRunnable<V, T> storeTxRunnable, Class<? extends T> cls) throws Exception {
        return (V) Transactionals.execute(this.transactional, datasetContext -> {
            return storeTxRunnable.run(Schedulers.getScheduleStore(datasetContext, this.datasetFramework));
        }, cls);
    }

    private <V, T extends Exception> V execute(StoreAndQueueTxRunnable<V, T> storeAndQueueTxRunnable, Class<? extends T> cls) throws Exception {
        return (V) Transactionals.execute(this.transactional, datasetContext -> {
            return storeAndQueueTxRunnable.run(Schedulers.getScheduleStore(datasetContext, this.datasetFramework), Schedulers.getJobQueue(datasetContext, this.datasetFramework));
        }, cls);
    }

    private <V, T extends Exception> V execute(StoreAndProfileTxRunnable<V, T> storeAndProfileTxRunnable, Class<? extends T> cls) throws Exception {
        return (V) Transactionals.execute(this.transactional, datasetContext -> {
            return storeAndProfileTxRunnable.run(Schedulers.getScheduleStore(datasetContext, this.datasetFramework), ProfileDataset.get(datasetContext, this.datasetFramework));
        }, cls);
    }

    private <V, T extends Exception> V execute(StoreQueueAndProfileTxRunnable<V, T> storeQueueAndProfileTxRunnable, Class<? extends T> cls) throws Exception {
        return (V) Transactionals.execute(this.transactional, datasetContext -> {
            return storeQueueAndProfileTxRunnable.run(Schedulers.getScheduleStore(datasetContext, this.datasetFramework), Schedulers.getJobQueue(datasetContext, this.datasetFramework), ProfileDataset.get(datasetContext, this.datasetFramework));
        }, cls);
    }
}
