package co.cask.cdap.scheduler;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.TxCallable;
import co.cask.cdap.internal.app.runtime.schedule.ScheduleTaskRunner;
import co.cask.cdap.internal.app.runtime.schedule.constraint.CheckableConstraint;
import co.cask.cdap.internal.app.runtime.schedule.constraint.ConstraintContext;
import co.cask.cdap.internal.app.runtime.schedule.constraint.ConstraintResult;
import co.cask.cdap.internal.app.runtime.schedule.queue.Job;
import co.cask.cdap.internal.app.runtime.schedule.queue.JobQueueDataset;
import co.cask.cdap.internal.app.runtime.schedule.store.Schedulers;
import co.cask.cdap.internal.app.services.ProgramLifecycleService;
import co.cask.cdap.internal.app.services.PropertiesResolver;
import co.cask.cdap.internal.schedule.constraint.Constraint;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/scheduler/ConstraintCheckerService.class */
/**
 * Service that polls the scheduler job queue, evaluates the constraints of queued jobs and launches
 * the jobs whose constraints are satisfied.
 *
 * <p>On start-up one {@link ConstraintCheckerThread} is submitted per job-queue partition. Each
 * worker loops until {@link #shutDown()} raises the {@code stopping} flag.
 */
public class ConstraintCheckerService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(ConstraintCheckerService.class);

    /** Maximum time, in milliseconds, a single scan of the job queue may run inside one transaction. */
    private static final long SCAN_TIME_BUDGET_MILLIS = 1000L;

    /** Pause, in milliseconds, after a scan that saw no jobs and left no job waiting to be launched. */
    private static final long IDLE_SLEEP_MILLIS = 2000L;

    /**
     * Grace period, in milliseconds, applied before a PENDING_TRIGGER job marked for deletion is
     * removed, and added on top of a schedule's timeout before a job is expired.
     * NOTE(review): presumably derived from the subscriber transaction timeout - confirm.
     */
    private static final long DELETION_GRACE_MILLIS = 60000L;

    /** How long, in seconds, {@link #shutDown()} waits for the workers before interrupting them. */
    private static final long SHUTDOWN_WAIT_SECONDS = 5L;

    private final Transactional transactional;
    private final DatasetFramework datasetFramework;
    private final MultiThreadDatasetCache multiThreadDatasetCache;
    private final Store store;
    private final ProgramLifecycleService lifecycleService;
    private final PropertiesResolver propertiesResolver;
    private final NamespaceQueryAdmin namespaceQueryAdmin;
    private final CConfiguration cConf;
    private ScheduleTaskRunner taskRunner;
    private ListeningExecutorService taskExecutorService;
    // Written by shutDown(), read by every worker thread; volatile for cross-thread visibility.
    private volatile boolean stopping = false;

    /**
     * Worker that repeatedly scans one partition of the job queue, updates the state of the jobs it
     * finds and launches the jobs that became ready.
     */
    public class ConstraintCheckerThread implements Runnable {
        private final int partition;
        private JobQueueDataset jobQueue;
        // Last job handed out by the previous scan; passed to getJobs() for the next scan and reset
        // to null once a scan reaches the end of the partition.
        private Job lastConsumed;
        // Number of consecutive failed iterations; drives the retry delay.
        private int failureCount;
        // Jobs moved to PENDING_LAUNCH by the scan and not yet handed to the task runner.
        private final Deque<Job> readyJobs = new ArrayDeque<>();
        private final RetryStrategy scheduleStrategy = RetryStrategies.exponentialDelay(100, Schedulers.SUBSCRIBER_TX_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        /**
         * @param i the job-queue partition this worker is responsible for
         */
        ConstraintCheckerThread(int i) {
            this.partition = i;
        }

        /**
         * Runs the scan loop until the service is stopping or the thread is interrupted.
         */
        @Override
        public void run() {
            jobQueue = Schedulers.getJobQueue(multiThreadDatasetCache, datasetFramework);
            while (!stopping) {
                try {
                    long sleepMillis = checkJobQueue();
                    // A delay of 0 means "scan again immediately".
                    if (sleepMillis > 0) {
                        TimeUnit.MILLISECONDS.sleep(sleepMillis);
                    }
                } catch (InterruptedException e) {
                    // Interruption is a request to stop: keep the interrupt status visible to the
                    // executor and leave the loop instead of silently carrying on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Performs one iteration: scans the partition inside a transaction, then launches the jobs
         * that became ready.
         *
         * @return the number of milliseconds to sleep before the next iteration; 0 for no sleep
         */
        private long checkJobQueue() {
            boolean emptyScan = false;
            try {
                emptyScan = Transactions.execute(transactional, new TxCallable<Boolean>() {
                    @Override
                    public Boolean call(DatasetContext datasetContext) throws Exception {
                        return ConstraintCheckerThread.this.checkJobConstraints();
                    }
                });
                runReadyJobs();
                failureCount = 0;
            } catch (Exception e) {
                LOG.warn("Failed to check Job constraints. Will retry in next run", e);
                failureCount++;
            }
            if (failureCount > 0) {
                return scheduleStrategy.nextRetry(failureCount, 0L);
            }
            // Only back off when the scan saw nothing and there is nothing left to launch.
            return (emptyScan && readyJobs.isEmpty()) ? IDLE_SLEEP_MILLIS : 0L;
        }

        /**
         * Scans jobs of this partition, starting after {@code lastConsumed}, and updates each one
         * via {@link #checkAndUpdateJob}. The scan ends when the partition is exhausted, the service
         * is stopping, or the scan time budget is used up.
         *
         * <p>NOTE(review): decompiler metadata indicates this method was originally private; the
         * visibility shown in the decompiled source is kept.
         *
         * @return {@code true} if no job was processed during this scan
         * @throws Exception if reading or updating the job queue fails
         */
        public boolean checkJobConstraints() throws Exception {
            boolean emptyScan = true;
            try (CloseableIterator<Job> jobs = jobQueue.getJobs(partition, lastConsumed)) {
                // Stopwatch construction/elapsedMillis() kept as in the original code.
                Stopwatch stopwatch = new Stopwatch().start();
                while (!stopping && stopwatch.elapsedMillis() < SCAN_TIME_BUDGET_MILLIS) {
                    if (!jobs.hasNext()) {
                        // End of partition reached: the next scan starts from the beginning.
                        lastConsumed = null;
                        return emptyScan;
                    }
                    Job job = jobs.next();
                    lastConsumed = job;
                    emptyScan = false;
                    checkAndUpdateJob(jobQueue, job);
                }
            }
            return emptyScan;
        }

        /**
         * Applies the state transition appropriate for a single job:
         * <ul>
         *   <li>a job marked for deletion is deleted if it is PENDING_CONSTRAINT, or PENDING_TRIGGER
         *       and marked longer ago than the grace period; otherwise it is left alone;</li>
         *   <li>a job older than its schedule timeout plus the grace period is deleted;</li>
         *   <li>a PENDING_CONSTRAINT job is deleted if its constraints can never be satisfied, or
         *       moved to PENDING_LAUNCH and queued for launch if they are satisfied.</li>
         * </ul>
         *
         * @param jobQueueDataset the job queue to update
         * @param job the job to inspect
         */
        private void checkAndUpdateJob(JobQueueDataset jobQueueDataset, Job job) {
            long now = System.currentTimeMillis();
            if (job.isToBeDeleted()) {
                boolean pendingConstraint = job.getState() == Job.State.PENDING_CONSTRAINT;
                if (pendingConstraint
                    || (job.getState() == Job.State.PENDING_TRIGGER
                        && now - job.getDeleteTimeMillis().longValue() > DELETION_GRACE_MILLIS)) {
                    jobQueueDataset.deleteJob(job);
                }
                return;
            }
            if (now - job.getCreationTime() >= job.getSchedule().getTimeoutMillis() + DELETION_GRACE_MILLIS) {
                LOG.info("Deleted job {}, due to timeout value of {}.", job.getJobKey(), job.getSchedule().getTimeoutMillis());
                jobQueueDataset.deleteJob(job);
                return;
            }
            if (job.getState() != Job.State.PENDING_CONSTRAINT) {
                return;
            }
            ConstraintResult.SatisfiedState satisfiedState = constraintsSatisfied(job, now);
            if (satisfiedState == ConstraintResult.SatisfiedState.NOT_SATISFIED) {
                // Not yet satisfied: leave the job untouched and re-check on a later scan.
                return;
            }
            if (satisfiedState == ConstraintResult.SatisfiedState.NEVER_SATISFIED) {
                jobQueueDataset.deleteJob(job);
            } else {
                jobQueueDataset.transitState(job, Job.State.PENDING_LAUNCH);
                readyJobs.add(job);
            }
        }

        /**
         * Launches every queued ready job, each in its own transaction, until the queue is drained
         * or the service is stopping. A job is removed from the queue after its transaction
         * completes or fails with a {@link TransactionFailureException}.
         */
        private void runReadyJobs() {
            Iterator<Job> readyIterator = readyJobs.iterator();
            while (readyIterator.hasNext() && !stopping) {
                final Job readyJob = readyIterator.next();
                try {
                    Transactions.execute(transactional, new TxCallable<Void>() {
                        @Override
                        public Void call(DatasetContext datasetContext) throws Exception {
                            ConstraintCheckerThread.this.runReadyJob(readyJob);
                            return null;
                        }
                    });
                } catch (TransactionFailureException e) {
                    LOG.warn("Failed to run program {} in schedule {}. Skip running this program.",
                             readyJob.getSchedule().getProgramId(), readyJob.getSchedule().getName(), e);
                }
                readyIterator.remove();
            }
        }

        /**
         * Launches a single job after re-reading its stored state, then deletes it from the queue.
         * The job is not launched if it no longer exists, is marked for deletion, or is no longer
         * in PENDING_LAUNCH state. A launch failure is logged and the job is still deleted.
         *
         * <p>NOTE(review): decompiler metadata indicates this method was originally private; the
         * visibility shown in the decompiled source is kept.
         *
         * @param job the job to launch
         * @return always {@code true}
         */
        public boolean runReadyJob(Job job) {
            Job storedJob = jobQueue.getJob(job.getJobKey());
            if (storedJob == null) {
                return true;
            }
            boolean launchable = !storedJob.isToBeDeleted() && storedJob.getState() == Job.State.PENDING_LAUNCH;
            if (launchable) {
                try {
                    taskRunner.launch(job);
                } catch (Exception e) {
                    LOG.error("Skip launching job {} because the program {} encountered an exception while launching.",
                              job.getJobKey(), job.getSchedule().getProgramId(), e);
                }
            }
            jobQueue.deleteJob(job);
            return true;
        }

        /**
         * Evaluates all constraints of the job's schedule.
         *
         * @param job the job whose schedule constraints are checked
         * @param j the current time in milliseconds, passed on to the constraint context
         * @return the never-satisfied state as soon as any constraint reports it; otherwise
         *     NOT_SATISFIED if at least one constraint is not satisfied; otherwise SATISFIED
         * @throws IllegalArgumentException if a constraint does not implement
         *     {@link CheckableConstraint}
         */
        private ConstraintResult.SatisfiedState constraintsSatisfied(Job job, long j) {
            ConstraintResult.SatisfiedState overall = ConstraintResult.SatisfiedState.SATISFIED;
            ConstraintContext constraintContext = new ConstraintContext(job, j, store);
            for (Constraint constraint : job.getSchedule().getConstraints()) {
                if (!(constraint instanceof CheckableConstraint)) {
                    throw new IllegalArgumentException("Implementation of Constraint in ProgramSchedule must implement CheckableConstraint");
                }
                ConstraintResult result = ((CheckableConstraint) constraint).check(job.getSchedule(), constraintContext);
                ConstraintResult.SatisfiedState state = result.getSatisfiedState();
                if (state == ConstraintResult.NEVER_SATISFIED.getSatisfiedState()) {
                    // No point in checking the remaining constraints.
                    return ConstraintResult.NEVER_SATISFIED.getSatisfiedState();
                }
                if (state == ConstraintResult.SatisfiedState.NOT_SATISFIED) {
                    overall = ConstraintResult.SatisfiedState.NOT_SATISFIED;
                }
            }
            return overall;
        }
    }

    /**
     * Creates the service and the transactional dataset access it uses. No threads are started
     * until {@link #startUp()} runs.
     */
    @Inject
    ConstraintCheckerService(Store store, ProgramLifecycleService programLifecycleService, PropertiesResolver propertiesResolver, NamespaceQueryAdmin namespaceQueryAdmin, CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
        this.store = store;
        this.lifecycleService = programLifecycleService;
        this.propertiesResolver = propertiesResolver;
        this.namespaceQueryAdmin = namespaceQueryAdmin;
        this.cConf = cConfiguration;
        this.datasetFramework = datasetFramework;
        // Constructor call kept exactly as decompiled (including the typed nulls) so overload
        // resolution is unaffected.
        this.multiThreadDatasetCache = new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), transactionSystemClient, NamespaceId.SYSTEM, ImmutableMap.of(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[0]);
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(this.multiThreadDatasetCache), org.apache.tephra.RetryStrategies.retryOnConflict(20, 100L));
    }

    /**
     * Creates the executor and task runner and submits one {@link ConstraintCheckerThread} per
     * job-queue partition.
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting ConstraintCheckerService.");
        taskExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("constraint-checker-task").build()));
        taskRunner = new ScheduleTaskRunner(store, lifecycleService, propertiesResolver, taskExecutorService, namespaceQueryAdmin, cConf);
        int numPartitions = Schedulers.getJobQueue(multiThreadDatasetCache, datasetFramework).getNumPartitions();
        for (int partition = 0; partition < numPartitions; partition++) {
            taskExecutorService.submit(new ConstraintCheckerThread(partition));
        }
        LOG.info("Started ConstraintCheckerService. state: {}", state());
    }

    /**
     * Signals the workers to stop, waits a bounded time for them to finish and interrupts whatever
     * is still running afterwards.
     */
    @Override
    protected void shutDown() throws Exception {
        stopping = true;
        LOG.info("Stopping ConstraintCheckerService.");
        // An executor only reaches the terminated state after shutdown() has been requested.
        // Without this call awaitTermination() always runs into its timeout, even when every
        // worker has already exited. Tasks submitted from now on are rejected.
        taskExecutorService.shutdown();
        try {
            taskExecutorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (!taskExecutorService.isTerminated()) {
                taskExecutorService.shutdownNow();
            }
        }
        LOG.info("Stopped ConstraintCheckerService.");
    }
}
