package co.cask.cdap.internal.app.services;

import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.LocalDatasetDeleterRunnable;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/services/RunRecordCorrectorService.class */
public abstract class RunRecordCorrectorService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(RunRecordCorrectorService.class);
    private static final Set<ProgramRunStatus> NOT_STOPPED_STATUSES = EnumSet.of(ProgramRunStatus.STARTING, ProgramRunStatus.RUNNING, ProgramRunStatus.SUSPENDED);
    private final Store store;
    private final ProgramStateWriter programStateWriter;
    private final ProgramRuntimeService runtimeService;
    private final long startTimeoutSecs;
    private final int txBatchSize;
    private final CConfiguration cConf;
    private final NamespaceAdmin namespaceAdmin;
    private final DatasetFramework datasetFramework;
    private ScheduledExecutorService localDatasetDeleterService;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RunRecordCorrectorService(CConfiguration cConfiguration, Store store, ProgramStateWriter programStateWriter, ProgramRuntimeService programRuntimeService, NamespaceAdmin namespaceAdmin, DatasetFramework datasetFramework) {
        this(cConfiguration, store, programStateWriter, programRuntimeService, namespaceAdmin, datasetFramework, 2 * cConfiguration.getLong("app.program.max.start.seconds"), cConfiguration.getInt("app.program.runid.corrector.tx.batch.size"));
    }

    @VisibleForTesting
    RunRecordCorrectorService(CConfiguration cConfiguration, Store store, ProgramStateWriter programStateWriter, ProgramRuntimeService programRuntimeService, NamespaceAdmin namespaceAdmin, DatasetFramework datasetFramework, long j, int i) {
        this.store = store;
        this.programStateWriter = programStateWriter;
        this.runtimeService = programRuntimeService;
        this.cConf = cConfiguration;
        this.namespaceAdmin = namespaceAdmin;
        this.datasetFramework = datasetFramework;
        this.startTimeoutSecs = j;
        this.txBatchSize = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void fixRunRecords() {
        Set<ProgramRunId> doFixRunRecords = doFixRunRecords();
        if (doFixRunRecords.isEmpty()) {
            return;
        }
        LOG.info("Corrected {} run records with status in {} that have no actual running program. Such programs likely have crashed or were killed by external signal.", Integer.valueOf(doFixRunRecords.size()), NOT_STOPPED_STATUSES);
    }

    private Set<ProgramRunId> doFixRunRecords() {
        LOG.trace("Start getting run records not actually running ...");
        HashSet hashSet = new HashSet();
        Predicate<RunRecordMeta> createFilter = createFilter(hashSet);
        for (ProgramRunStatus programRunStatus : NOT_STOPPED_STATUSES) {
            long j = 0;
            while (true) {
                Map<ProgramRunId, RunRecordMeta> runs = this.store.getRuns(programRunStatus, j, Long.MAX_VALUE, this.txBatchSize, createFilter);
                LOG.trace("{} run records in {} state but are not actually running", Integer.valueOf(runs.size()));
                if (runs.isEmpty()) {
                    break;
                }
                for (RunRecordMeta runRecordMeta : runs.values()) {
                    j = Math.max(j, RunIds.getTime(runRecordMeta.getPid(), TimeUnit.SECONDS));
                    ProgramRunId programRunId = runRecordMeta.getProgramRunId();
                    if (!hashSet.contains(programRunId)) {
                        String format = String.format("Fixed RunRecord for program run %s in %s state because it is actually not running", programRunId, runRecordMeta.getStatus());
                        this.programStateWriter.error(programRunId, new ProgramRunAbortedException(format));
                        hashSet.add(programRunId);
                        LOG.warn(format);
                    }
                }
            }
        }
        if (hashSet.isEmpty()) {
            LOG.trace("No RunRecord found with status in {}, but the program are not actually running", NOT_STOPPED_STATUSES);
        } else {
            LOG.warn("Fixed {} RunRecords with status in {}, but the programs are not actually running", Integer.valueOf(hashSet.size()), NOT_STOPPED_STATUSES);
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startUp() throws Exception {
        LOG.info("Starting RunRecordCorrectorService");
        this.localDatasetDeleterService = Executors.newScheduledThreadPool(1);
        long j = this.cConf.getLong("app.program.local.dataset.deleter.interval");
        if (j <= 0) {
            LOG.warn("Invalid interval specified for the local dataset deleter {}. Setting it to 3600 seconds.", Long.valueOf(j));
            j = 3600;
        }
        long j2 = this.cConf.getLong("app.program.local.dataset.deleter.initial.delay");
        if (j2 <= 0) {
            LOG.warn("Invalid initial delay specified for the local dataset deleter {}. Setting it to 300 seconds.", Long.valueOf(j2));
            j2 = 300;
        }
        this.localDatasetDeleterService.scheduleWithFixedDelay(new LocalDatasetDeleterRunnable(this.namespaceAdmin, this.store, this.datasetFramework), j2, j, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutDown() throws Exception {
        LOG.info("Stopping RunRecordCorrectorService");
        this.localDatasetDeleterService.shutdown();
        try {
            if (!this.localDatasetDeleterService.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.localDatasetDeleterService.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private Predicate<RunRecordMeta> createFilter(final Set<ProgramRunId> set) {
        final long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        return new Predicate<RunRecordMeta>() { // from class: co.cask.cdap.internal.app.services.RunRecordCorrectorService.1
            public boolean apply(RunRecordMeta runRecordMeta) {
                ProgramRunId programRunId = runRecordMeta.getProgramRunId();
                if (set.contains(programRunId)) {
                    return false;
                }
                long startTs = seconds - runRecordMeta.getStartTs();
                ProgramId parent = programRunId.getParent();
                Map systemArgs = runRecordMeta.getSystemArgs();
                String str = systemArgs == null ? null : (String) systemArgs.get(ProgramOptionConstants.WORKFLOW_NAME);
                String str2 = systemArgs == null ? null : (String) systemArgs.get(ProgramOptionConstants.WORKFLOW_RUN_ID);
                if (str != null && str2 != null) {
                    if (RunRecordCorrectorService.this.runtimeService.list(parent.getParent().program(ProgramType.WORKFLOW, str)).containsKey(RunIds.fromString(str2))) {
                        return false;
                    }
                }
                return startTs > RunRecordCorrectorService.this.startTimeoutSecs && !RunRecordCorrectorService.this.runtimeService.list(parent).containsKey(RunIds.fromString(programRunId.getRun()));
            }
        };
    }
}
