package co.cask.cdap.scheduler;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.schedule.ProgramScheduleRecord;
import co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.ProgramScheduleStoreDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.internal.app.services.AbstractNotificationSubscriberService;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.ScheduleId;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/scheduler/ScheduleNotificationSubscriberService.class */
/**
 * Consumes scheduler-related notifications from the messaging service and adds each one to the
 * job queue for the schedules it applies to.
 *
 * <p>Three subscriber services are created, one per topic read from the configuration keys
 * {@code time.event.topic}, {@code data.event.topic} and
 * {@code program.status.record.event.topic}. All of them run on one shared scheduled executor
 * with a single core thread, and are started and stopped together by this service.
 */
public class ScheduleNotificationSubscriberService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduleNotificationSubscriberService.class);
    private static final Gson GSON = new Gson();
    private final CConfiguration cConf;
    private final MessagingService messagingService;
    private final DatasetFramework datasetFramework;
    private final TransactionSystemClient txClient;
    private final MetricsCollectionService metricsCollectionService;
    // Assigned in the constructor rather than in a field initializer: the subscriber constructors
    // read cConf and the other dependencies of the enclosing instance, so those must be set first.
    private final List<Service> subscriberServices;
    // Created in startUp(); shared by every subscriber through executor().
    private ScheduledExecutorService subscriberExecutor;

    /**
     * Common base of the subscribers. It keeps the last consumed message id in the job queue
     * dataset, keyed by topic name, and hands every fetched notification to
     * {@link #processNotification}.
     */
    private abstract class AbstractSchedulerSubscriberService extends AbstractNotificationSubscriberService {

        /**
         * @param name name passed to the base subscriber service
         * @param topicName topic passed to the base subscriber service
         * @param fetchSize fetch size passed to the base subscriber service
         * @param transactionalFetch boolean flag forwarded to the base class; NOTE(review): the
         *     parameter name is an assumption, confirm against AbstractNotificationSubscriberService
         */
        AbstractSchedulerSubscriberService(String name, String topicName, int fetchSize, boolean transactionalFetch) {
            super(name, cConf, topicName, transactionalFetch, fetchSize,
                  cConf.getLong("scheduler.event.poll.delay.millis"),
                  messagingService, datasetFramework, txClient, metricsCollectionService);
        }

        /** Returns the subscriber state stored in the job queue for this subscriber's topic. */
        @Nullable
        protected String loadMessageId(DatasetContext datasetContext) throws Exception {
            return getJobQueue(datasetContext).retrieveSubscriberState(getTopicId().getTopic());
        }

        /** Stores {@code messageId} in the job queue as the subscriber state for this topic. */
        protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
            getJobQueue(datasetContext).persistSubscriberState(getTopicId().getTopic(), messageId);
        }

        /**
         * Passes the notification of every message, in iteration order, to
         * {@link #processNotification} together with the schedule store and job queue obtained
         * from {@code datasetContext}.
         */
        protected void processMessages(DatasetContext datasetContext,
                                       Iterator<ImmutablePair<String, Notification>> messages) throws Exception {
            ProgramScheduleStoreDataset scheduleStore = getScheduleStore(datasetContext);
            JobQueueDataset jobQueue = getJobQueue(datasetContext);
            while (messages.hasNext()) {
                processNotification(scheduleStore, jobQueue, messages.next().getSecond());
            }
        }

        /** Returns the executor shared by all subscribers of the enclosing service. */
        protected ScheduledExecutorService executor() {
            return subscriberExecutor;
        }

        /**
         * Handles a single notification, typically by adding it to {@code jobQueue} for each
         * matching schedule found through {@code scheduleStore}.
         */
        protected abstract void processNotification(ProgramScheduleStoreDataset scheduleStore,
                                                    JobQueueDataset jobQueue, Notification notification);

        private JobQueueDataset getJobQueue(DatasetContext datasetContext) {
            return Schedulers.getJobQueue(datasetContext, datasetFramework);
        }

        private ProgramScheduleStoreDataset getScheduleStore(DatasetContext datasetContext) {
            return Schedulers.getScheduleStore(datasetContext, datasetFramework);
        }
    }

    /**
     * Subscriber for the topic configured under {@code data.event.topic}. A notification is added
     * to the job queue for every schedule found under the partition trigger key of the dataset
     * named by its {@code datasetId} property.
     */
    private final class DataEventSubscriberService extends AbstractSchedulerSubscriberService {
        DataEventSubscriberService() {
            super("scheduler.data.event", cConf.get("data.event.topic"),
                  cConf.getInt("scheduler.data.event.fetch.size"), true);
        }

        @Override // co.cask.cdap.scheduler.ScheduleNotificationSubscriberService.AbstractSchedulerSubscriberService
        protected void processNotification(ProgramScheduleStoreDataset scheduleStore,
                                           JobQueueDataset jobQueue, Notification notification) {
            String datasetIdString = notification.getProperties().get("datasetId");
            if (datasetIdString == null) {
                // Without a dataset id there is no trigger key to look up.
                return;
            }
            String triggerKey = Schedulers.triggerKeyForPartition(DatasetId.fromString(datasetIdString));
            for (ProgramScheduleRecord schedule : scheduleStore.findSchedules(triggerKey)) {
                jobQueue.addNotification(schedule, notification);
            }
        }
    }

    /**
     * Subscriber for the topic configured under {@code program.status.record.event.topic}. A
     * notification is added to the job queue for every schedule found under the program status
     * trigger key of the program (the parent of the run id) and the reported status.
     */
    private final class ProgramStatusEventSubscriberService extends AbstractSchedulerSubscriberService {
        ProgramStatusEventSubscriberService() {
            super("scheduler.program.event", cConf.get("program.status.record.event.topic"),
                  cConf.getInt("scheduler.program.status.event.fetch.size"), true);
        }

        @Override // co.cask.cdap.scheduler.ScheduleNotificationSubscriberService.AbstractSchedulerSubscriberService
        protected void processNotification(ProgramScheduleStoreDataset scheduleStore,
                                           JobQueueDataset jobQueue, Notification notification) {
            String programRunIdJson = notification.getProperties().get(ProgramOptionConstants.PROGRAM_RUN_ID);
            String runStatusName = notification.getProperties().get(ProgramOptionConstants.PROGRAM_STATUS);
            // Both properties are required. The status must be checked before parsing because
            // Enum.valueOf(null) throws NullPointerException, not IllegalArgumentException.
            if (programRunIdJson == null || runStatusName == null) {
                return;
            }

            ProgramStatus programStatus;
            try {
                programStatus = ProgramRunStatus.toProgramStatus(ProgramRunStatus.valueOf(runStatusName));
            } catch (IllegalArgumentException e) {
                // valueOf rejects unknown names; toProgramStatus presumably rejects run statuses
                // that have no ProgramStatus counterpart. Neither can match a schedule, so the
                // notification is skipped. Only the conversion is guarded, so that errors from the
                // schedule store or job queue are not silently discarded.
                LOG.trace("Ignore notification with program run status {} that maps to no program status",
                          runStatusName);
                return;
            }
            if (programStatus == null) {
                return;
            }

            ProgramRunId programRunId = GSON.fromJson(programRunIdJson, ProgramRunId.class);
            if (programRunId == null) {
                // Gson returns null for an empty or literal "null" JSON document.
                return;
            }
            String triggerKey = Schedulers.triggerKeyForProgramStatus(programRunId.getParent(), programStatus);
            for (ProgramScheduleRecord schedule : scheduleStore.findSchedules(triggerKey)) {
                jobQueue.addNotification(schedule, notification);
            }
        }
    }

    /**
     * Subscriber for the topic configured under {@code time.event.topic}. A notification is added
     * to the job queue for the single schedule identified by its schedule id property.
     */
    private final class SchedulerEventSubscriberService extends AbstractSchedulerSubscriberService {
        SchedulerEventSubscriberService() {
            super("scheduler.event", cConf.get("time.event.topic"),
                  cConf.getInt("scheduler.time.event.fetch.size"), false);
        }

        @Override // co.cask.cdap.scheduler.ScheduleNotificationSubscriberService.AbstractSchedulerSubscriberService
        protected void processNotification(ProgramScheduleStoreDataset scheduleStore,
                                           JobQueueDataset jobQueue, Notification notification) {
            String scheduleIdString = notification.getProperties().get(ProgramOptionConstants.SCHEDULE_ID);
            if (scheduleIdString == null) {
                LOG.warn("Ignore notification that misses schedule id, {}", notification);
                return;
            }

            ScheduleId scheduleId;
            try {
                scheduleId = GSON.fromJson(scheduleIdString, ScheduleId.class);
            } catch (JsonSyntaxException e) {
                // The value is not a JSON-encoded id; parse it as the plain string form instead.
                scheduleId = ScheduleId.fromString(scheduleIdString);
            }

            try {
                jobQueue.addNotification(scheduleStore.getScheduleRecord(scheduleId), notification);
            } catch (NotFoundException e) {
                LOG.warn("Ignore notification that doesn't have a schedule {} associated with, {}",
                         scheduleId, notification);
            }
        }
    }

    /**
     * Stores the injected dependencies and then creates the three subscriber services.
     *
     * <p>The subscribers are created last because their constructors read {@code cConf} and pass
     * the other dependencies of this instance to their base class.
     */
    @Inject
    ScheduleNotificationSubscriberService(CConfiguration cConfiguration, MessagingService messagingService, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, MetricsCollectionService metricsCollectionService) {
        this.cConf = cConfiguration;
        this.messagingService = messagingService;
        this.datasetFramework = datasetFramework;
        this.txClient = transactionSystemClient;
        this.metricsCollectionService = metricsCollectionService;
        this.subscriberServices = Arrays.asList(
            new SchedulerEventSubscriberService(),
            new DataEventSubscriberService(),
            new ProgramStatusEventSubscriberService());
    }

    /**
     * Creates the shared executor and starts every subscriber, waiting until all start futures
     * have completed. {@code successfulAsList} yields {@code null} for a failed future instead of
     * failing, so a subscriber that fails to start does not make this method throw.
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting {}", getClass().getSimpleName());
        subscriberExecutor = Executors.newScheduledThreadPool(
            1, Threads.createDaemonThreadFactory("scheduler-notification-subscriber-%d"));
        Futures.successfulAsList(
            subscriberServices.stream().map(Service::start).collect(Collectors.toList())).get();
    }

    /**
     * Stops every subscriber, logs any failure raised while stopping, and shuts down the shared
     * executor. The executor is shut down even if waiting for the subscribers throws.
     */
    @Override
    protected void shutDown() throws Exception {
        try {
            // Request all stops and wait for them together; failures are masked as null here.
            Futures.successfulAsList(
                subscriberServices.stream().map(Service::stop).collect(Collectors.toList())).get();
            // Ask each service for its stop future again, only to surface and log a failure.
            for (Service service : subscriberServices) {
                try {
                    service.stop().get();
                } catch (ExecutionException e) {
                    LOG.warn("Exception raised when stopping service {}", service, e.getCause());
                }
            }
        } finally {
            subscriberExecutor.shutdownNow();
        }
        LOG.info("Stopped {}", getClass().getSimpleName());
    }
}
