package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager.class */
public class DagManager extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger(DagManager.class);
    public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
    private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
    private static final Integer DEFAULT_NUM_THREADS = 3;
    private static final Integer TERMINATION_TIMEOUT = 30;
    private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
    private static final String NUM_THREADS_KEY = "gobblin.service.dagManager.numThreads";
    private static final String JOB_STATUS_POLLING_INTERVAL_KEY = "gobblin.service.dagManager.pollingInterval";
    private static final String JOB_STATUS_RETRIEVER_KEY = "gobblin.service.dagManager.jobStatusRetriever";
    private static final String DAG_STORE_CLASS_KEY = "gobblin.service.dagManager.dagStateStoreClass";
    static final String DAG_STATESTORE_DIR = "gobblin.service.dagManager.dagStateStoreDir";
    private BlockingQueue<Dag<JobExecutionPlan>> queue;
    private ScheduledExecutorService scheduledExecutorPool;
    private boolean instrumentationEnabled;
    private final Integer numThreads;
    private final Integer pollingInterval;
    private final JobStatusRetriever jobStatusRetriever;
    private final DagStateStore dagStateStore;
    private volatile boolean isActive;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.service.modules.orchestration.DagManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$service$ExecutionStatus = new int[ExecutionStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.CANCELLED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager$DagManagerThread.class */
    public static class DagManagerThread implements Runnable {
        private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap();
        private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap();
        private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap();
        private final Set<String> failedDagIdsFinishRunning = new HashSet();
        private final Set<String> failedDagIdsFinishAllPossible = new HashSet();
        private final MetricContext metricContext;
        private final Optional<EventSubmitter> eventSubmitter;
        private final Optional<Timer> jobStatusPolledTimer;
        private JobStatusRetriever jobStatusRetriever;
        private DagStateStore dagStateStore;
        private BlockingQueue<Dag<JobExecutionPlan>> queue;

        DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, BlockingQueue<Dag<JobExecutionPlan>> blockingQueue, boolean z) {
            this.jobStatusRetriever = jobStatusRetriever;
            this.dagStateStore = dagStateStore;
            this.queue = blockingQueue;
            if (z) {
                this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
                this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
                this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
            } else {
                this.metricContext = null;
                this.eventSubmitter = Optional.absent();
                this.jobStatusPolledTimer = Optional.absent();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Dag<JobExecutionPlan> poll = this.queue.poll();
                if (poll != null) {
                    Dag<JobExecutionPlan> dag = poll;
                    if (dag.isEmpty()) {
                        DagManager.log.info("Empty dag; ignoring the dag");
                    }
                    initialize(dag);
                }
                DagManager.log.info("Polling job statuses..");
                pollJobStatuses();
                DagManager.log.info("Poll done.");
                DagManager.log.info("Cleaning up finished dags..");
                cleanUp();
                DagManager.log.info("Clean up done");
            } catch (Exception e) {
                DagManager.log.error("Exception encountered in {}", getClass().getName(), e);
            }
        }

        private void initialize(Dag<JobExecutionPlan> dag) throws IOException {
            String generateDagId = DagManagerUtils.generateDagId(dag);
            DagManager.log.info("Initializing Dag {}", generateDagId);
            if (this.dags.containsKey(generateDagId)) {
                DagManager.log.warn("Already tracking a dag with dagId {}, skipping.", generateDagId);
                return;
            }
            this.dags.put(generateDagId, dag);
            DagManager.log.info("Dag {} - determining if any jobs are already running.", generateDagId);
            for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
                if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) {
                    addJobState(generateDagId, dagNode);
                }
            }
            DagManager.log.info("Dag {} submitting jobs ready for execution.", generateDagId);
            submitNext(generateDagId);
            DagManager.log.info("Dag {} Initialization complete.", generateDagId);
        }

        private void pollJobStatuses() throws IOException {
            this.failedDagIdsFinishRunning.clear();
            for (Dag.DagNode<JobExecutionPlan> dagNode : this.jobToDag.keySet()) {
                long nanoTime = System.nanoTime();
                JobStatus pollJobStatus = pollJobStatus(dagNode);
                Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
                Preconditions.checkNotNull(pollJobStatus, "Received null job status for a running job " + DagManagerUtils.getJobName(dagNode));
                JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
                switch (AnonymousClass1.$SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.valueOf(pollJobStatus.getEventName()).ordinal()]) {
                    case 1:
                        jobExecutionPlan.setExecutionStatus(ExecutionStatus.COMPLETE);
                        onJobFinish(dagNode);
                        break;
                    case 2:
                    case 3:
                        jobExecutionPlan.setExecutionStatus(ExecutionStatus.FAILED);
                        onJobFinish(dagNode);
                        break;
                    default:
                        jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
                        break;
                }
            }
        }

        private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
            Config config = dagNode.getValue().getJobSpec().getConfig();
            Iterator jobStatusesForFlowExecution = this.jobStatusRetriever.getJobStatusesForFlowExecution(config.getString("flow.group"), config.getString("flow.name"), config.getLong("flow.executionId"), config.getString("job.group"), config.getString("job.name"));
            if (jobStatusesForFlowExecution.hasNext()) {
                return (JobStatus) jobStatusesForFlowExecution.next();
            }
            return null;
        }

        void submitNext(String str) throws IOException {
            Dag<JobExecutionPlan> dag = this.dags.get(str);
            for (Dag.DagNode<JobExecutionPlan> dagNode : DagManagerUtils.getNext(dag)) {
                submitJob(dagNode);
                addJobState(str, dagNode);
            }
            this.dagStateStore.writeCheckpoint(dag);
        }

        private void submitJob(Dag.DagNode<JobExecutionPlan> dagNode) {
            JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
            jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
            JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
            SpecProducer specProducer = null;
            try {
                specProducer = DagManagerUtils.getSpecProducer(dagNode);
                if (!DagManagerUtils.getJobConfig(dagNode).hasPath("flow.executionId")) {
                    DagManager.log.warn("JobSpec does not contain flowExecutionId.");
                }
                DagManager.log.info("Submitting job: {} on executor: {}", jobSpec, specProducer);
                Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
                DagManager.log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec, specProducer);
                TimingEvent timingEvent = this.eventSubmitter.isPresent() ? ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("JobOrchestrated") : null;
                specProducer.addSpec(jobSpec);
                if (timingEvent != null) {
                    timingEvent.stop(jobMetadata);
                }
                DagManager.log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec, specProducer);
            } catch (Exception e) {
                DagManager.log.error("Cannot submit job: {} on executor: {}", new Object[]{jobSpec, specProducer, e});
            }
        }

        private void onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
            Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
            String generateDagId = DagManagerUtils.generateDagId(dag);
            String jobName = DagManagerUtils.getJobName(dagNode);
            ExecutionStatus executionStatus = DagManagerUtils.getExecutionStatus(dagNode);
            DagManager.log.info("Job {} of Dag {} has finished with status {}", new Object[]{jobName, generateDagId, executionStatus.name()});
            deleteJobState(generateDagId, dagNode);
            if (executionStatus == ExecutionStatus.COMPLETE) {
                submitNext(generateDagId);
            } else if (executionStatus == ExecutionStatus.FAILED) {
                if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
                    this.failedDagIdsFinishRunning.add(generateDagId);
                } else {
                    this.failedDagIdsFinishAllPossible.add(generateDagId);
                }
            }
        }

        private void deleteJobState(String str, Dag.DagNode<JobExecutionPlan> dagNode) {
            this.jobToDag.remove(dagNode);
            this.dagToJobs.get(str).remove(dagNode);
        }

        private void addJobState(String str, Dag.DagNode<JobExecutionPlan> dagNode) {
            this.jobToDag.put(dagNode, this.dags.get(str));
            if (this.dagToJobs.containsKey(str)) {
                this.dagToJobs.get(str).add(dagNode);
                return;
            }
            LinkedList<Dag.DagNode<JobExecutionPlan>> newLinkedList = Lists.newLinkedList();
            newLinkedList.add(dagNode);
            this.dagToJobs.put(str, newLinkedList);
        }

        private boolean hasRunningJobs(String str) {
            return !this.dagToJobs.get(str).isEmpty();
        }

        private void cleanUp() {
            for (String str : this.failedDagIdsFinishRunning) {
                LinkedList<Dag.DagNode<JobExecutionPlan>> linkedList = this.dagToJobs.get(str);
                while (!linkedList.isEmpty()) {
                    deleteJobState(str, linkedList.poll());
                }
                DagManager.log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", str);
                cleanUpDag(str);
            }
            for (String str2 : this.dags.keySet()) {
                if (!hasRunningJobs(str2)) {
                    Object obj = "COMPLETE";
                    if (this.failedDagIdsFinishAllPossible.contains(str2)) {
                        obj = "FAILED";
                        this.failedDagIdsFinishAllPossible.remove(str2);
                    }
                    DagManager.log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", str2, obj);
                    cleanUpDag(str2);
                }
            }
        }

        private void cleanUpDag(String str) {
            Dag<JobExecutionPlan> dag = this.dags.get(str);
            this.dagToJobs.remove(str);
            this.dags.remove(str);
            this.dagStateStore.cleanUp(dag);
        }
    }

    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager$FailureOption.class */
    public enum FailureOption {
        FINISH_RUNNING("FINISH_RUNNING"),
        CANCEL("CANCEL"),
        FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");

        private final String failureOption;

        FailureOption(String str) {
            this.failureOption = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.failureOption;
        }
    }

    public DagManager(Config config, boolean z) {
        this.isActive = false;
        this.queue = new LinkedBlockingDeque();
        this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
        this.scheduledExecutorPool = Executors.newScheduledThreadPool(this.numThreads.intValue());
        this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
        this.instrumentationEnabled = z;
        try {
            this.jobStatusRetriever = (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY)), new Object[]{config});
            this.dagStateStore = (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(config.getString(DAG_STORE_CLASS_KEY)), new Object[]{config});
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Exception encountered during DagManager initialization", e);
        }
    }

    public DagManager(Config config) {
        this(config, true);
    }

    protected void startUp() {
        for (int i = 0; i < this.numThreads.intValue(); i++) {
            this.scheduledExecutorPool.scheduleAtFixedRate(new DagManagerThread(this.jobStatusRetriever, this.dagStateStore, this.queue, this.instrumentationEnabled), 0L, this.pollingInterval.intValue(), TimeUnit.SECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
        this.dagStateStore.writeCheckpoint(dag);
        if (!this.queue.offer(dag)) {
            throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue");
        }
    }

    public synchronized void setActive(boolean z) {
        this.isActive = z;
        try {
            if (this.isActive) {
                Iterator<Dag<JobExecutionPlan>> it = this.dagStateStore.getDags().iterator();
                while (it.hasNext()) {
                    offer(it.next());
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Exception encountered when activating the new DagManager", e);
        }
    }

    protected void shutDown() throws Exception {
        this.scheduledExecutorPool.shutdown();
        this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT.intValue(), TimeUnit.SECONDS);
    }
}
