package co.cask.cdap.internal.app.runtime.schedule;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.schedule.Schedule;
import co.cask.cdap.api.schedule.ScheduleSpecification;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.stream.notification.StreamSizeNotification;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.schedule.Scheduler;
import co.cask.cdap.internal.app.runtime.schedule.store.DatasetBasedStreamSizeScheduleStore;
import co.cask.cdap.internal.app.services.ProgramLifecycleService;
import co.cask.cdap.internal.app.services.PropertiesResolver;
import co.cask.cdap.internal.schedule.StreamSizeSchedule;
import co.cask.cdap.notifications.service.NotificationContext;
import co.cask.cdap.notifications.service.NotificationHandler;
import co.cask.cdap.notifications.service.NotificationService;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.ScheduledRuntime;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/StreamSizeScheduler.class */
public class StreamSizeScheduler implements Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(StreamSizeScheduler.class);
    private static final int STREAM_POLLING_THREAD_POOL_SIZE = 10;
    private static final int POLLING_AFTER_NOTIFICATION_RETRY = 3;
    private final long pollingDelay;
    private final NotificationService notificationService;
    private final MetricStore metricStore;
    private final Provider<Store> storeProvider;
    private final ProgramLifecycleService lifecycleService;
    private final PropertiesResolver propertiesResolver;
    private final DatasetBasedStreamSizeScheduleStore scheduleStore;
    private final NamespaceQueryAdmin namespaceQueryAdmin;
    private final CConfiguration cConf;
    private Store store;
    private Executor sendPollingInfoExecutor;
    private ScheduledExecutorService pollBookingExecutor;
    private ScheduledExecutorService streamPollingExecutor;
    private ListeningExecutorService taskExecutorService;
    private final ConcurrentMap<Id.Stream, StreamSubscriber> streamSubscribers = Maps.newConcurrentMap();
    private final ConcurrentSkipListMap<String, StreamSubscriber> scheduleSubscribers = new ConcurrentSkipListMap<>();
    private boolean schedulerStarted = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/StreamSizeScheduler$StreamSize.class */
    public final class StreamSize {
        private final long size;
        private final long timestamp;

        private StreamSize(long j, long j2) {
            this.size = j;
            this.timestamp = j2;
        }

        public long getSize() {
            return this.size;
        }

        public long getTimestamp() {
            return this.timestamp;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/StreamSizeScheduler$StreamSizeScheduleTask.class */
    public final class StreamSizeScheduleTask {
        private final Id.Program programId;
        private final SchedulableProgramType programType;
        private final AtomicBoolean active;
        private final Map<String, String> properties;
        private StreamSizeSchedule streamSizeSchedule;
        private long basePollSize;
        private long basePollTs;
        private long lastRunSize;
        private long lastRunTs;

        private StreamSizeScheduleTask(Id.Program program, SchedulableProgramType schedulableProgramType, StreamSizeSchedule streamSizeSchedule, Map<String, String> map) {
            this.programId = program;
            this.programType = schedulableProgramType;
            this.streamSizeSchedule = streamSizeSchedule;
            this.properties = map == null ? ImmutableMap.of() : ImmutableMap.copyOf(map);
            this.active = new AtomicBoolean(false);
        }

        public void storeNewSchedule(long j, long j2) throws SchedulerException {
            StreamSizeScheduler.LOG.debug("Starting new schedule {} with basePollSize {}, basePollTs {}", new Object[]{this.streamSizeSchedule.getName(), Long.valueOf(j), Long.valueOf(j2)});
            this.basePollSize = j;
            this.basePollTs = j2;
            this.lastRunSize = -1L;
            this.lastRunTs = -1L;
            try {
                StreamSizeScheduler.this.scheduleStore.persist(this.programId, this.programType, this.streamSizeSchedule, this.properties, j, j2, this.lastRunSize, this.lastRunTs, this.active.get());
            } catch (Throwable th) {
                throw new SchedulerException("Error when persisting schedule " + this.streamSizeSchedule.getName() + "in store", th);
            }
        }

        public void startScheduleFromStore(long j, long j2, long j3, long j4, boolean z) {
            StreamSizeScheduler.LOG.debug("Starting schedule from store {} with basePollSize {}, basePollTs {}, active {}", new Object[]{this.streamSizeSchedule.getName(), Long.valueOf(j), Long.valueOf(j2), Boolean.valueOf(z)});
            this.basePollSize = j;
            this.basePollTs = j2;
            this.lastRunSize = j3;
            this.lastRunTs = j4;
            this.active.set(z);
        }

        public void deleteFromStore() throws SchedulerException {
            try {
                StreamSizeScheduler.this.scheduleStore.delete(this.programId, this.programType, this.streamSizeSchedule.getName());
            } catch (Throwable th) {
                throw new SchedulerException("Error when deleting schedule " + this.streamSizeSchedule.getName() + "from store", th);
            }
        }

        public boolean isActive() {
            return this.active.get();
        }

        public void receivedPollingInformation(@Nonnull StreamSize streamSize) {
            Preconditions.checkNotNull(streamSize);
            if (this.active.get()) {
                final ImmutableMap.Builder builder = ImmutableMap.builder();
                if (streamSize.getSize() - this.basePollSize < StreamSizeScheduler.toBytes(this.streamSizeSchedule.getDataTriggerMB())) {
                    return;
                }
                builder.put(ProgramOptionConstants.SCHEDULE_NAME, this.streamSizeSchedule.getName());
                builder.put(ProgramOptionConstants.LOGICAL_START_TIME, Long.toString(streamSize.getTimestamp()));
                builder.put(ProgramOptionConstants.RUN_DATA_SIZE, Long.toString(streamSize.getSize()));
                builder.put(ProgramOptionConstants.RUN_BASE_COUNT_TIME, Long.toString(this.basePollTs));
                builder.put(ProgramOptionConstants.RUN_BASE_COUNT_SIZE, Long.toString(this.basePollSize));
                builder.putAll(this.properties);
                final ImmutableMap of = ImmutableMap.of(ProgramOptionConstants.LOGICAL_START_TIME, Long.toString(streamSize.getTimestamp()));
                if (this.lastRunSize != -1 && this.lastRunTs != -1) {
                    builder.put(ProgramOptionConstants.LAST_SCHEDULED_RUN_LOGICAL_START_TIME, Long.toString(this.lastRunTs));
                    builder.put(ProgramOptionConstants.LAST_SCHEDULED_RUN_DATA_SIZE, Long.toString(this.lastRunSize));
                }
                try {
                    StreamSizeScheduler.this.scheduleStore.updateBaseRun(this.programId, this.programType, this.streamSizeSchedule.getName(), streamSize.getSize(), streamSize.getTimestamp());
                    final StreamSizeSchedule streamSizeSchedule = this.streamSizeSchedule;
                    this.basePollSize = streamSize.getSize();
                    this.basePollTs = streamSize.getTimestamp();
                    final ScheduleTaskRunner scheduleTaskRunner = new ScheduleTaskRunner(StreamSizeScheduler.this.store, StreamSizeScheduler.this.lifecycleService, StreamSizeScheduler.this.propertiesResolver, StreamSizeScheduler.this.taskExecutorService, StreamSizeScheduler.this.namespaceQueryAdmin, StreamSizeScheduler.this.cConf);
                    try {
                        StreamSizeScheduler.this.scheduleStore.updateLastRun(this.programId, this.programType, this.streamSizeSchedule.getName(), streamSize.getSize(), streamSize.getTimestamp(), new DatasetBasedStreamSizeScheduleStore.TransactionMethod() { // from class: co.cask.cdap.internal.app.runtime.schedule.StreamSizeScheduler.StreamSizeScheduleTask.1
                            @Override // co.cask.cdap.internal.app.runtime.schedule.store.DatasetBasedStreamSizeScheduleStore.TransactionMethod
                            public void execute() throws Exception {
                                StreamSizeScheduler.LOG.info("About to start streamSizeSchedule {}", streamSizeSchedule.getName());
                                scheduleTaskRunner.run(StreamSizeScheduleTask.this.programId, builder.build(), of);
                            }
                        });
                        this.lastRunSize = streamSize.getSize();
                        this.lastRunTs = streamSize.getTimestamp();
                    } catch (Throwable th) {
                        StreamSizeScheduler.LOG.error("Error when persisting last run information for schedule {} in store", this.streamSizeSchedule.getName(), th);
                    }
                } catch (Throwable th2) {
                    StreamSizeScheduler.LOG.error("Error when persisting new base information for schedule {} in store. Program will not be executed", this.streamSizeSchedule.getName(), th2);
                }
            }
        }

        public boolean suspend() throws SchedulerException {
            if (!this.active.compareAndSet(true, false)) {
                return false;
            }
            try {
                StreamSizeScheduler.this.scheduleStore.suspend(this.programId, this.programType, this.streamSizeSchedule.getName());
                return true;
            } catch (Throwable th) {
                this.active.set(true);
                throw new SchedulerException("Error when suspending schedule " + this.streamSizeSchedule.getName() + "in store", th);
            }
        }

        public boolean resume() throws SchedulerException {
            if (!this.active.compareAndSet(false, true)) {
                return false;
            }
            try {
                StreamSizeScheduler.this.scheduleStore.resume(this.programId, this.programType, this.streamSizeSchedule.getName());
                return true;
            } catch (Throwable th) {
                this.active.set(false);
                throw new SchedulerException("Error when resuming schedule " + this.streamSizeSchedule.getName() + "in store", th);
            }
        }

        public void updateSchedule(StreamSizeSchedule streamSizeSchedule) throws SchedulerException {
            if (streamSizeSchedule.equals(this.streamSizeSchedule)) {
                return;
            }
            try {
                StreamSizeScheduler.this.scheduleStore.updateSchedule(this.programId, this.programType, this.streamSizeSchedule.getName(), streamSizeSchedule);
                this.streamSizeSchedule = streamSizeSchedule;
            } catch (Throwable th) {
                throw new SchedulerException("Error when updating schedule " + this.streamSizeSchedule.getName() + "in store", th);
            }
        }

        public boolean shouldTriggerProgram(long j) {
            return this.active.get() && j >= this.basePollSize + StreamSizeScheduler.toBytes(this.streamSizeSchedule.getDataTriggerMB());
        }

        public void reset(long j) {
            this.basePollSize = 0L;
            this.basePollTs = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/StreamSizeScheduler$StreamSubscriber.class */
    public final class StreamSubscriber extends AbstractScheduledService implements NotificationHandler<StreamSizeNotification> {
        private final ConcurrentMap<String, StreamSizeScheduleTask> scheduleTasks;
        private final Id.Stream streamId;
        private final AtomicInteger activeTasks;
        private final Object deltaLock;
        private Cancellable notificationSubscription;
        private StreamSizeNotification lastNotification;
        private StreamSize lastPollingInfo;
        private Long delta;

        private StreamSubscriber(Id.Stream stream) {
            this.streamId = stream;
            this.scheduleTasks = Maps.newConcurrentMap();
            this.activeTasks = new AtomicInteger(0);
            this.delta = null;
            this.deltaLock = new Object();
        }

        protected void startUp() throws Exception {
            this.notificationSubscription = StreamSizeScheduler.this.notificationService.subscribe(getFeed(), this, StreamSizeScheduler.this.sendPollingInfoExecutor);
        }

        protected void shutDown() throws Exception {
            if (this.notificationSubscription != null) {
                this.notificationSubscription.cancel();
            }
        }

        protected void runOneIteration() throws Exception {
            if (this.activeTasks.get() == 0) {
                return;
            }
            try {
                sendPollingInfoToActiveTasks(pollOnce());
            } catch (IOException e) {
                StreamSizeScheduler.LOG.error("Could not poll stream {}", this.streamId.getId(), e);
            } catch (Throwable th) {
                StreamSizeScheduler.LOG.error("Error in scheduled polling for stream {}", this.streamId.getId(), th);
            }
        }

        protected AbstractScheduledService.Scheduler scheduler() {
            return AbstractScheduledService.Scheduler.newFixedRateSchedule(StreamSizeScheduler.this.pollingDelay, StreamSizeScheduler.this.pollingDelay, TimeUnit.MILLISECONDS);
        }

        protected ScheduledExecutorService executor() {
            return StreamSizeScheduler.this.streamPollingExecutor;
        }

        public Type getNotificationType() {
            return StreamSizeNotification.class;
        }

        public void received(StreamSizeNotification streamSizeNotification, NotificationContext notificationContext) {
            if (this.lastNotification == null || streamSizeNotification.getTimestamp() > this.lastNotification.getTimestamp()) {
                this.lastNotification = streamSizeNotification;
                if (this.activeTasks.get() <= 0) {
                    return;
                }
                boolean z = false;
                Long l = null;
                synchronized (this.deltaLock) {
                    if (this.delta == null) {
                        z = true;
                    } else {
                        Iterator<StreamSizeScheduleTask> it = this.scheduleTasks.values().iterator();
                        while (true) {
                            if (!it.hasNext()) {
                                break;
                            }
                            if (it.next().shouldTriggerProgram(streamSizeNotification.getSize() - this.delta.longValue())) {
                                z = true;
                                l = Long.valueOf(streamSizeNotification.getSize() - this.delta.longValue());
                                break;
                            }
                        }
                    }
                }
                if (z) {
                    pollAfterNotification(streamSizeNotification, l);
                }
            }
        }

        public int getActiveTasksCount() {
            return this.activeTasks.get();
        }

        private void pollAfterNotification(final StreamSizeNotification streamSizeNotification, @Nullable final Long l) {
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            final AtomicInteger atomicInteger = new AtomicInteger(StreamSizeScheduler.POLLING_AFTER_NOTIFICATION_RETRY);
            StreamSizeScheduler.this.pollBookingExecutor.schedule(new Runnable() { // from class: co.cask.cdap.internal.app.runtime.schedule.StreamSizeScheduler.StreamSubscriber.1
                @Override // java.lang.Runnable
                public void run() {
                    StreamSize pollOnce;
                    try {
                        boolean z = false;
                        synchronized (StreamSubscriber.this.deltaLock) {
                            pollOnce = StreamSubscriber.this.pollOnce();
                            if (atomicBoolean.compareAndSet(true, false)) {
                                StreamSubscriber.this.delta = Long.valueOf(streamSizeNotification.getSize() - pollOnce.getSize());
                            }
                            if (l != null && pollOnce.getSize() >= l.longValue()) {
                                z = true;
                            }
                        }
                        StreamSubscriber.this.sendPollingInfoToActiveTasks(pollOnce);
                        if (l != null && !z && atomicInteger.decrementAndGet() >= 0) {
                            StreamSizeScheduler.this.pollBookingExecutor.schedule(this, 1L, TimeUnit.SECONDS);
                        } else if (l != null && !z) {
                            StreamSizeScheduler.LOG.debug("Polling estimate {} was not reached for stream {} after {} retries", new Object[]{l, StreamSubscriber.this.streamId.getId(), Integer.valueOf(StreamSizeScheduler.POLLING_AFTER_NOTIFICATION_RETRY)});
                        }
                    } catch (IOException e) {
                        StreamSizeScheduler.LOG.error("Could not poll stream {}", StreamSubscriber.this.streamId.getId(), e);
                    } catch (Throwable th) {
                        StreamSizeScheduler.LOG.error("Error when polling stream {} and sending info to active tasks", StreamSubscriber.this.streamId.getId(), th);
                    }
                }
            }, 1L, TimeUnit.SECONDS);
        }

        public void createScheduleTask(Id.Program program, SchedulableProgramType schedulableProgramType, StreamSizeSchedule streamSizeSchedule, Map<String, String> map) throws SchedulerException {
            synchronized (this) {
                String scheduleIdFor = AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, streamSizeSchedule.getName());
                if (this.scheduleTasks.get(scheduleIdFor) != null) {
                    throw new SchedulerException("Tried to overwrite schedule " + streamSizeSchedule.getName());
                }
                try {
                    StreamSize pollOnce = pollOnce();
                    StreamSizeScheduleTask streamSizeScheduleTask = new StreamSizeScheduleTask(program, schedulableProgramType, streamSizeSchedule, map);
                    streamSizeScheduleTask.storeNewSchedule(pollOnce.getSize(), pollOnce.getTimestamp());
                    this.scheduleTasks.put(scheduleIdFor, streamSizeScheduleTask);
                } catch (IOException e) {
                    throw new SchedulerException("Polling could not be performed on stream " + streamSizeSchedule.getStreamName(), e);
                }
            }
        }

        public synchronized void restoreScheduleFromStore(Id.Program program, SchedulableProgramType schedulableProgramType, StreamSizeSchedule streamSizeSchedule, Map<String, String> map, boolean z, long j, long j2, long j3, long j4) throws SchedulerException {
            String scheduleIdFor = AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, streamSizeSchedule.getName());
            if (this.scheduleTasks.get(scheduleIdFor) != null) {
                throw new SchedulerException("Tried to overwrite schedule " + streamSizeSchedule.getName());
            }
            StreamSizeScheduleTask streamSizeScheduleTask = new StreamSizeScheduleTask(program, schedulableProgramType, streamSizeSchedule, map);
            streamSizeScheduleTask.startScheduleFromStore(j, j2, j3, j4, z);
            this.scheduleTasks.put(scheduleIdFor, streamSizeScheduleTask);
            if (z) {
                this.activeTasks.incrementAndGet();
            }
        }

        public synchronized void suspendScheduleTask(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws ScheduleNotFoundException, SchedulerException {
            StreamSizeScheduleTask streamSizeScheduleTask = this.scheduleTasks.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
            if (streamSizeScheduleTask == null) {
                throw new ScheduleNotFoundException(Id.Schedule.from(program.getApplication(), str));
            }
            if (streamSizeScheduleTask.suspend()) {
                this.activeTasks.decrementAndGet();
            }
        }

        public synchronized void resumeScheduleTask(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws ScheduleNotFoundException, SchedulerException {
            StreamSizeScheduleTask streamSizeScheduleTask = this.scheduleTasks.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
            if (streamSizeScheduleTask == null) {
                throw new ScheduleNotFoundException(Id.Schedule.from(program.getApplication(), str));
            }
            if (streamSizeScheduleTask.resume()) {
                this.activeTasks.incrementAndGet();
                try {
                    sendPollingInfoToActiveTasks(pollOnce());
                } catch (IOException e) {
                    StreamSizeScheduler.LOG.debug("Ignoring stream events size polling after resuming schedule {} due to error", str, e);
                }
            }
        }

        public synchronized void updateScheduleTask(Id.Program program, SchedulableProgramType schedulableProgramType, StreamSizeSchedule streamSizeSchedule) throws ScheduleNotFoundException, SchedulerException {
            Id.Schedule from = Id.Schedule.from(program.getApplication(), streamSizeSchedule.getName());
            StreamSizeScheduleTask streamSizeScheduleTask = this.scheduleTasks.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, streamSizeSchedule.getName()));
            if (streamSizeScheduleTask == null) {
                throw new ScheduleNotFoundException(from);
            }
            streamSizeScheduleTask.updateSchedule(streamSizeSchedule);
            try {
                sendPollingInfoToActiveTasks(pollOnce());
            } catch (IOException e) {
                StreamSizeScheduler.LOG.debug("Ignoring stream events size polling after resuming schedule {} due to error", from.getId(), e);
            }
        }

        public synchronized void deleteSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws ScheduleNotFoundException, SchedulerException {
            String scheduleIdFor = AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str);
            Id.Schedule from = Id.Schedule.from(program.getApplication(), str);
            StreamSizeScheduleTask streamSizeScheduleTask = this.scheduleTasks.get(scheduleIdFor);
            if (streamSizeScheduleTask == null) {
                throw new ScheduleNotFoundException(from);
            }
            streamSizeScheduleTask.deleteFromStore();
            this.scheduleTasks.remove(scheduleIdFor);
            if (streamSizeScheduleTask.isActive()) {
                this.activeTasks.decrementAndGet();
            }
        }

        public Scheduler.ScheduleState scheduleTaskState(Id.Program program, SchedulableProgramType schedulableProgramType, String str) {
            StreamSizeScheduleTask streamSizeScheduleTask = this.scheduleTasks.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
            return streamSizeScheduleTask == null ? Scheduler.ScheduleState.NOT_FOUND : streamSizeScheduleTask.isActive() ? Scheduler.ScheduleState.SCHEDULED : Scheduler.ScheduleState.SUSPENDED;
        }

        public Id.Stream getStreamId() {
            return this.streamId;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized StreamSize pollOnce() throws IOException {
            StreamSize queryStreamEventsSize = queryStreamEventsSize();
            if (this.lastPollingInfo != null && queryStreamEventsSize.getSize() < this.lastPollingInfo.getSize()) {
                this.delta = null;
                Iterator<StreamSizeScheduleTask> it = this.scheduleTasks.values().iterator();
                while (it.hasNext()) {
                    it.next().reset(queryStreamEventsSize.getTimestamp());
                }
            }
            this.lastPollingInfo = queryStreamEventsSize;
            return queryStreamEventsSize;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void sendPollingInfoToActiveTasks(final StreamSize streamSize) {
            for (final StreamSizeScheduleTask streamSizeScheduleTask : this.scheduleTasks.values()) {
                if (streamSizeScheduleTask.isActive()) {
                    StreamSizeScheduler.this.sendPollingInfoExecutor.execute(new Runnable() { // from class: co.cask.cdap.internal.app.runtime.schedule.StreamSizeScheduler.StreamSubscriber.2
                        @Override // java.lang.Runnable
                        public void run() {
                            streamSizeScheduleTask.receivedPollingInformation(streamSize);
                        }
                    });
                }
            }
        }

        private Id.NotificationFeed getFeed() {
            return new Id.NotificationFeed.Builder().setNamespaceId(this.streamId.getNamespaceId()).setCategory("stream").setName(String.format("%sSize", this.streamId.getId())).build();
        }

        private StreamSize queryStreamEventsSize() throws IOException {
            try {
                Collection query = StreamSizeScheduler.this.metricStore.query(new MetricDataQuery(0L, TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), Integer.MAX_VALUE, "system.collect.bytes", AggregationFunction.SUM, ImmutableMap.of("ns", this.streamId.getNamespaceId(), "str", this.streamId.getId()), ImmutableList.of()));
                if (query == null || query.isEmpty()) {
                    return new StreamSize(0L, System.currentTimeMillis());
                }
                List timeValues = ((MetricTimeSeries) query.iterator().next()).getTimeValues();
                if (timeValues == null || timeValues.size() != 1) {
                    throw new IOException("Should collect exactly one time value");
                }
                return new StreamSize(((TimeValue) timeValues.get(0)).getValue(), System.currentTimeMillis());
            } catch (Exception e) {
                Throwables.propagateIfInstanceOf(e, IOException.class);
                throw new IOException(e);
            }
        }
    }

    @Inject
    public StreamSizeScheduler(CConfiguration cConfiguration, NotificationService notificationService, MetricStore metricStore, Provider<Store> provider, ProgramLifecycleService programLifecycleService, PropertiesResolver propertiesResolver, DatasetBasedStreamSizeScheduleStore datasetBasedStreamSizeScheduleStore, NamespaceQueryAdmin namespaceQueryAdmin) {
        this.pollingDelay = TimeUnit.SECONDS.toMillis(cConfiguration.getLong("stream.size.schedule.polling.delay"));
        this.notificationService = notificationService;
        this.metricStore = metricStore;
        this.storeProvider = provider;
        this.lifecycleService = programLifecycleService;
        this.propertiesResolver = propertiesResolver;
        this.scheduleStore = datasetBasedStreamSizeScheduleStore;
        this.namespaceQueryAdmin = namespaceQueryAdmin;
        this.cConf = cConfiguration;
    }

    public void init() throws SchedulerException {
        this.sendPollingInfoExecutor = Executors.newCachedThreadPool(Threads.createDaemonThreadFactory("stream-size-scheduler-%d"));
        this.streamPollingExecutor = Executors.newScheduledThreadPool(STREAM_POLLING_THREAD_POOL_SIZE, Threads.createDaemonThreadFactory("stream-polling-%d"));
        this.pollBookingExecutor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("polling-booking-executor"));
        this.taskExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(Threads.createDaemonThreadFactory("stream-schedule-task")));
        this.store = (Store) this.storeProvider.get();
        initializeScheduleStore();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() throws SchedulerException {
        this.schedulerStarted = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isStarted() {
        return this.schedulerStarted;
    }

    private void initializeScheduleStore() throws SchedulerException {
        try {
            this.scheduleStore.initialize();
            try {
                for (StreamSizeScheduleState streamSizeScheduleState : this.scheduleStore.list()) {
                    try {
                        restoreScheduleFromStore(streamSizeScheduleState.getProgramId(), streamSizeScheduleState.getProgramType(), streamSizeScheduleState.getStreamSizeSchedule(), streamSizeScheduleState.getProperties(), streamSizeScheduleState.isRunning(), streamSizeScheduleState.getBaseRunSize(), streamSizeScheduleState.getBaseRunTs(), streamSizeScheduleState.getLastRunSize(), streamSizeScheduleState.getLastRunTs());
                    } catch (SchedulerException e) {
                        LOG.error("Could not schedule task '{}' from persistent store", streamSizeScheduleState, e);
                    }
                }
                for (StreamSubscriber streamSubscriber : this.streamSubscribers.values()) {
                    if (streamSubscriber.getActiveTasksCount() > 0) {
                        try {
                            streamSubscriber.sendPollingInfoToActiveTasks(streamSubscriber.pollOnce());
                        } catch (IOException e2) {
                            LOG.warn("Could not poll size for stream '{}'", streamSubscriber.getStreamId(), e2);
                        }
                    }
                }
            } catch (Throwable th) {
                throw new SchedulerException("Error when listing stream size schedules from store", th);
            }
        } catch (Throwable th2) {
            throw new SchedulerException("Error when initializing store for stream size schedules", th2);
        }
    }

    public void stop() {
        Iterator<StreamSubscriber> it = this.streamSubscribers.values().iterator();
        while (it.hasNext()) {
            it.next().stopAndWait();
        }
        if (this.pollBookingExecutor != null) {
            this.pollBookingExecutor.shutdownNow();
        }
        if (this.streamPollingExecutor != null) {
            this.streamPollingExecutor.shutdownNow();
        }
        if (this.taskExecutorService != null) {
            this.taskExecutorService.shutdownNow();
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void schedule(Id.Program program, SchedulableProgramType schedulableProgramType, Schedule schedule) throws SchedulerException {
        schedule(program, schedulableProgramType, schedule, (Map<String, String>) ImmutableMap.of());
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void schedule(Id.Program program, SchedulableProgramType schedulableProgramType, Schedule schedule, Map<String, String> map) throws SchedulerException {
        Preconditions.checkArgument(schedule instanceof StreamSizeSchedule, "Schedule should be of type StreamSizeSchedule");
        StreamSizeSchedule streamSizeSchedule = (StreamSizeSchedule) schedule;
        StreamSubscriber streamSubscriberForSchedule = streamSubscriberForSchedule(program, streamSizeSchedule);
        streamSubscriberForSchedule.createScheduleTask(program, schedulableProgramType, streamSizeSchedule, map);
        this.scheduleSubscribers.put(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, streamSizeSchedule.getName()), streamSubscriberForSchedule);
    }

    private void restoreScheduleFromStore(Id.Program program, SchedulableProgramType schedulableProgramType, StreamSizeSchedule streamSizeSchedule, Map<String, String> map, boolean z, long j, long j2, long j3, long j4) throws SchedulerException {
        StreamSubscriber streamSubscriberForSchedule = streamSubscriberForSchedule(program, streamSizeSchedule);
        streamSubscriberForSchedule.restoreScheduleFromStore(program, schedulableProgramType, streamSizeSchedule, map, z, j, j2, j3, j4);
        this.scheduleSubscribers.put(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, streamSizeSchedule.getName()), streamSubscriberForSchedule);
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void schedule(Id.Program program, SchedulableProgramType schedulableProgramType, Iterable<Schedule> iterable) throws SchedulerException {
        schedule(program, schedulableProgramType, iterable, (Map<String, String>) ImmutableMap.of());
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void schedule(Id.Program program, SchedulableProgramType schedulableProgramType, Iterable<Schedule> iterable, Map<String, String> map) throws SchedulerException {
        Iterator<Schedule> it = iterable.iterator();
        while (it.hasNext()) {
            schedule(program, schedulableProgramType, it.next(), map);
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public List<ScheduledRuntime> previousScheduledRuntime(Id.Program program, SchedulableProgramType schedulableProgramType) throws SchedulerException {
        return ImmutableList.of();
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public List<ScheduledRuntime> nextScheduledRuntime(Id.Program program, SchedulableProgramType schedulableProgramType) throws SchedulerException {
        return ImmutableList.of();
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public List<String> getScheduleIds(Id.Program program, SchedulableProgramType schedulableProgramType) throws SchedulerException {
        String programIdFor = AbstractSchedulerService.programIdFor(program, schedulableProgramType);
        return ImmutableList.copyOf(this.scheduleSubscribers.subMap(String.format("%s%c", programIdFor, ':'), String.format("%s%c", programIdFor, Character.valueOf((char) (58 + 1)))).keySet());
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void suspendSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws ScheduleNotFoundException, SchedulerException {
        StreamSubscriber streamSubscriber = this.scheduleSubscribers.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
        if (streamSubscriber == null) {
            throw new ScheduleNotFoundException(Id.Schedule.from(program.getApplication(), str));
        }
        streamSubscriber.suspendScheduleTask(program, schedulableProgramType, str);
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void resumeSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws ScheduleNotFoundException, SchedulerException {
        StreamSubscriber streamSubscriber = this.scheduleSubscribers.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
        if (streamSubscriber == null) {
            throw new ScheduleNotFoundException(Id.Schedule.from(program.getApplication(), str));
        }
        streamSubscriber.resumeScheduleTask(program, schedulableProgramType, str);
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void updateSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, Schedule schedule) throws NotFoundException, SchedulerException {
        updateSchedule(program, schedulableProgramType, schedule, ImmutableMap.of());
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void updateSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, Schedule schedule, Map<String, String> map) throws NotFoundException, SchedulerException {
        Preconditions.checkArgument(schedule instanceof StreamSizeSchedule, "Schedule should be of type StreamSizeSchedule");
        StreamSizeSchedule streamSizeSchedule = (StreamSizeSchedule) schedule;
        StreamSubscriber streamSubscriber = this.scheduleSubscribers.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, schedule.getName()));
        if (streamSubscriber == null) {
            throw new ScheduleNotFoundException(Id.Schedule.from(program.getApplication(), schedule.getName()));
        }
        if (streamSizeSchedule.getStreamName().equals(streamSubscriber.getStreamId().getId())) {
            streamSubscriber.updateScheduleTask(program, schedulableProgramType, streamSizeSchedule);
        } else {
            deleteSchedule(program, schedulableProgramType, schedule.getName());
            schedule(program, schedulableProgramType, schedule, map);
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void deleteSchedule(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws ScheduleNotFoundException, SchedulerException {
        StreamSubscriber remove = this.scheduleSubscribers.remove(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
        if (remove == null) {
            throw new ScheduleNotFoundException(Id.Schedule.from(program.getApplication(), str));
        }
        remove.deleteSchedule(program, schedulableProgramType, str);
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void deleteSchedules(Id.Program program, SchedulableProgramType schedulableProgramType) throws SchedulerException {
        String programIdFor = AbstractSchedulerService.programIdFor(program, schedulableProgramType);
        NavigableSet<String> keySet = this.scheduleSubscribers.subMap(String.format("%s%c", programIdFor, ':'), String.format("%s%c", programIdFor, Character.valueOf((char) (58 + 1)))).keySet();
        int length = programIdFor.length() + 1;
        for (String str : keySet) {
            try {
                if (str.length() < length) {
                    LOG.warn("Format of scheduleID incorrect: {}", str);
                } else {
                    deleteSchedule(program, schedulableProgramType, str.substring(length));
                }
            } catch (ScheduleNotFoundException e) {
                LOG.debug("Could not delete schedule, it might have been deleted already by another thread '{}'", str, e);
            }
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public void deleteAllSchedules(Id.Namespace namespace) throws SchedulerException {
        Iterator<ApplicationSpecification> it = this.store.getAllApplications(namespace).iterator();
        while (it.hasNext()) {
            deleteAllSchedules(namespace, it.next());
        }
    }

    private void deleteAllSchedules(Id.Namespace namespace, ApplicationSpecification applicationSpecification) throws SchedulerException {
        for (ScheduleSpecification scheduleSpecification : applicationSpecification.getSchedules().values()) {
            deleteSchedules(Id.Program.from(Id.Application.from(namespace.getId(), applicationSpecification.getName()), ProgramType.valueOfSchedulableType(scheduleSpecification.getProgram().getProgramType()), scheduleSpecification.getProgram().getProgramName()), scheduleSpecification.getProgram().getProgramType());
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.schedule.Scheduler
    public Scheduler.ScheduleState scheduleState(Id.Program program, SchedulableProgramType schedulableProgramType, String str) throws SchedulerException {
        StreamSubscriber streamSubscriber = this.scheduleSubscribers.get(AbstractSchedulerService.scheduleIdFor(program, schedulableProgramType, str));
        return streamSubscriber != null ? streamSubscriber.scheduleTaskState(program, schedulableProgramType, str) : Scheduler.ScheduleState.NOT_FOUND;
    }

    private StreamSubscriber streamSubscriberForSchedule(Id.Program program, StreamSizeSchedule streamSizeSchedule) {
        Id.Stream from = Id.Stream.from(program.getNamespaceId(), streamSizeSchedule.getStreamName());
        StreamSubscriber streamSubscriber = this.streamSubscribers.get(from);
        if (streamSubscriber == null) {
            streamSubscriber = new StreamSubscriber(from);
            StreamSubscriber putIfAbsent = this.streamSubscribers.putIfAbsent(from, streamSubscriber);
            if (putIfAbsent == null) {
                streamSubscriber.startAndWait();
            } else {
                streamSubscriber = putIfAbsent;
            }
        }
        return streamSubscriber;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static long toBytes(int i) {
        return i * 1024 * 1024;
    }
}
