package org.apache.nemo.runtime.master.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.executionproperty.AssociatedProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.common.state.BlockState;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;

/**
 * Scheduling constraint that checks whether an executor is a location of a task's input data.
 *
 * <p>For a task with incoming edges, the executor id is compared against the locations of the
 * task's intermediate input block. For a task without incoming edges, the executor's node name
 * is compared against the locations reported by the task's {@link Readable}s. Whenever no
 * location can be determined, the constraint places no restriction on the executor.
 */
@ThreadSafe
@DriverSide
@AssociatedProperty(ResourceLocalityProperty.class)
public final class LocalitySchedulingConstraint implements SchedulingConstraint {
    private final BlockManagerMaster blockManagerMaster;

    /**
     * Injectable constructor.
     *
     * @param blockManagerMaster used to look up the handlers of available input blocks.
     */
    @Inject
    private LocalitySchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
        this.blockManagerMaster = blockManagerMaster;
    }

    /**
     * Finds the locations of the intermediate data consumed by a task.
     *
     * <p>Locations are only looked up when the task has exactly one incoming edge and that edge
     * uses the one-to-one communication pattern; in every other case an empty list is returned.
     * This method blocks until the location future of each matching block handler completes.
     *
     * @param task the task to schedule.
     * @return the resolved locations of the input block, or an empty list if not applicable.
     * @throws RuntimeException if the edge has no communication pattern, or if waiting for a
     *                          location is interrupted or fails.
     */
    private List<String> getIntermediateDataLocations(final Task task) {
        final List<StageEdge> incomingEdges = task.getTaskIncomingEdges();
        if (incomingEdges.size() != 1) {
            return Collections.emptyList();
        }

        final StageEdge stageEdge = incomingEdges.get(0);
        final boolean isOneToOne = CommunicationPatternProperty.Value.ONE_TO_ONE.equals(
            stageEdge.getPropertyValue(CommunicationPatternProperty.class)
                .orElseThrow(() -> new RuntimeException("No comm pattern!")));
        if (!isOneToOne) {
            return Collections.emptyList();
        }

        // When the edge belongs to a duplicate-edge group, the block is identified by the
        // group's representative edge id instead of this edge's own id.
        final Optional<DuplicateEdgeGroupPropertyValue> duplicateGroup =
            stageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
        final String edgeIdForBlock = duplicateGroup.isPresent()
            ? duplicateGroup.get().getRepresentativeEdgeId()
            : stageEdge.getId();
        final String blockId = RuntimeIdManager.generateBlockId(edgeIdForBlock, task.getTaskId());

        return blockManagerMaster.getBlockHandlers(blockId, BlockState.State.AVAILABLE).stream()
            .map(blockRequestHandler -> {
                try {
                    return blockRequestHandler.getLocationFuture().get();
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag before propagating the failure.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (final ExecutionException e) {
                    throw new RuntimeException(e);
                }
            })
            .collect(Collectors.toList());
    }

    /**
     * Collects the distinct locations reported by the given readables.
     *
     * @param collection the readables to query.
     * @return the union of all locations returned by {@link Readable#getLocations()}.
     * @throws Exception whatever {@link Readable#getLocations()} throws.
     */
    private static Set<String> getSourceDataLocations(final Collection<Readable> collection) throws Exception {
        final Set<String> locations = new HashSet<>();
        for (final Readable readable : collection) {
            locations.addAll(readable.getLocations());
        }
        return locations;
    }

    /**
     * Tests whether the task may be scheduled on the executor with respect to data locality.
     *
     * @param executorRepresenter the candidate executor.
     * @param task                the task to schedule.
     * @return {@code true} if no input location is known, or if the executor matches one of the
     *         known input locations; {@code false} otherwise.
     * @throws RuntimeException if the source locations cannot be retrieved for a reason other
     *                          than {@link UnsupportedOperationException}.
     */
    @Override
    public boolean testSchedulability(final ExecutorRepresenter executorRepresenter, final Task task) {
        if (!task.getTaskIncomingEdges().isEmpty()) {
            // Intermediate data: locations are matched against the executor id.
            final List<String> intermediateDataLocations = getIntermediateDataLocations(task);
            return intermediateDataLocations.isEmpty()
                || intermediateDataLocations.contains(executorRepresenter.getExecutorId());
        }

        try {
            // Source data: locations are matched against the executor's node name.
            final Set<String> sourceDataLocations =
                getSourceDataLocations(task.getIrVertexIdToReadable().values());
            return sourceDataLocations.isEmpty()
                || sourceDataLocations.contains(executorRepresenter.getNodeName());
        } catch (final UnsupportedOperationException e) {
            // Locations could not be reported; treat the task as having no locality preference.
            return true;
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
}
