package co.cask.cdap.internal.app.services;

import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.ApplicationNotFoundException;
import co.cask.cdap.common.ProgramNotFoundException;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramLifecycleService.class */
public class ProgramLifecycleService extends AbstractIdleService {
    /** Logger used by this service and by the program listener registered in {@code start}. */
    private static final Logger LOG = LoggerFactory.getLogger(ProgramLifecycleService.class);
    /** Scheduler backed by a one-thread pool; {@code startUp} schedules the run-record corrector on it. */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    /** Store used to load programs and to read and update run records. */
    private final Store store;
    /** Runtime service used to run programs and to look up or list their runtime infos. */
    private final ProgramRuntimeService runtimeService;
    /** Configuration; {@code startUp} reads the corrector interval from it. */
    private final CConfiguration configuration;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.internal.app.services.ProgramLifecycleService$4, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramLifecycleService$4.class */
    /*
     * Lookup table from ProgramType ordinal to a 1-based index (FLOW=1 ... WORKFLOW=6).
     * NOTE(review): the "synthetic" markers and the $SwitchMap$ name suggest this is the
     * compiler-generated helper for a switch over ProgramType - confirm against the original
     * sources. Nothing visible in this file reads the table; the only candidate is
     * retrieveProgramIdForRunRecord, whose body was not decompiled.
     */
    public static /* synthetic */ class AnonymousClass4 {
        // Sized from ProgramType.values(); entries that are never assigned keep the default 0.
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$proto$ProgramType = new int[ProgramType.values().length];

        static {
            // Each constant is accessed inside its own try block: if a constant is missing at
            // link time the NoSuchFieldError is ignored and that entry simply stays 0.
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.FLOW.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.MAPREDUCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.SPARK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.SERVICE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.WORKER.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.WORKFLOW.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/services/ProgramLifecycleService$RunRecordsCorrectorRunnable.class */
    /**
     * Task that delegates to {@link ProgramLifecycleService#validateAndCorrectRunningRunRecords()}.
     * Any failure is logged and swallowed so that the exception never escapes {@link #run()}.
     */
    public static class RunRecordsCorrectorRunnable implements Runnable {
        private static final Logger LOG = LoggerFactory.getLogger(RunRecordsCorrectorRunnable.class);
        private final ProgramLifecycleService lifecycleService;

        /**
         * @param programLifecycleService the service whose run records are corrected on each run
         */
        public RunRecordsCorrectorRunnable(ProgramLifecycleService programLifecycleService) {
            this.lifecycleService = programLifecycleService;
        }

        /** Runs one correction pass; logs (never rethrows) anything that goes wrong. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                LOG.debug("Start correcting invalid run records ...");
                this.lifecycleService.validateAndCorrectRunningRunRecords();
                LOG.debug("End correcting invalid run records.");
            } catch (Throwable failure) {
                // Short root-cause message at WARN, full stack trace only at DEBUG.
                Throwable rootCause = Throwables.getRootCause(failure);
                LOG.warn("Unable to complete correcting run records: {}", rootCause.getMessage());
                LOG.debug("Exception thrown when running run id cleaner.", failure);
            }
        }
    }

    /**
     * Creates the service with its injected collaborators.
     *
     * @param store store for programs and run records
     * @param programRuntimeService service used to run and look up programs
     * @param cConfiguration configuration read when the service starts
     */
    @Inject
    public ProgramLifecycleService(Store store, ProgramRuntimeService programRuntimeService, CConfiguration cConfiguration) {
        this.configuration = cConfiguration;
        this.runtimeService = programRuntimeService;
        this.store = store;
    }

    /**
     * Schedules the run-record corrector: first run after 2 seconds, then with a fixed delay
     * taken from {@code app.program.runid.corrector.interval} (seconds). A non-positive
     * configured value is replaced by 180 seconds.
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting ProgramLifecycleService");
        final long configuredInterval = this.configuration.getLong("app.program.runid.corrector.interval");
        final long intervalSeconds;
        if (configuredInterval > 0) {
            intervalSeconds = configuredInterval;
        } else {
            LOG.debug("Invalid run id corrector interval {}. Setting it to 180 seconds.", configuredInterval);
            intervalSeconds = 180L;
        }
        Runnable corrector = new RunRecordsCorrectorRunnable(this);
        this.scheduledExecutorService.scheduleWithFixedDelay(corrector, 2L, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Stops the corrector scheduler. Waits up to 5 seconds for a running pass to finish and
     * then forces termination. If the waiting thread is interrupted, the scheduler is also
     * force-stopped and the interrupt status is restored for the caller.
     */
    @Override
    protected void shutDown() throws Exception {
        LOG.info("Shutting down ProgramLifecycleService");
        this.scheduledExecutorService.shutdown();
        try {
            if (!this.scheduledExecutorService.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.scheduledExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Without this the executor thread could outlive the service when the wait is interrupted.
            this.scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Loads the program for the given id from the store.
     *
     * @param program id of the program to load
     * @return the loaded program, never {@code null}
     * @throws ProgramNotFoundException if the store returns {@code null} for the id
     */
    private Program getProgram(Id.Program program) throws IOException, ApplicationNotFoundException, ProgramNotFoundException {
        Program loaded = this.store.loadProgram(program);
        if (loaded != null) {
            return loaded;
        }
        throw new ProgramNotFoundException(program);
    }

    /**
     * Loads and runs the given program. For every program type except MAPREDUCE and SPARK a
     * listener is attached that records start, stop, suspend and resume transitions in the store.
     *
     * @param program id of the program to start
     * @param map first argument set handed to {@link SimpleProgramOptions}
     *            (presumably system arguments - confirm against SimpleProgramOptions)
     * @param map2 second argument set handed to {@link SimpleProgramOptions}; also stored with the
     *             start record (presumably user arguments - confirm)
     * @param z flag forwarded to {@link SimpleProgramOptions} (presumably debug mode - confirm)
     * @return the runtime info returned by the runtime service
     * @throws ProgramNotFoundException if the program cannot be loaded from the store
     */
    public ProgramRuntimeService.RuntimeInfo start(final Id.Program program, Map<String, String> map, final Map<String, String> map2, boolean z) throws IOException, ProgramNotFoundException, ApplicationNotFoundException {
        Program loadedProgram = getProgram(program);
        SimpleProgramOptions options = new SimpleProgramOptions(program.getId(), new BasicArguments(map), new BasicArguments(map2), z);
        ProgramRuntimeService.RuntimeInfo runtimeInfo = this.runtimeService.run(loadedProgram, options);

        final ProgramController controller = runtimeInfo.getController();
        final String runId = controller.getRunId().getId();
        RunId twillRun = runtimeInfo.getTwillRunId();
        final String twillRunId = twillRun != null ? twillRun.getId() : null;

        // No listener is attached here for MAPREDUCE and SPARK programs.
        ProgramType type = program.getType();
        if (type == ProgramType.MAPREDUCE || type == ProgramType.SPARK) {
            return runtimeInfo;
        }

        controller.addListener(new AbstractListener() {
            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void init(ProgramController.State state, @Nullable Throwable th) {
                // Use the time encoded in the run id; -1 means none is available, so use the current time.
                long startSeconds = RunIds.getTime(controller.getRunId(), TimeUnit.SECONDS);
                if (startSeconds == -1) {
                    startSeconds = nowSeconds();
                }
                ProgramLifecycleService.this.store.setStart(program, runId, startSeconds, twillRunId, map2);
                // The program may already be finished by the time the listener is attached.
                if (state == ProgramController.State.COMPLETED) {
                    completed();
                } else if (state == ProgramController.State.ERROR) {
                    error(controller.getFailureCause());
                }
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void completed() {
                ProgramLifecycleService.LOG.debug("Program {} {} {} completed successfully.", new Object[]{program.getNamespaceId(), program.getApplicationId(), program.getId()});
                recordStop(ProgramController.State.COMPLETED);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void killed() {
                ProgramLifecycleService.LOG.debug("Program {} {} {} killed.", new Object[]{program.getNamespaceId(), program.getApplicationId(), program.getId()});
                recordStop(ProgramController.State.KILLED);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void suspended() {
                ProgramLifecycleService.LOG.debug("Suspending Program {} {} {} {}.", new Object[]{program.getNamespaceId(), program.getApplicationId(), program, runId});
                ProgramLifecycleService.this.store.setSuspend(program, runId);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void resuming() {
                ProgramLifecycleService.LOG.debug("Resuming Program {} {} {} {}.", new Object[]{program.getNamespaceId(), program.getApplicationId(), program, runId});
                ProgramLifecycleService.this.store.setResume(program, runId);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void error(Throwable th) {
                ProgramLifecycleService.LOG.info("Program stopped with error {}, {}", new Object[]{program, runId, th});
                recordStop(ProgramController.State.ERROR);
            }

            /** Current wall-clock time in whole seconds. */
            private long nowSeconds() {
                return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            }

            /** Writes the stop record for this run using the run status of the given end state. */
            private void recordStop(ProgramController.State endState) {
                ProgramLifecycleService.this.store.setStop(program, runId, nowSeconds(), endState.getRunStatus());
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        return runtimeInfo;
    }

    /**
     * Stops the given run of a program and blocks until the stop request completes.
     * If the runtime service has no runtime info for the run, only a warning is logged.
     *
     * @param program id of the program to stop
     * @param runId run to stop
     * @throws ExecutionException if waiting on the stop future fails
     * @throws InterruptedException if interrupted while waiting on the stop future
     */
    public void stopProgram(Id.Program program, RunId runId) throws ExecutionException, InterruptedException {
        ProgramRuntimeService.RuntimeInfo runtimeInfo = this.runtimeService.lookup(program, runId);
        if (runtimeInfo == null) {
            LOG.warn("RunTimeInfo not found for Program {} RunId {} to be stopped", program, runId);
            return;
        }
        runtimeInfo.getController().stop().get();
    }

    /**
     * Finds the first runtime info of the given type whose program id equals {@code program}.
     *
     * @param program id of the program to look for
     * @param programType type used to list runtime infos from the runtime service
     * @return the matching runtime info, or {@code null} if none matches
     */
    public ProgramRuntimeService.RuntimeInfo findRuntimeInfo(Id.Program program, ProgramType programType) {
        Map<RunId, ProgramRuntimeService.RuntimeInfo> runtimeInfos = this.runtimeService.list(programType);
        for (ProgramRuntimeService.RuntimeInfo candidate : runtimeInfos.values()) {
            if (!program.equals(candidate.getProgramId())) {
                continue;
            }
            return candidate;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the run-record correction for every {@link ProgramType} and logs how many run
     * records were corrected in total. The same set of corrected run ids is shared across all
     * program types, so the per-type pass can see ids corrected for earlier types.
     */
    public void validateAndCorrectRunningRunRecords() {
        // Typed set (was a raw HashSet) so the call below needs no unchecked conversion.
        Set<String> correctedRunIds = Sets.newHashSet();
        for (ProgramType programType : ProgramType.values()) {
            validateAndCorrectRunningRunRecords(programType, correctedRunIds);
        }
        if (!correctedRunIds.isEmpty()) {
            LOG.info("Corrected {} of run records with RUNNING status but no actual program running.", correctedRunIds.size());
        }
    }

    /**
     * Finds run records of the given program type that are stored as RUNNING but have no
     * matching entry in the runtime service, and moves them to the ERROR run status.
     *
     * <p>A record is only corrected when {@link #shouldCorrectForWorkflowChildren} allows it,
     * i.e. it is not a child of a workflow run that is still running.
     *
     * @param programType program type whose run records are checked
     * @param set run ids corrected so far; every run id corrected here is added to it
     */
    void validateAndCorrectRunningRunRecords(final ProgramType programType, Set<String> set) {
        final Map<RunId, ProgramRuntimeService.RuntimeInfo> runtimeInfos = this.runtimeService.list(programType);

        LOG.trace("Start getting run records not actually running ...");
        List<RunRecordMeta> notActuallyRunning = this.store.getRuns(ProgramRunStatus.RUNNING, new Predicate<RunRecordMeta>() {
            @Override
            public boolean apply(RunRecordMeta runRecordMeta) {
                return !runtimeInfos.containsKey(RunIds.fromString(runRecordMeta.getPid()));
            }
        });
        LOG.trace("End getting {} run records not actually running.", notActuallyRunning.size());

        // Resolve the owning program of each record exactly once. The previous lazy
        // Collections2.filter view re-ran the lookup on every isEmpty(), size() and iteration.
        LOG.trace("Start getting invalid run records  ...");
        Map<String, Id.Program> programIdsByRunId = new HashMap<String, Id.Program>();
        List<RunRecordMeta> invalidRunRecords = new ArrayList<RunRecordMeta>();
        for (RunRecordMeta candidate : notActuallyRunning) {
            String candidateRunId = candidate.getPid();
            Id.Program owner = retrieveProgramIdForRunRecord(programType, candidateRunId);
            if (owner != null) {
                programIdsByRunId.put(candidateRunId, owner);
                invalidRunRecords.add(candidate);
            }
        }
        LOG.trace("End getting invalid run records.");

        if (invalidRunRecords.isEmpty()) {
            LOG.trace("No RunRecords found with RUNNING status but the program is not actually running for program type {}", programType.getPrettyName());
        } else {
            LOG.warn("Found {} RunRecords with RUNNING status but the program is not actually running for program type {}", invalidRunRecords.size(), programType.getPrettyName());
        }

        for (RunRecordMeta invalidRecord : invalidRunRecords) {
            if (!shouldCorrectForWorkflowChildren(invalidRecord, set)) {
                LOG.trace("Will not correct invalid run record {} since it's parent workflow still running.", invalidRecord);
                continue;
            }
            String runId = invalidRecord.getPid();
            Id.Program programId = programIdsByRunId.get(runId);
            LOG.warn("Fixing RunRecord {} in program {} of type {} with RUNNING status but the program is not running", new Object[]{runId, programId, programType.getPrettyName()});
            this.store.compareAndSetStatus(programId, runId, ProgramController.State.ALIVE.getRunStatus(), ProgramController.State.ERROR.getRunStatus());
            set.add(runId);
        }
    }

    /**
     * Decides whether a run record may be corrected with respect to its parent workflow run.
     *
     * <p>Returns {@code true} when the record has no {@code workflowrunid} property, when the
     * parent workflow run id is already in {@code set}, when no workflow program can be
     * resolved for that run id, or when the parent workflow run is missing, not RUNNING, or
     * unknown to the runtime service. Returns {@code false} only while the parent workflow run
     * is stored as RUNNING and still has a runtime info.
     *
     * @param runRecordMeta the run record being considered for correction
     * @param set run ids that have already been corrected
     */
    private boolean shouldCorrectForWorkflowChildren(RunRecordMeta runRecordMeta, Set<String> set) {
        Map<String, String> properties = runRecordMeta.getProperties();
        if (properties == null) {
            return true;
        }
        String workflowRunId = properties.get("workflowrunid");
        if (workflowRunId == null) {
            return true;
        }
        if (set.contains(workflowRunId)) {
            return true;
        }
        Id.Program workflowProgramId = retrieveProgramIdForRunRecord(ProgramType.WORKFLOW, workflowRunId);
        if (workflowProgramId == null) {
            return true;
        }
        RunRecordMeta workflowRun = this.store.getRun(workflowProgramId, workflowRunId);
        if (workflowRun == null) {
            return true;
        }
        if (workflowRun.getStatus() != ProgramRunStatus.RUNNING) {
            return true;
        }
        return this.runtimeService.lookup(workflowProgramId, RunIds.fromString(workflowRunId)) == null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Removed duplicated region for block: B:21:0x02aa A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:25:? A[LOOP:0: B:2:0x0015->B:25:?, LOOP_END, SYNTHETIC] */
    /**
     * WARNING: the body below is a decompiler placeholder, not the real implementation.
     * As written, this method ALWAYS throws {@link UnsupportedOperationException}.
     *
     * <p>It is called from {@code validateAndCorrectRunningRunRecords(ProgramType, Set)} and
     * from {@code shouldCorrectForWorkflowChildren}. Both callers pass a program type and a run
     * id and treat a {@code null} result as "no program found for this run id". Until the
     * original logic is restored, those callers fail whenever they reach this method.
     *
     * @param r8 program type to resolve the run id against
     * @param r9 run id (pid) of the run record
     * @return per the callers' handling: the owning program id, or {@code null} if none is found
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @javax.annotation.Nullable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public co.cask.cdap.proto.Id.Program retrieveProgramIdForRunRecord(co.cask.cdap.proto.ProgramType r8, java.lang.String r9) {
        /*
            Method dump skipped, instructions count: 685
            To view this dump add '--comments-level debug' option
        */
        // TODO(review): restore the original implementation from the project sources.
        throw new UnsupportedOperationException("Method not decompiled: co.cask.cdap.internal.app.services.ProgramLifecycleService.retrieveProgramIdForRunRecord(co.cask.cdap.proto.ProgramType, java.lang.String):co.cask.cdap.proto.Id$Program");
    }

    /**
     * Builds a program id from the given parts and returns it only if the store has a run
     * record for that program under the given run id.
     *
     * @param str first component passed to {@code Id.Program.from} (presumably the namespace - confirm)
     * @param str2 second component passed to {@code Id.Program.from} (presumably the application - confirm)
     * @param programType program type of the candidate program
     * @param str3 fourth component passed to {@code Id.Program.from} (presumably the program name - confirm)
     * @param str4 run id to look up in the store
     * @return the program id if the store has a matching run, otherwise {@code null}
     */
    @Nullable
    private Id.Program validateProgramForRunRecord(String str, String str2, ProgramType programType, String str3, String str4) {
        Id.Program candidate = Id.Program.from(str, str2, programType, str3);
        RunRecordMeta existingRun = this.store.getRun(candidate, str4);
        return existingRun == null ? null : candidate;
    }
}
