package org.apache.gobblin.service.modules.orchestration;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager.class */
public class DagManager extends AbstractIdleService {
    // ---- Configuration keys; every key below starts with DAG_MANAGER_PREFIX. ----
    public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
    private static final String JOB_STATUS_RETRIEVER_KEY = "gobblin.service.dagManager.jobStatusRetriever";
    public static final String NUM_THREADS_KEY = "gobblin.service.dagManager.numThreads";
    public static final String JOB_STATUS_POLLING_INTERVAL_KEY = "gobblin.service.dagManager.pollingInterval";
    private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "gobblin.service.dagManager.jobStatusRetriever.class";
    private static final String DAG_STATESTORE_CLASS_KEY = "gobblin.service.dagManager.dagStateStoreClass";
    private static final String USER_JOB_QUOTA_KEY = "gobblin.service.dagManager.defaultJobQuota";
    private static final String PER_USER_QUOTA = "gobblin.service.dagManager.perUserQuota";
    // NOTE(review): identifier is misspelled ("SEPERATOR"); presumably separates user from quota
    // in the PER_USER_QUOTA value - the parsing code is outside this view, confirm there.
    private static final String QUOTA_SEPERATOR = ":";

    // ---- Mutable runtime state ----
    // Arrays of queues and worker threads; presumably one slot per DagManagerThread
    // (each DagManagerThread takes a single queue and a single cancel queue) - allocation is
    // outside this view, confirm there.
    private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
    private BlockingQueue<String>[] cancelQueue;
    DagManagerThread[] dagManagerThreads;
    private ScheduledExecutorService scheduledExecutorPool;
    private boolean instrumentationEnabled;
    private DagStateStore dagStateStore;
    private Map<URI, TopologySpec> topologySpecMap;

    // ---- Immutable settings fixed at construction ----
    private final Integer numThreads;
    private final Integer pollingInterval;
    private final JobStatusRetriever jobStatusRetriever;
    private final Config config;
    private final Optional<EventSubmitter> eventSubmitter;
    // Quota applied to a user with no entry in perUserQuota (see DagManagerThread#getQuotaForUser).
    private final int defaultQuota;
    private final Map<String, Integer> perUserQuota;
    // volatile: written and read without locking.
    private volatile boolean isActive;
    private static final Logger log = LoggerFactory.getLogger(DagManager.class);

    // ---- Defaults ----
    public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
    // NOTE(review): time unit is not visible in this chunk - confirm where the executor is scheduled.
    private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
    public static final Integer DEFAULT_NUM_THREADS = 3;
    // NOTE(review): time unit is not visible in this chunk - confirm at the shutdown call site.
    private static final Integer TERMINATION_TIMEOUT = 30;
    private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
    // Integer.MAX_VALUE, so the default quota only limits users if it is overridden.
    private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.gobblin.service.modules.orchestration.DagManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager$1.class */
    /**
     * Decompiler rendering of a synthetic switch-map holder.
     *
     * <p>The array is indexed by {@code ExecutionStatus.ordinal()} and holds the case label
     * (1..5) used by the {@code switch} statements in {@code DagManagerThread}. Statuses that
     * are not listed keep the array default of 0 and therefore hit the {@code default} branch.
     *
     * <p>Mapping: COMPLETE=1, FAILED=2, CANCELLED=3, PENDING=4, PENDING_RETRY=5.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$gobblin$service$ExecutionStatus = new int[ExecutionStatus.values().length];

        static {
            // Each entry is filled in its own try block so that one missing constant does not
            // prevent the others from being registered. NoSuchFieldError is deliberately ignored;
            // presumably this guards against an ExecutionStatus enum that lacks a constant at
            // runtime - the usual shape of compiler-generated switch maps, confirm before editing.
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.CANCELLED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.PENDING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$gobblin$service$ExecutionStatus[ExecutionStatus.PENDING_RETRY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:org/apache/gobblin/service/modules/orchestration/DagManager$DagManagerThread.class */
    public static class DagManagerThread implements Runnable {
        // Running-job counts used for quota enforcement. Both maps are static, so they are shared
        // by every DagManagerThread instance; keys come from DagManagerUtils.getUserQuotaKey(user, node).
        private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap();
        private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap();
        // Null when instrumentation is disabled (see constructor); callers must null-check before use.
        private final MetricContext metricContext;
        // Both Optionals are absent when instrumentation is disabled.
        private final Optional<EventSubmitter> eventSubmitter;
        private final Optional<Timer> jobStatusPolledTimer;
        // Quota for users that have no entry in perUserQuota.
        private final int defaultQuota;
        private final Map<String, Integer> perUserQuota;
        private JobStatusRetriever jobStatusRetriever;
        private DagStateStore dagStateStore;
        // Incoming dags to start tracking, drained at the beginning of each run().
        private BlockingQueue<Dag<JobExecutionPlan>> queue;
        // Dag ids to cancel; run() takes at most one per invocation.
        private BlockingQueue<String> cancelQueue;
        // Tracked job node -> the dag it belongs to; its key set is what pollAndAdvanceDag() polls.
        private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap();
        // Dag id -> dag, for every dag this thread is tracking.
        private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap();
        // Dag id -> job nodes currently tracked for that dag (maintained by addJobState/deleteJobState).
        final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap();
        // Dag id -> flow SLA value cached by slaKillIfNeeded(); removed again by deleteJobState().
        final Map<String, Long> dagToSLA = new HashMap();
        // Dags with a failed/cancelled job under FailureOption.FINISH_RUNNING; cleared at the start of every poll.
        private final Set<String> failedDagIdsFinishRunning = new HashSet();
        // Dags with a failed/cancelled job under any other failure option; entries are removed in cleanUp().
        private final Set<String> failedDagIdsFinishAllPossible = new HashSet();
        // Last value computed in initialize(); exposed through a gauge when instrumentation is enabled.
        private final AtomicLong orchestrationDelay = new AtomicLong(0);

        /**
         * Creates a worker bound to one dag queue and one cancel queue.
         *
         * @param jobStatusRetriever source of job statuses used while polling
         * @param dagStateStore store used for checkpointing and cleaning up dags
         * @param blockingQueue queue of dags this worker should start tracking
         * @param blockingQueue2 queue of dag ids this worker should cancel
         * @param z whether instrumentation is enabled; when {@code false} the metric context is
         *          {@code null} and the event submitter and poll timer are absent
         * @param i quota applied to users without an explicit per-user quota
         * @param map explicit per-user quotas
         */
        DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, BlockingQueue<Dag<JobExecutionPlan>> blockingQueue, BlockingQueue<String> blockingQueue2, boolean z, int i, Map<String, Integer> map) {
            this.jobStatusRetriever = jobStatusRetriever;
            this.dagStateStore = dagStateStore;
            this.queue = blockingQueue;
            this.cancelQueue = blockingQueue2;
            this.defaultQuota = i;
            this.perUserQuota = map;
            if (z) {
                // Instrumented: build the metric context, event submitter, poll timer and delay gauge.
                this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
                this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
                this.jobStatusPolledTimer = Optional.of(this.metricContext.timer("GobblinService.jobStatusPoll.time"));
                this.metricContext.register(this.metricContext.newContextAwareGauge(
                        "GobblinService.flowOrchestration.delay",
                        () -> Long.valueOf(this.orchestrationDelay.get())));
            } else {
                // Not instrumented: leave every metrics-related member empty.
                this.metricContext = null;
                this.eventSubmitter = Optional.absent();
                this.jobStatusPolledTimer = Optional.absent();
            }
        }

        /**
         * One scheduling tick of this worker.
         *
         * <p>In order: cancels at most one dag requested through the cancel queue, starts tracking
         * every dag waiting in the dag queue, polls the status of all tracked jobs (advancing dags
         * whose jobs finished) and finally cleans up finished dags.
         *
         * <p>Empty dags are skipped rather than initialized, matching the "ignoring the dag" log
         * message. All exceptions are logged and swallowed so a failing tick never propagates out
         * of {@code run()}; if the tick was interrupted, the thread's interrupt flag is restored
         * before returning.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                String dagIdToCancel = this.cancelQueue.poll();
                if (dagIdToCancel != null) {
                    cancelDag(dagIdToCancel);
                }
                while (!this.queue.isEmpty()) {
                    Dag<JobExecutionPlan> dag = this.queue.poll();
                    if (dag == null) {
                        continue;
                    }
                    if (dag.isEmpty()) {
                        // Nothing to run; skip initialization instead of only logging about it.
                        DagManager.log.info("Empty dag; ignoring the dag");
                        continue;
                    }
                    initialize(dag);
                }
                DagManager.log.debug("Polling job statuses..");
                pollAndAdvanceDag();
                DagManager.log.debug("Poll done.");
                DagManager.log.debug("Cleaning up finished dags..");
                cleanUp();
                DagManager.log.debug("Clean up done");
            } catch (InterruptedException e) {
                // Keep the interruption visible to the executor instead of silently dropping it.
                Thread.currentThread().interrupt();
                DagManager.log.error("Exception encountered in {}", getClass().getName(), e);
            } catch (Exception e) {
                DagManager.log.error("Exception encountered in {}", getClass().getName(), e);
            }
        }

        /**
         * Cancels every job currently tracked for the given dag and marks the dag as cancelled.
         *
         * @param str id of the dag to cancel; an id that is not tracked only produces a warning
         * @throws ExecutionException if cancelling a job node fails
         * @throws InterruptedException if interrupted while cancelling a job node
         */
        private void cancelDag(String str) throws ExecutionException, InterruptedException {
            DagManager.log.info("Cancel flow with DagId {}", str);
            if (this.dagToJobs.containsKey(str)) {
                LinkedList<Dag.DagNode<JobExecutionPlan>> trackedNodes = this.dagToJobs.get(str);
                DagManager.log.info("Found {} DagNodes to cancel.", Integer.valueOf(trackedNodes.size()));
                for (Dag.DagNode<JobExecutionPlan> node : trackedNodes) {
                    cancelDagNode(node);
                }
                // Record why the flow ended so the final flow event carries the right reason.
                this.dags.get(str).setFlowEvent("FlowCancelled");
                this.dags.get(str).setMessage("Flow killed by request");
            } else {
                DagManager.log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", str);
            }
        }

        /**
         * Asks the node's spec producer to delete (cancel) the job.
         *
         * <p>When the job has a submission future, its serialized form is passed along under the
         * {@code specProducer.serialized.future} property and a cancellation event is emitted.
         *
         * @param dagNode job node to cancel
         * @throws ExecutionException if obtaining the producer or serializing the future fails
         * @throws InterruptedException if interrupted while doing so
         */
        private void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws ExecutionException, InterruptedException {
            Properties cancelProps = new Properties();
            boolean wasSubmitted = dagNode.getValue().getJobFuture().isPresent();
            if (wasSubmitted) {
                Object serializedFuture = DagManagerUtils.getSpecProducer(dagNode)
                        .serializeAddSpecResponse((Future) dagNode.getValue().getJobFuture().get());
                cancelProps.put("specProducer.serialized.future", serializedFuture);
                sendCancellationEvent(dagNode.getValue());
            }
            // The delete request is sent even when there is no future to attach.
            DagManagerUtils.getSpecProducer(dagNode).deleteSpec((URI) null, cancelProps);
        }

        /**
         * Emits a {@code JobCancelTimer} timing event for the job and marks the plan CANCELLED.
         *
         * <p>Does nothing at all (including the status change) when no event submitter is present.
         *
         * @param jobExecutionPlan plan of the job being cancelled
         */
        private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
            if (!this.eventSubmitter.isPresent()) {
                return;
            }
            TimingEvent cancelTimer = ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("JobCancelTimer");
            Map<String, String> metadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
            cancelTimer.stop(metadata);
            jobExecutionPlan.setExecutionStatus(ExecutionStatus.CANCELLED);
        }

        /**
         * Starts tracking a dag: registers it, re-attaches jobs that are already RUNNING, submits
         * the jobs that are ready to execute and emits the {@code FlowRunning} event.
         *
         * <p>A dag whose id is already tracked is skipped with a warning. The orchestration delay
         * is only updated when none of the dag's jobs was already running.
         *
         * <p>Running-job counters are touched only when instrumentation is enabled, because the
         * metric context is {@code null} otherwise. Per-user counters are incremented together
         * with the per-executor counter so that the decrements in {@code onJobFinish} balance out.
         *
         * @param dag dag to start tracking
         * @throws IOException if submitting the next jobs fails to checkpoint the dag
         */
        private void initialize(Dag<JobExecutionPlan> dag) throws IOException {
            String dagId = DagManagerUtils.generateDagId(dag);
            DagManager.log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
            if (this.dags.containsKey(dagId)) {
                DagManager.log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
                return;
            }
            this.dags.put(dagId, dag);
            DagManager.log.debug("Dag {} - determining if any jobs are already running.", DagManagerUtils.getFullyQualifiedDagName(dag));
            boolean dagAlreadyRunning = false;
            for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
                if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) {
                    addJobState(dagId, dagNode);
                    // Same guard as submitJob/onJobFinish: metricContext is null without instrumentation.
                    if (this.metricContext != null) {
                        getRunningJobsCounter(dagNode).inc();
                        getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc());
                    }
                    dagAlreadyRunning = true;
                }
            }
            DagManager.log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
            for (Dag.DagNode<JobExecutionPlan> submitted : submitNext(dagId).get(dagId)) {
                addJobState(dagId, submitted);
            }
            DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, "FlowRunning");
            if (!dagAlreadyRunning) {
                // Delay between the flow execution id (used here as a timestamp) and now.
                this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
            }
            DagManager.log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
        }

        /**
         * Polls every tracked job once and advances the dags whose jobs reached a terminal state.
         *
         * <p>For each tracked job: applies the flow SLA and job-start SLA kills, derives the
         * effective status, stores it on the execution plan, and for COMPLETE/FAILED/CANCELLED
         * jobs runs {@code onJobFinish}. Jobs whose polled status asks for a retry are
         * resubmitted. Tracking state is only changed after the iteration: newly submitted jobs
         * are added and finished jobs removed once the loop over {@code jobToDag} has ended.
         *
         * <p>Jobs submitted as a result of several finished jobs of the same dag are accumulated
         * per dag id, so a later result does not replace an earlier one within one poll cycle.
         *
         * @throws IOException if checkpointing fails while submitting follow-up jobs
         * @throws ExecutionException if cancelling a job fails
         * @throws InterruptedException if interrupted while cancelling a job
         */
        private void pollAndAdvanceDag() throws IOException, ExecutionException, InterruptedException {
            this.failedDagIdsFinishRunning.clear();
            Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
            List<Dag.DagNode<JobExecutionPlan>> finishedNodes = Lists.newArrayList();
            for (Dag.DagNode<JobExecutionPlan> dagNode : this.jobToDag.keySet()) {
                boolean killedBySla = slaKillIfNeeded(dagNode);
                JobStatus polledStatus = pollJobStatus(dagNode);
                boolean killedAsOrphan = killJobIfOrphaned(dagNode, polledStatus);
                ExecutionStatus status = getJobExecutionStatus(killedBySla, killedAsOrphan, polledStatus);
                JobExecutionPlan plan = DagManagerUtils.getJobExecutionPlan(dagNode);
                switch (status) {
                    case COMPLETE:
                    case FAILED:
                    case CANCELLED:
                        // Terminal: record the status, advance the dag, and schedule the node for removal.
                        plan.setExecutionStatus(status);
                        mergeSubmittedNodes(nextSubmitted, onJobFinish(dagNode));
                        finishedNodes.add(dagNode);
                        break;
                    case PENDING:
                    case PENDING_RETRY:
                        plan.setExecutionStatus(status);
                        break;
                    default:
                        // Every other status is treated as still running.
                        plan.setExecutionStatus(ExecutionStatus.RUNNING);
                        break;
                }
                if (polledStatus != null && polledStatus.isShouldRetry()) {
                    DagManager.log.info("Retrying job: {}, current attempts: {}, max attempts: {}", new Object[]{DagManagerUtils.getFullyQualifiedJobName(dagNode), Integer.valueOf(polledStatus.getCurrentAttempts()), Integer.valueOf(polledStatus.getMaxAttempts())});
                    submitJob(dagNode);
                }
            }
            // jobToDag must not be modified while its key set is being iterated above.
            for (Map.Entry<String, Set<Dag.DagNode<JobExecutionPlan>>> entry : nextSubmitted.entrySet()) {
                for (Dag.DagNode<JobExecutionPlan> submitted : entry.getValue()) {
                    addJobState(entry.getKey(), submitted);
                }
            }
            for (Dag.DagNode<JobExecutionPlan> finished : finishedNodes) {
                deleteJobState(DagManagerUtils.generateDagId(finished), finished);
            }
        }

        /** Adds every node set in {@code source} to the set stored under the same dag id in {@code target}. */
        private static void mergeSubmittedNodes(Map<String, Set<Dag.DagNode<JobExecutionPlan>>> target, Map<String, Set<Dag.DagNode<JobExecutionPlan>>> source) {
            for (Map.Entry<String, Set<Dag.DagNode<JobExecutionPlan>>> entry : source.entrySet()) {
                target.computeIfAbsent(entry.getKey(), dagId -> new HashSet<>()).addAll(entry.getValue());
            }
        }

        /**
         * Cancels a job that is still ORCHESTRATED after its job-start SLA has elapsed.
         *
         * @param dagNode job node to inspect
         * @param jobStatus latest polled status; {@code null} means nothing to check
         * @return {@code true} if the job was cancelled by this call
         * @throws ExecutionException if cancelling the job fails
         * @throws InterruptedException if interrupted while cancelling the job
         */
        private boolean killJobIfOrphaned(Dag.DagNode<JobExecutionPlan> dagNode, JobStatus jobStatus) throws ExecutionException, InterruptedException {
            if (jobStatus == null) {
                return false;
            }
            ExecutionStatus polledStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
            long startSlaMillis = DagManagerUtils.getJobStartSla(dagNode);
            long orchestratedAt = jobStatus.getOrchestratedTime();
            boolean startSlaExceeded = polledStatus == ExecutionStatus.ORCHESTRATED
                    && System.currentTimeMillis() - orchestratedAt > startSlaMillis;
            if (!startSlaExceeded) {
                return false;
            }
            DagManager.log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...", new Object[]{DagManagerUtils.getJobName(dagNode), DagManagerUtils.getFullyQualifiedDagName(dagNode), Long.valueOf(startSlaMillis)});
            cancelDagNode(dagNode);
            String dagId = DagManagerUtils.generateDagId(dagNode);
            this.dags.get(dagId).setFlowEvent("FlowCancelled");
            this.dags.get(dagId).setMessage("Flow killed because no update received for " + startSlaMillis + " ms after orchestration");
            return true;
        }

        /**
         * Derives the effective status of a job for this poll cycle.
         *
         * @param z whether the job was killed for exceeding the flow SLA
         * @param z2 whether the job was killed for exceeding the job-start SLA
         * @param jobStatus latest polled status, or {@code null} if none is available
         * @return CANCELLED if either kill happened, PENDING if there is no status, otherwise the
         *         status named by the polled event
         */
        private ExecutionStatus getJobExecutionStatus(boolean z, boolean z2, JobStatus jobStatus) {
            if (z || z2) {
                return ExecutionStatus.CANCELLED;
            }
            if (jobStatus == null) {
                return ExecutionStatus.PENDING;
            }
            return ExecutionStatus.valueOf(jobStatus.getEventName());
        }

        /**
         * Cancels the job when its flow has been running longer than the flow SLA.
         *
         * <p>The SLA is looked up once per dag and cached in {@code dagToSLA}.
         *
         * @param dagNode job node to inspect
         * @return {@code true} if the job was cancelled by this call
         * @throws ExecutionException if cancelling the job fails
         * @throws InterruptedException if interrupted while cancelling the job
         */
        private boolean slaKillIfNeeded(Dag.DagNode<JobExecutionPlan> dagNode) throws ExecutionException, InterruptedException {
            long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
            long now = System.currentTimeMillis();
            String dagId = DagManagerUtils.generateDagId(dagNode);

            long slaMillis;
            Long cachedSla = this.dagToSLA.get(dagId);
            if (cachedSla != null) {
                slaMillis = cachedSla.longValue();
            } else {
                slaMillis = DagManagerUtils.getFlowSLA(dagNode);
                this.dagToSLA.put(dagId, Long.valueOf(slaMillis));
            }

            boolean withinSla = now <= flowStartTime + slaMillis;
            if (withinSla) {
                return false;
            }
            DagManager.log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...", new Object[]{dagNode.getValue().getJobSpec().getConfig().getString("flow.name"), Long.valueOf(slaMillis), dagNode.getValue().getJobSpec().getConfig().getString("job.name")});
            cancelDagNode(dagNode);
            this.dags.get(dagId).setFlowEvent("FlowCancelled");
            this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of " + slaMillis + " ms");
            return true;
        }

        /**
         * Fetches the current status of one job from the job status retriever and records how
         * long the lookup took on the poll timer.
         *
         * @param dagNode job node whose flow/job identifiers are read from its job spec config
         * @return the first status returned by the retriever, or {@code null} if there is none
         */
        private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
            Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
            String flowGroup = jobConfig.getString("flow.group");
            String flowName = jobConfig.getString("flow.name");
            long flowExecutionId = jobConfig.getLong("flow.executionId");
            String jobGroup = jobConfig.getString("job.group");
            String jobName = jobConfig.getString("job.name");

            long startedAtNanos = System.nanoTime();
            Iterator statuses = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
            long elapsedNanos = System.nanoTime() - startedAtNanos;
            Instrumented.updateTimer(this.jobStatusPolledTimer, elapsedNanos, TimeUnit.NANOSECONDS);

            return statuses.hasNext() ? (JobStatus) statuses.next() : null;
        }

        /**
         * Submits every job of the dag that is ready to run and checkpoints the dag.
         *
         * @param str id of the dag to advance
         * @return a single-entry map from the dag id to the set of jobs that were submitted
         * @throws IOException if writing the checkpoint fails
         */
        synchronized Map<String, Set<Dag.DagNode<JobExecutionPlan>>> submitNext(String str) throws IOException {
            Dag<JobExecutionPlan> dag = this.dags.get(str);
            Set<Dag.DagNode<JobExecutionPlan>> readyNodes = DagManagerUtils.getNext(dag);
            for (Dag.DagNode<JobExecutionPlan> readyNode : readyNodes) {
                submitJob(readyNode);
            }
            this.dagStateStore.writeCheckpoint(dag);

            Map<String, Set<Dag.DagNode<JobExecutionPlan>>> submittedByDag = Maps.newHashMap();
            submittedByDag.put(str, readyNodes);
            return submittedByDag;
        }

        /**
         * Submits one job to its spec executor.
         *
         * <p>Increments the attempt count, marks the plan RUNNING, checks the user quotas, hands
         * the job spec to the producer, checkpoints the dag, bumps the running-job counters (when
         * instrumented) and waits for the submission to complete. Any failure, including an
         * exceeded quota, is logged and reported through a {@code JobFailedTimer} event instead of
         * being thrown.
         *
         * @param dagNode job node to submit
         */
        private void submitJob(Dag.DagNode<JobExecutionPlan> dagNode) {
            DagManagerUtils.incrementJobAttempt(dagNode);
            JobExecutionPlan plan = DagManagerUtils.getJobExecutionPlan(dagNode);
            plan.setExecutionStatus(ExecutionStatus.RUNNING);
            JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
            Map<String, String> eventMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), plan);
            String executorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
            try {
                checkQuota(dagNode);
                SpecProducer producer = DagManagerUtils.getSpecProducer(dagNode);
                TimingEvent orchestratedTimer = null;
                if (this.eventSubmitter.isPresent()) {
                    orchestratedTimer = ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("JobOrchestrated");
                }
                Future submission = producer.addSpec(jobSpec);
                dagNode.getValue().setJobFuture(Optional.of(submission));
                // Checkpoint right after the future is attached, before waiting on it.
                this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
                if (this.metricContext != null) {
                    getRunningJobsCounter(dagNode).inc();
                    for (ContextAwareCounter userCounter : getRunningJobsCounterForUser(dagNode)) {
                        userCounter.inc();
                    }
                }
                submission.get();
                eventMetadata.put("message", producer.getExecutionLink(submission, executorUri));
                if (orchestratedTimer != null) {
                    orchestratedTimer.stop(eventMetadata);
                }
                DagManager.log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), executorUri);
            } catch (Exception e) {
                TimingEvent failedTimer = null;
                if (this.eventSubmitter.isPresent()) {
                    failedTimer = ((EventSubmitter) this.eventSubmitter.get()).getTimingEvent("JobFailedTimer");
                }
                String failure = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + executorUri;
                DagManager.log.error(failure, e);
                eventMetadata.put("message", failure + " due to " + e.getMessage());
                if (failedTimer != null) {
                    failedTimer.stop(eventMetadata);
                }
            }
        }

        /**
         * Counts the job against the proxy user and every requester, then fails if any of them
         * is over quota.
         *
         * <p>All counters are updated before any exception is thrown; a proxy-user violation is
         * reported in preference to a requester violation, and for requesters only the first
         * violation is reported.
         *
         * @param dagNode job node about to be submitted
         * @throws IOException if a quota is exceeded or the requester list cannot be deserialized
         */
        private void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
            String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), "user.to.proxy", (String) null);
            String executorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
            boolean proxyUserWithinQuota = proxyUser == null || incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);

            String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
            boolean requestersWithinQuota = true;
            String requesterViolation = null;
            if (serializedRequesters != null) {
                for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
                    // Always call the increment, even after a violation, so every requester is counted.
                    boolean thisRequesterOk = incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
                    requestersWithinQuota = requestersWithinQuota & thisRequesterOk;
                    if (!requestersWithinQuota && requesterViolation == null) {
                        requesterViolation = "Quota exceeded for requester " + requester.getName() + " on executor " + executorUri + ": quota=" + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
                    }
                }
            }

            if (!proxyUserWithinQuota) {
                throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + executorUri + ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
            }
            if (!requestersWithinQuota) {
                throw new IOException(requesterViolation);
            }
        }

        /**
         * Counts the job against a user (on the job's first attempt only) and reports whether the
         * user is still within quota.
         *
         * <p>The count maps are static and shared by every {@code DagManagerThread}, so the
         * increment is done with a single {@code merge} call: on a {@code ConcurrentHashMap} that
         * is atomic, whereas a separate get followed by put can lose concurrent updates.
         *
         * @param map count map to update (proxy-user or requester counts)
         * @param str user name
         * @param dagNode job node being submitted
         * @return {@code true} if the user's running-job count does not exceed the user's quota
         */
        private boolean incrementMapAndCheckQuota(Map<String, Integer> map, String str, Dag.DagNode<JobExecutionPlan> dagNode) {
            String quotaKey = DagManagerUtils.getUserQuotaKey(str, dagNode);
            int runningJobs;
            if (dagNode.getValue().getCurrentAttempts() == 1) {
                // First attempt: count the job. Retries were already counted.
                runningJobs = map.merge(quotaKey, 1, Integer::sum);
            } else {
                runningJobs = map.getOrDefault(quotaKey, 0);
            }
            return runningJobs <= getQuotaForUser(str);
        }

        /**
         * Returns the job quota for a user.
         *
         * @param str user name
         * @return the user's explicit quota, or the default quota when none is configured
         */
        private int getQuotaForUser(String str) {
            Integer fallbackQuota = Integer.valueOf(this.defaultQuota);
            Integer effectiveQuota = this.perUserQuota.getOrDefault(str, fallbackQuota);
            return effectiveQuota.intValue();
        }

        /**
         * Handles a job that reached a terminal status: releases its quota, decrements the
         * running-job counters (when instrumented) and advances or fails the dag.
         *
         * <p>COMPLETE submits the dag's next jobs. FAILED and CANCELLED record the dag id in the
         * set matching the dag's failure option; FAILED additionally sets a failure message.
         *
         * @param dagNode job node that finished
         * @return jobs submitted as a consequence, keyed by dag id; empty unless the job completed
         * @throws IOException if submitting the next jobs fails to checkpoint the dag
         */
        private Map<String, Set<Dag.DagNode<JobExecutionPlan>>> onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
            Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
            String dagId = DagManagerUtils.generateDagId(dag);
            String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
            ExecutionStatus finalStatus = DagManagerUtils.getExecutionStatus(dagNode);
            DagManager.log.info("Job {} of Dag {} has finished with status {}", new Object[]{jobName, dagId, finalStatus.name()});

            releaseQuota(dagNode);
            if (this.metricContext != null) {
                getRunningJobsCounter(dagNode).dec();
                for (ContextAwareCounter userCounter : getRunningJobsCounterForUser(dagNode)) {
                    userCounter.dec();
                }
            }

            switch (finalStatus) {
                case COMPLETE:
                    return submitNext(dagId);
                case FAILED:
                case CANCELLED:
                    if (finalStatus == ExecutionStatus.FAILED) {
                        dag.setMessage("Flow failed because job " + jobName + " failed");
                    }
                    Set<String> failedDagIds = DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING
                            ? this.failedDagIdsFinishRunning
                            : this.failedDagIdsFinishAllPossible;
                    failedDagIds.add(dagId);
                    return Maps.newHashMap();
                default:
                    DagManager.log.warn("It should not reach here. Job status is unexpected.");
                    return Maps.newHashMap();
            }
        }

        /**
         * Gives back the quota held by a finished job for its proxy user and each requester.
         *
         * <p>Counts never drop below zero. A requester list that cannot be deserialized is logged
         * and otherwise ignored, so releasing quota never fails the caller.
         *
         * @param dagNode job node that finished
         */
        private void releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
            String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), "user.to.proxy", (String) null);
            if (proxyUser != null) {
                decrementIfPositive(proxyUserToJobCount, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode));
            }
            String serializedRequesterList = DagManagerUtils.getSerializedRequesterList(dagNode);
            if (serializedRequesterList != null) {
                try {
                    for (ServiceRequester requester : RequesterService.deserialize(serializedRequesterList)) {
                        decrementIfPositive(requesterToJobCount, DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
                    }
                } catch (IOException e) {
                    DagManager.log.error("Failed to release quota for requester list " + serializedRequesterList, e);
                }
            }
        }

        /**
         * Decrements the count stored under {@code key} if it exists and is positive.
         *
         * <p>Uses one {@code computeIfPresent} call, which is atomic on the shared
         * {@code ConcurrentHashMap} count maps, instead of a separate check followed by put.
         */
        private static void decrementIfPositive(Map<String, Integer> counts, String key) {
            counts.computeIfPresent(key, (ignoredKey, running) -> running > 0 ? running - 1 : running);
        }

        /**
         * Stops tracking a job node and drops the cached SLA of its dag.
         *
         * @param str id of the dag the node belongs to
         * @param dagNode job node to forget
         */
        private void deleteJobState(String str, Dag.DagNode<JobExecutionPlan> dagNode) {
            this.jobToDag.remove(dagNode);
            LinkedList<Dag.DagNode<JobExecutionPlan>> trackedNodes = this.dagToJobs.get(str);
            trackedNodes.remove(dagNode);
            this.dagToSLA.remove(str);
        }

        /**
         * Starts tracking a job node under its dag.
         *
         * @param str id of the dag the node belongs to
         * @param dagNode job node to track
         */
        private void addJobState(String str, Dag.DagNode<JobExecutionPlan> dagNode) {
            this.jobToDag.put(dagNode, this.dags.get(str));
            // Create the per-dag list on first use, then append.
            LinkedList<Dag.DagNode<JobExecutionPlan>> trackedNodes = this.dagToJobs.get(str);
            if (trackedNodes == null) {
                trackedNodes = Lists.newLinkedList();
                this.dagToJobs.put(str, trackedNodes);
            }
            trackedNodes.add(dagNode);
        }

        /**
         * Tells whether any job of the dag is still being tracked.
         *
         * <p>A dag with no entry in {@code dagToJobs} (no job was ever added for it) is treated
         * as having no running jobs instead of throwing. {@code cleanUp} calls this for every
         * tracked dag, so a missing entry would otherwise abort every cleanup pass.
         *
         * @param str dag id
         * @return {@code true} if at least one job node is tracked for the dag
         */
        private boolean hasRunningJobs(String str) {
            List<Dag.DagNode<JobExecutionPlan>> trackedNodes = this.dagToJobs.get(str);
            return trackedNodes != null && !trackedNodes.isEmpty();
        }

        /**
         * Returns the running-flows counter of the node's spec executor.
         *
         * <p>Requires a non-null metric context, i.e. instrumentation must be enabled.
         *
         * @param dagNode job node whose executor URI names the counter
         * @return counter named {@code GobblinService.RunningFlows.<executor uri>}
         */
        private ContextAwareCounter getRunningJobsCounter(Dag.DagNode<JobExecutionPlan> dagNode) {
            String executorUri = dagNode.getValue().getSpecExecutor().getUri().toString();
            String counterName = MetricRegistry.name("GobblinService", new String[]{"RunningFlows", executorUri});
            return this.metricContext.contextAwareCounter(counterName);
        }

        /**
         * Collects the per-user counters affected by a job: one for the proxy user (if set) and
         * one for each requester.
         *
         * <p>Requires a non-null metric context. If the requester list cannot be deserialized the
         * error is logged and the counters gathered so far are returned.
         *
         * @param dagNode job node whose users are looked up
         * @return counters named {@code GobblinService.ServiceUsers.<user>}; possibly empty
         */
        private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
            List<ContextAwareCounter> userCounters = new ArrayList<>();
            String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), "user.to.proxy", (String) null);
            if (StringUtils.isNotEmpty(proxyUser)) {
                userCounters.add(this.metricContext.contextAwareCounter(MetricRegistry.name("GobblinService", new String[]{"ServiceUsers", proxyUser})));
            }
            try {
                String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
                if (StringUtils.isNotEmpty(serializedRequesters)) {
                    for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
                        String counterName = MetricRegistry.name("GobblinService", new String[]{"ServiceUsers", requester.getName()});
                        userCounters.add(this.metricContext.contextAwareCounter(counterName));
                    }
                }
            } catch (IOException e) {
                DagManager.log.error("Error while fetching requester list.", e);
            }
            return userCounters;
        }

        /**
         * Finds the dags that are finished and removes them from tracking and the state store.
         *
         * <p>Dags in {@code failedDagIdsFinishRunning} are failed immediately: their remaining
         * jobs are dropped from tracking and a {@code FlowFailed} event is emitted. Every other
         * dag with no tracked jobs left is finished with {@code FlowFailed} if it is recorded in
         * {@code failedDagIdsFinishAllPossible}, otherwise with {@code FlowSucceeded}. The actual
         * removal happens after both scans so {@code dags} is not modified while iterated.
         */
        private void cleanUp() {
            List<String> finishedDagIds = new ArrayList<>();

            for (String failedDagId : this.failedDagIdsFinishRunning) {
                LinkedList<Dag.DagNode<JobExecutionPlan>> remainingNodes = this.dagToJobs.get(failedDagId);
                while (!remainingNodes.isEmpty()) {
                    Dag.DagNode<JobExecutionPlan> abandoned = remainingNodes.poll();
                    deleteJobState(failedDagId, abandoned);
                }
                DagManager.log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", failedDagId);
                DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(failedDagId), "FlowFailed");
                finishedDagIds.add(failedDagId);
            }

            for (String dagId : this.dags.keySet()) {
                if (hasRunningJobs(dagId) || this.failedDagIdsFinishRunning.contains(dagId)) {
                    continue;
                }
                // remove() reports whether the dag was recorded as failed and clears the record in one step.
                boolean recordedAsFailed = this.failedDagIdsFinishAllPossible.remove(dagId);
                String flowEvent = recordedAsFailed ? "FlowFailed" : "FlowSucceeded";
                DagManager.log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, flowEvent);
                DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), flowEvent);
                finishedDagIds.add(dagId);
            }

            for (String finishedDagId : finishedDagIds) {
                cleanUpDag(finishedDagId);
            }
        }

        /**
         * Deletes a single dag from the dag state store and from the in-memory maps.
         *
         * <p>A failure of the state store clean-up is logged and does not prevent the dag from being
         * removed from {@code dags} and {@code dagToJobs}.
         *
         * @param str the id of the dag to remove
         */
        private synchronized void cleanUpDag(String str) {
            try {
                this.dagStateStore.cleanUp(this.dags.get(str));
            } catch (IOException ioe) {
                String failureMessage = String.format("Failed to clean %s from backStore due to:", str);
                DagManager.log.error(failureMessage, ioe);
            }

            // Drop the in-memory state whether or not the state store clean-up succeeded.
            this.dags.remove(str);
            this.dagToJobs.remove(str);
        }
    }

    /**
     * The failure options known to the {@code DagManager}.
     *
     * <p>Each constant carries a string label, identical to the constant's name, which is what
     * {@link #toString()} returns.
     */
    public enum FailureOption {
        FINISH_RUNNING("FINISH_RUNNING"),
        CANCEL("CANCEL"),
        FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");

        /** Label returned by {@link #toString()}. */
        private final String failureOption;

        /**
         * @param optionLabel the label to report from {@link #toString()}
         */
        FailureOption(String optionLabel) {
            failureOption = optionLabel;
        }

        /**
         * @return the label supplied when the constant was declared
         */
        @Override
        public String toString() {
            return failureOption;
        }
    }

    /**
     * Creates an inactive {@code DagManager} configured from {@code config}.
     *
     * <p>The constructor reads the thread count, polling interval, default quota and per-user quotas
     * from the config, creates one dag queue and one cancel queue per thread, creates the scheduled
     * executor, and instantiates the job status retriever. No threads are scheduled here; that
     * happens in {@link #setActive(boolean)}.
     *
     * <p>Each per-user quota entry is split on {@code QUOTA_SEPERATOR}; the first part is used as the
     * user name and the second part is parsed as the integer quota.
     *
     * @param config the configuration to read settings from
     * @param z whether instrumentation is enabled; when {@code true} an {@link EventSubmitter} is
     *          created, otherwise the event submitter is absent
     * @throws IllegalArgumentException if a per-user quota entry does not contain both a user and a
     *         quota, or if the quota is not a valid integer ({@link NumberFormatException})
     * @throws RuntimeException if the job status retriever cannot be instantiated reflectively
     */
    public DagManager(Config config, boolean z) {
        this.isActive = false;
        this.config = config;
        this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
        this.queue = initializeDagQueue(this.numThreads.intValue());
        this.cancelQueue = initializeDagQueue(this.numThreads.intValue());
        this.scheduledExecutorPool = Executors.newScheduledThreadPool(this.numThreads.intValue());
        this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
        this.instrumentationEnabled = z;
        if (z) {
            this.eventSubmitter = Optional.of(new EventSubmitter.Builder(Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()), "org.apache.gobblin.service").build());
        } else {
            this.eventSubmitter = Optional.absent();
        }
        this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA).intValue();
        this.perUserQuota = new HashMap<String, Integer>();
        for (String quotaEntry : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
            // Split once and validate, so a malformed entry is reported with its content rather than
            // surfacing as a bare ArrayIndexOutOfBoundsException.
            String[] userAndQuota = quotaEntry.split(QUOTA_SEPERATOR);
            if (userAndQuota.length < 2) {
                throw new IllegalArgumentException("Malformed per-user quota entry '" + quotaEntry
                    + "': expected a user and a quota separated by '" + QUOTA_SEPERATOR + "'");
            }
            this.perUserQuota.put(userAndQuota[0], Integer.valueOf(Integer.parseInt(userAndQuota[1])));
        }
        try {
            // NOTE(review): this calls a non-private, overridable method from the constructor; a
            // subclass override will run before the subclass's own fields are initialised.
            this.jobStatusRetriever = createJobStatusRetriever(config);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Exception encountered during DagManager initialization", e);
        }
    }

    /**
     * Instantiates the {@link JobStatusRetriever} implementation named in the config.
     *
     * <p>The class name is read from {@code JOB_STATUS_RETRIEVER_CLASS_KEY}, falling back to
     * {@code DEFAULT_JOB_STATUS_RETRIEVER_CLASS}, and the instance is built through
     * {@code GobblinConstructorUtils.invokeLongestConstructor} with {@code config} as the argument.
     *
     * @param config the configuration naming the implementation; also passed to its constructor
     * @return the newly created retriever
     * @throws ReflectiveOperationException if the class cannot be loaded or instantiated
     */
    JobStatusRetriever createJobStatusRetriever(Config config) throws ReflectiveOperationException {
        String retrieverClassName = ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, DEFAULT_JOB_STATUS_RETRIEVER_CLASS);
        Class<?> retrieverClass = Class.forName(retrieverClassName);
        Object retriever = GobblinConstructorUtils.invokeLongestConstructor(retrieverClass, new Object[]{config});
        return (JobStatusRetriever) retriever;
    }

    /**
     * Creates a {@link KafkaJobStatusMonitor} through a fresh {@link KafkaJobStatusMonitorFactory}.
     *
     * @param config the configuration handed to the factory
     * @return the monitor produced by the factory
     * @throws ReflectiveOperationException if the factory fails to instantiate the monitor
     */
    KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws ReflectiveOperationException {
        KafkaJobStatusMonitorFactory monitorFactory = new KafkaJobStatusMonitorFactory();
        return monitorFactory.createJobStatusMonitor(config);
    }

    /**
     * Instantiates the {@link DagStateStore} implementation named in the config.
     *
     * <p>The class name is read from {@code DAG_STATESTORE_CLASS_KEY}, defaulting to
     * {@link FSDagStateStore}; the instance is built through
     * {@code GobblinConstructorUtils.invokeLongestConstructor} with {@code config} and {@code map}
     * as arguments.
     *
     * @param config the configuration naming the implementation; also passed to its constructor
     * @param map the topology specs, keyed by URI, passed to the constructor
     * @return the newly created state store
     * @throws RuntimeException wrapping the {@link ReflectiveOperationException} if the class cannot
     *         be loaded or instantiated
     */
    DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> map) {
        String stateStoreClassName = ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName());
        try {
            Class<?> stateStoreClass = Class.forName(stateStoreClassName);
            Object stateStore = GobblinConstructorUtils.invokeLongestConstructor(stateStoreClass, new Object[]{config, map});
            return (DagStateStore) stateStore;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds an array of {@code i} new, empty {@link LinkedBlockingDeque} instances.
     *
     * @param i the number of queues to create
     * @return an array of length {@code i} in which every slot holds its own deque
     */
    private static LinkedBlockingDeque[] initializeDagQueue(int i) {
        LinkedBlockingDeque[] queues = new LinkedBlockingDeque[i];
        int slot = 0;
        while (slot < queues.length) {
            queues[slot] = new LinkedBlockingDeque();
            slot++;
        }
        return queues;
    }

    /**
     * Creates a {@code DagManager} with instrumentation enabled.
     *
     * <p>Equivalent to {@code new DagManager(config, true)}.
     *
     * @param config the configuration to read settings from
     */
    public DagManager(Config config) {
        this(config, true);
    }

    /**
     * Start-up hook that performs no work; threads are scheduled by {@link #setActive(boolean)}
     * instead.
     *
     * <p>NOTE(review): presumably this implements the {@code AbstractIdleService} start-up hook;
     * confirm against the class declaration.
     */
    protected void startUp() {
    }

    /**
     * Queues a dag for execution and marks its jobs as pending.
     *
     * <p>The dag is optionally checkpointed to the dag state store first, then offered to the queue
     * selected by {@code DagManagerUtils.getDagQueueId}. Once it is queued,
     * {@code submitEventsAndSetStatus} is invoked for it.
     *
     * @param dag the dag to enqueue
     * @param z whether to write a checkpoint of the dag to the dag state store before enqueueing
     * @throws IOException if writing the checkpoint fails or the queue rejects the dag
     */
    public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean z) throws IOException {
        if (z) {
            this.dagStateStore.writeCheckpoint(dag);
        }
        int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads.intValue());
        if (!this.queue[queueId].offer(dag)) {
            // The dag id is surrounded by spaces so it stays readable in the message.
            throw new IOException("Could not add dag " + DagManagerUtils.generateDagId(dag) + " to queue");
        }
        submitEventsAndSetStatus(dag);
    }

    /**
     * Emits a {@code JobPending} timing event for every node of the dag and sets the node's
     * execution status to {@link ExecutionStatus#PENDING}.
     *
     * <p>Nothing happens, including the status change, when no event submitter is present.
     *
     * @param dag the dag whose job execution plans are updated
     */
    private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
        if (!this.eventSubmitter.isPresent()) {
            return;
        }
        for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
            JobExecutionPlan plan = DagManagerUtils.getJobExecutionPlan(node);
            EventSubmitter submitter = (EventSubmitter) this.eventSubmitter.get();
            submitter.getTimingEvent("JobPending").stop(TimingEventUtils.getJobMetadata(Maps.newHashMap(), plan));
            plan.setExecutionStatus(ExecutionStatus.PENDING);
        }
    }

    /**
     * Requests cancellation of recent executions of the flow identified by {@code uri}.
     *
     * <p>The flow group and name are extracted from the URI, the job status retriever is asked for
     * the latest execution ids of that flow (with {@code 10} as its last argument, presumably the
     * maximum number of executions returned; confirm against {@code JobStatusRetriever}), and each
     * returned execution is passed to {@code killFlow}.
     *
     * @param uri the flow spec URI
     * @throws IOException if a cancellation request cannot be queued; executions after the failing
     *         one are not processed
     */
    public synchronized void stopDag(URI uri) throws IOException {
        String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
        String flowName = FlowSpec.Utils.getFlowName(uri);
        List<?> executionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
        log.info("Found {} flows to cancel.", Integer.valueOf(executionIds.size()));
        for (Object executionId : executionIds) {
            killFlow(flowGroup, flowName, ((Long) executionId).longValue());
        }
    }

    /**
     * Places a cancellation request for one flow execution on the matching cancel queue.
     *
     * <p>The queue index comes from {@code DagManagerUtils.getDagQueueId} and the queued value is
     * the dag id produced by {@code DagManagerUtils.generateDagId}.
     *
     * @param str the flow group
     * @param str2 the flow name
     * @param j the flow execution id
     * @throws IOException if the cancel queue does not accept the dag id
     */
    private void killFlow(String str, String str2, long j) throws IOException {
        int cancelQueueIndex = DagManagerUtils.getDagQueueId(j, this.numThreads.intValue());
        String dagId = DagManagerUtils.generateDagId(str, str2, j);
        boolean accepted = this.cancelQueue[cancelQueueIndex].offer(dagId);
        if (accepted) {
            return;
        }
        throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
    }

    /**
     * Event-bus subscriber that reacts to a {@link KillFlowEvent} by queueing the flow execution
     * for cancellation.
     *
     * <p>A failure to queue the request is logged as a warning and not propagated.
     *
     * @param killFlowEvent the event carrying the flow group, flow name and execution id
     */
    @Subscribe
    public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
        String flowGroup = killFlowEvent.getFlowGroup();
        String flowName = killFlowEvent.getFlowName();
        log.info("Received kill request for flow ({}, {}, {})", new Object[]{flowGroup, flowName, killFlowEvent.getFlowExecutionId()});
        try {
            long flowExecutionId = killFlowEvent.getFlowExecutionId().longValue();
            killFlow(flowGroup, flowName, flowExecutionId);
        } catch (IOException ioe) {
            log.warn("Failed to kill flow", ioe);
        }
    }

    /**
     * Stores the topology spec map that {@link #setActive(boolean)} later passes to
     * {@code createDagStateStore} when the manager is activated.
     *
     * @param map the topology specs keyed by URI; the reference is stored as-is, not copied
     */
    public synchronized void setTopologySpecMap(Map<URI, TopologySpec> map) {
        this.topologySpecMap = map;
    }

    /**
     * Activates or deactivates this {@code DagManager}.
     *
     * <p>If the manager is already in the requested state, the call only logs and returns.
     *
     * <p>On activation, the dag state store is created, one {@code DagManagerThread} per configured
     * thread is scheduled at a fixed rate of {@code pollingInterval} seconds, and every dag found in
     * the state store is re-queued through {@link #addDag} without writing a new checkpoint.
     *
     * <p>On deactivation, the scheduled executor is shut down and the call waits up to
     * {@code TERMINATION_TIMEOUT} seconds for termination. If the wait is interrupted, the error is
     * logged and the calling thread's interrupt status is restored so callers can still observe the
     * interruption.
     *
     * <p>NOTE(review): the executor is shut down on deactivation and is not recreated in this method
     * before threads are scheduled on a later activation; confirm whether re-activation after
     * deactivation is expected to work.
     *
     * @param z {@code true} to activate, {@code false} to deactivate
     * @throws RuntimeException wrapping an {@link IOException} raised while loading or queueing dags
     *         during activation
     */
    public synchronized void setActive(boolean z) {
        if (this.isActive == z) {
            log.info("DagManager already {}, skipping further actions.", !z ? "inactive" : "active");
            return;
        }
        this.isActive = z;
        try {
            if (this.isActive) {
                log.info("Activating DagManager.");
                log.info("Scheduling {} DagManager threads", this.numThreads);
                this.dagStateStore = createDagStateStore(this.config, this.topologySpecMap);
                this.dagManagerThreads = new DagManagerThread[this.numThreads.intValue()];
                for (int i = 0; i < this.numThreads.intValue(); i++) {
                    DagManagerThread dagManagerThread = new DagManagerThread(this.jobStatusRetriever, this.dagStateStore, this.queue[i], this.cancelQueue[i], this.instrumentationEnabled, this.defaultQuota, this.perUserQuota);
                    this.dagManagerThreads[i] = dagManagerThread;
                    this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0L, this.pollingInterval.intValue(), TimeUnit.SECONDS);
                }
                List<Dag<JobExecutionPlan>> dags = this.dagStateStore.getDags();
                log.info("Loading " + dags.size() + " dags from dag state store");
                for (Dag<JobExecutionPlan> dag : dags) {
                    // Already persisted in the state store, so no new checkpoint is written.
                    addDag(dag, false);
                }
            } else {
                log.info("Inactivating the DagManager. Shutting down all DagManager threads");
                this.scheduledExecutorPool.shutdown();
                try {
                    this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT.intValue(), TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    log.error("Exception encountered when shutting down DagManager threads.", e);
                    // Restore the interrupt status that was cleared when the exception was thrown.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e2) {
            log.error("Exception encountered when activating the new DagManager", e2);
            throw new RuntimeException(e2);
        }
    }

    /**
     * Shuts down the scheduled executor and waits up to {@code TERMINATION_TIMEOUT} seconds for it
     * to terminate.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    protected void shutDown() throws Exception {
        ScheduledExecutorService pool = this.scheduledExecutorPool;
        pool.shutdown();
        long timeoutSeconds = TERMINATION_TIMEOUT.intValue();
        pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * @return the number of threads read from the config in the constructor; the same value sizes
     *         the dag queues, the cancel queues and the scheduled executor
     */
    public Integer getNumThreads() {
        return this.numThreads;
    }

    /**
     * @return the job status retriever created in the constructor by
     *         {@code createJobStatusRetriever}
     */
    public JobStatusRetriever getJobStatusRetriever() {
        return this.jobStatusRetriever;
    }
}
