package org.apache.nemo.runtime.master.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/TaskDispatcher.class */
/**
 * Dispatches pending tasks to executors on a dedicated single thread.
 *
 * <p>Each scheduling iteration takes the current pending task collection, tries to place every
 * {@code READY} task on an executor that passes all registered scheduling constraints, and hands
 * the tasks that could not be placed back to the pending task collection pointer. Iterations are
 * triggered by {@link #onExecutorSlotAvailable()}, {@link #onNewPendingTaskCollectionAvailable()}
 * and {@link #terminate()}.
 *
 * <p>The class is annotated {@code @NotThreadSafe}; the only synchronization visible here is the
 * lock inside {@link DelayedSignalingCondition}.
 */
@DriverSide
@NotThreadSafe
public final class TaskDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName());
    private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
    private final PlanStateManager planStateManager;
    private final ExecutorRegistry executorRegistry;
    private final SchedulingConstraintRegistry schedulingConstraintRegistry;
    private final SchedulingPolicy schedulingPolicy;
    /** Wakes the dispatcher loop up for another scheduling iteration. */
    private final DelayedSignalingCondition schedulingIteration = new DelayedSignalingCondition();
    /** Single-threaded executor that hosts the dispatcher loop. */
    private final ExecutorService dispatcherThread =
            Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "TaskDispatcher thread"));
    private boolean isSchedulerRunning = false;
    private boolean isTerminated = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/TaskDispatcher$DelayedSignalingCondition.class */
    /**
     * A condition that remembers a signal delivered while nobody is waiting.
     *
     * <p>{@link #signal()} sets a flag before signalling, and {@link #await()} returns immediately
     * (consuming the flag) when the flag is already set, so a signal sent between two waits is
     * not lost.
     */
    public static final class DelayedSignalingCondition {
        private boolean hasDelayedSignal;
        private final Lock lock;
        private final Condition condition;

        private DelayedSignalingCondition() {
            this.hasDelayedSignal = false;
            this.lock = new ReentrantLock();
            this.condition = this.lock.newCondition();
        }

        /**
         * Records a signal and wakes up a waiter, if any.
         */
        void signal() {
            lock.lock();
            try {
                hasDelayedSignal = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Blocks until a signal has been recorded, then consumes it.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread
         *                          is interrupted; the interrupt flag is restored first
         */
        void await() {
            lock.lock();
            try {
                // Loop to guard against spurious wake-ups and to honour a signal that arrived
                // before this call.
                while (!hasDelayedSignal) {
                    condition.await();
                }
                hasDelayedSignal = false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/TaskDispatcher$TaskDispatcherThread.class */
    /**
     * The dispatcher loop: runs one scheduling iteration, then waits for the next signal, until
     * the dispatcher is terminated.
     */
    public final class TaskDispatcherThread implements Runnable {
        private TaskDispatcherThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!isTerminated) {
                doScheduleTaskList();
                schedulingIteration.await();
            }
            if (planStateManager.isPlanDone()) {
                LOG.info("{} is complete.", planStateManager.getPlanId());
            } else {
                LOG.info("{} is incomplete.", planStateManager.getPlanId());
            }
            LOG.info("TaskDispatcher Terminated!");
        }
    }

    @Inject
    private TaskDispatcher(SchedulingConstraintRegistry schedulingConstraintRegistry, SchedulingPolicy schedulingPolicy, PendingTaskCollectionPointer pendingTaskCollectionPointer, ExecutorRegistry executorRegistry, PlanStateManager planStateManager) {
        this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
        this.planStateManager = planStateManager;
        this.executorRegistry = executorRegistry;
        this.schedulingPolicy = schedulingPolicy;
        this.schedulingConstraintRegistry = schedulingConstraintRegistry;
    }

    /**
     * Static factory that forwards its arguments to the private constructor.
     *
     * @param schedulingConstraintRegistry registry used to look up constraints per execution property
     * @param schedulingPolicy             policy that picks one executor among the candidates
     * @param pendingTaskCollectionPointer source of the task collections to schedule
     * @param executorRegistry             registry giving a view of the executors
     * @param planStateManager             manager queried and updated for task states
     * @return a new dispatcher; its thread is not started until {@link #run()} is called
     */
    public static TaskDispatcher newInstance(SchedulingConstraintRegistry schedulingConstraintRegistry, SchedulingPolicy schedulingPolicy, PendingTaskCollectionPointer pendingTaskCollectionPointer, ExecutorRegistry executorRegistry, PlanStateManager planStateManager) {
        return new TaskDispatcher(schedulingConstraintRegistry, schedulingPolicy, pendingTaskCollectionPointer, executorRegistry, planStateManager);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs a single scheduling iteration over the currently pending task collection.
     *
     * <p>Tasks that are not {@code READY} are skipped. For every other task the executor set is
     * narrowed down by each scheduling constraint registered for the task's execution properties;
     * if any candidate remains, the scheduling policy selects one, the task is marked
     * {@code EXECUTING} and handed to that executor. Tasks left without a candidate are offered
     * back to the pending task collection pointer.
     */
    public void doScheduleTaskList() {
        final Optional<Collection<Task>> pendingTasks = pendingTaskCollectionPointer.getAndSetNull();
        if (!pendingTasks.isPresent()) {
            LOG.debug("PendingTaskCollectionPointer is empty. Awaiting for more Tasks...");
            return;
        }

        final Collection<Task> taskList = pendingTasks.get();
        final Collection<Task> couldNotSchedule = new ArrayList<>();
        for (final Task task : taskList) {
            if (!planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) {
                LOG.debug("Skipping {} as it is not READY", task.getTaskId());
                continue;
            }

            executorRegistry.viewExecutors(executors -> {
                // Mutable holder, because the candidate set is replaced from inside a lambda.
                final MutableObject<Set<ExecutorRepresenter>> candidates = new MutableObject<>(executors);
                task.getExecutionProperties().forEachProperties(property -> {
                    final Optional<SchedulingConstraint> constraint =
                            schedulingConstraintRegistry.get(property.getClass());
                    // Nothing to filter when no constraint is registered or no candidate is left.
                    if (constraint.isPresent() && !candidates.getValue().isEmpty()) {
                        candidates.setValue(candidates.getValue().stream()
                                .filter(executor -> constraint.get().testSchedulability(executor, task))
                                .collect(Collectors.toSet()));
                    }
                });

                if (candidates.getValue().isEmpty()) {
                    couldNotSchedule.add(task);
                } else {
                    final ExecutorRepresenter selectedExecutor =
                            schedulingPolicy.selectExecutor(candidates.getValue(), task);
                    // The state is updated before the task is handed to the executor.
                    planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
                    LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
                    selectedExecutor.onTaskScheduled(task);
                }
            });
        }

        LOG.debug("All except {} were scheduled among {}", couldNotSchedule, taskList);
        if (!couldNotSchedule.isEmpty()) {
            // NOTE(review): presumably a no-op when a newer collection has been set meanwhile —
            // confirm against PendingTaskCollectionPointer.setIfNull.
            pendingTaskCollectionPointer.setIfNull(couldNotSchedule);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Triggers another scheduling iteration because an executor slot became available.
     */
    public void onExecutorSlotAvailable() {
        schedulingIteration.signal();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Triggers another scheduling iteration because a new pending task collection is available.
     */
    public void onNewPendingTaskCollectionAvailable() {
        schedulingIteration.signal();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the dispatcher loop; does nothing if it is already running or has been terminated.
     */
    public void run() {
        if (isTerminated || isSchedulerRunning) {
            return;
        }
        dispatcherThread.execute(new TaskDispatcherThread());
        // The loop is the only task this executor ever runs; shutdown() lets the already
        // submitted loop continue while rejecting any further submissions.
        dispatcherThread.shutdown();
        isSchedulerRunning = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the dispatcher as terminated and wakes the loop up so it can observe the flag and exit.
     */
    public void terminate() {
        isTerminated = true;
        schedulingIteration.signal();
    }
}
