package co.cask.cdap.internal.app.runtime.schedule;

import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.StoreFactory;
import co.cask.cdap.config.PreferencesStore;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/schedule/DistributedSchedulerService.class */
public final class DistributedSchedulerService extends AbstractSchedulerService {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedSchedulerService.class);
    private final DiscoveryServiceClient discoveryServiceClient;
    private final AtomicBoolean schedulerStarted;
    private Cancellable cancellable;

    @Inject
    public DistributedSchedulerService(Supplier<org.quartz.Scheduler> supplier, StoreFactory storeFactory, ProgramRuntimeService programRuntimeService, DiscoveryServiceClient discoveryServiceClient, PreferencesStore preferencesStore) {
        super(supplier, storeFactory, programRuntimeService, preferencesStore);
        this.schedulerStarted = new AtomicBoolean(false);
        this.discoveryServiceClient = discoveryServiceClient;
    }

    protected void startUp() throws Exception {
        this.cancellable = this.discoveryServiceClient.discover("dataset.service").watchChanges(new ServiceDiscovered.ChangeListener() { // from class: co.cask.cdap.internal.app.runtime.schedule.DistributedSchedulerService.1
            public void onChange(ServiceDiscovered serviceDiscovered) {
                if (Iterables.isEmpty(serviceDiscovered) || DistributedSchedulerService.this.schedulerStarted.get()) {
                    return;
                }
                DistributedSchedulerService.LOG.info("Starting scheduler, Discovered {} dataset service(s)", Integer.valueOf(Iterables.size(serviceDiscovered)));
                try {
                    DistributedSchedulerService.this.startScheduler();
                    DistributedSchedulerService.this.schedulerStarted.set(true);
                } catch (Throwable th) {
                    DistributedSchedulerService.LOG.error("Exception when starting scheduler.", th);
                }
            }
        }, MoreExecutors.sameThreadExecutor());
    }

    protected void shutDown() throws Exception {
        try {
            LOG.info("Stopping scheduler");
            stopScheduler();
            this.schedulerStarted.set(false);
            if (this.cancellable != null) {
                this.cancellable.cancel();
            }
        } catch (Throwable th) {
            this.schedulerStarted.set(false);
            if (this.cancellable != null) {
                this.cancellable.cancel();
            }
            throw th;
        }
    }
}
