package org.apache.hudi.integ.testsuite.dag.scheduler;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.hudi.metrics.Metrics;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.class */
public class DagScheduler {
    private static Logger log = LoggerFactory.getLogger(DagScheduler.class);
    private WorkflowDag workflowDag;
    private ExecutionContext executionContext;

    public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext javaSparkContext) {
        this.workflowDag = workflowDag;
        this.executionContext = new ExecutionContext(javaSparkContext, writerContext);
    }

    public void schedule() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            execute(newFixedThreadPool, this.workflowDag);
            newFixedThreadPool.shutdown();
        } finally {
            if (!newFixedThreadPool.isShutdown()) {
                log.info("Forcing shutdown of executor service, this might kill running tasks");
                newFixedThreadPool.shutdownNow();
            }
        }
    }

    private void execute(ExecutorService executorService, WorkflowDag workflowDag) throws Exception {
        int i;
        log.info("Running workloads");
        List nodeList = workflowDag.getNodeList();
        int i2 = 1;
        do {
            log.warn("===================================================================");
            log.warn("Running workloads for round num " + i2);
            log.warn("===================================================================");
            PriorityQueue priorityQueue = new PriorityQueue();
            Iterator it = nodeList.iterator();
            while (it.hasNext()) {
                priorityQueue.add(((DagNode) it.next()).m10clone());
            }
            do {
                ArrayList arrayList = new ArrayList();
                HashSet hashSet = new HashSet();
                while (priorityQueue.size() > 0) {
                    DagNode dagNode = (DagNode) priorityQueue.poll();
                    log.warn("Executing node \"" + dagNode.getConfig().getOtherConfigs().get(DeltaConfig.Config.CONFIG_NAME) + "\" :: " + dagNode.getConfig());
                    int i3 = i2;
                    arrayList.add(executorService.submit(() -> {
                        executeNode(dagNode, i3);
                    }));
                    if (dagNode.getChildNodes().size() > 0) {
                        hashSet.addAll(dagNode.getChildNodes());
                    }
                }
                priorityQueue.addAll(hashSet);
                hashSet.clear();
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    ((Future) it2.next()).get(1L, TimeUnit.HOURS);
                }
            } while (priorityQueue.size() > 0);
            log.info("Finished workloads for round num " + i2);
            if (i2 < workflowDag.getRounds()) {
                new DelayNode(workflowDag.getIntermittentDelayMins()).execute(this.executionContext, i2);
            }
            Metrics.flush();
            i = i2;
            i2++;
        } while (i < workflowDag.getRounds());
        log.info("Finished workloads");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void executeNode(DagNode dagNode, int i) {
        if (dagNode.isCompleted()) {
            throw new RuntimeException("DagNode already completed! Cannot re-execute");
        }
        try {
            for (int repeatCount = dagNode.getConfig().getRepeatCount(); repeatCount > 0; repeatCount--) {
                dagNode.execute(this.executionContext, i);
                log.info("Finished executing {}", dagNode.getName());
            }
            dagNode.setCompleted(true);
        } catch (Exception e) {
            log.error("Exception executing node", e);
            throw new HoodieException(e);
        }
    }
}
