package org.apache.nemo.runtime.common.plan;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.exception.IllegalVertexOperationException;
import org.apache.nemo.common.exception.PhysicalPlanGenerationException;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import org.apache.nemo.common.ir.vertex.utility.SamplingVertex;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.class */
public final class PhysicalPlanGenerator implements Function<IRDAG, DAG<Stage, StageEdge>> {
    private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName());
    private final String dagDirectory;

    @Inject
    private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) String str) {
        this.dagDirectory = str;
    }

    @Override // java.util.function.Function
    public DAG<Stage, StageEdge> apply(IRDAG irdag) {
        DAG<Stage, StageEdge> stagePartitionIrDAG = stagePartitionIrDAG(irdag);
        stagePartitionIrDAG.getVertices().forEach(this::integrityCheck);
        handleDuplicateEdgeGroupProperty(stagePartitionIrDAG);
        stagePartitionIrDAG.storeJSON(this.dagDirectory, "plan-logical", "logical execution plan");
        return stagePartitionIrDAG;
    }

    private void handleDuplicateEdgeGroupProperty(DAG<Stage, StageEdge> dag) {
        HashMap hashMap = new HashMap();
        dag.topologicalDo(stage -> {
            dag.getIncomingEdgesOf(stage).forEach(stageEdge -> {
                Optional<T> propertyValue = stageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
                if (propertyValue.isPresent()) {
                    ((List) hashMap.computeIfAbsent(((DuplicateEdgeGroupPropertyValue) propertyValue.get()).getGroupId(), str -> {
                        return new ArrayList();
                    })).add(stageEdge);
                }
            });
        });
        hashMap.forEach((str, list) -> {
            StageEdge stageEdge = (StageEdge) list.get(0);
            DuplicateEdgeGroupPropertyValue duplicateEdgeGroupPropertyValue = (DuplicateEdgeGroupPropertyValue) stageEdge.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
            list.forEach(stageEdge2 -> {
                DuplicateEdgeGroupPropertyValue duplicateEdgeGroupPropertyValue2 = (DuplicateEdgeGroupPropertyValue) stageEdge2.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
                if (duplicateEdgeGroupPropertyValue.isRepresentativeEdgeDecided()) {
                    duplicateEdgeGroupPropertyValue2.setRepresentativeEdgeId(duplicateEdgeGroupPropertyValue.getRepresentativeEdgeId());
                } else {
                    duplicateEdgeGroupPropertyValue2.setRepresentativeEdgeId(stageEdge.getId());
                }
                duplicateEdgeGroupPropertyValue2.setGroupSize(list.size());
            });
        });
    }

    public DAG<Stage, StageEdge> stagePartitionIrDAG(IRDAG irdag) {
        StagePartitioner stagePartitioner = new StagePartitioner();
        DAGBuilder dAGBuilder = new DAGBuilder();
        HashSet<IREdge> hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        Map<IRVertex, Integer> apply = stagePartitioner.apply(irdag);
        HashSet hashSet2 = new HashSet();
        Random random = new Random(hashCode());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        irdag.topologicalDo(iRVertex -> {
            int intValue = ((Integer) apply.get(iRVertex)).intValue();
            if (!linkedHashMap.containsKey(Integer.valueOf(intValue))) {
                linkedHashMap.put(Integer.valueOf(intValue), new HashSet());
            }
            ((Set) linkedHashMap.get(Integer.valueOf(intValue))).add(iRVertex);
        });
        Iterator it = linkedHashMap.keySet().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            Set<IRVertex> set = (Set) linkedHashMap.get(Integer.valueOf(intValue));
            String generateStageId = RuntimeIdManager.generateStageId(Integer.valueOf(intValue));
            ExecutionPropertyMap executionPropertyMap = new ExecutionPropertyMap(generateStageId);
            Set<VertexExecutionProperty> stageProperties = stagePartitioner.getStageProperties(set.iterator().next());
            Objects.requireNonNull(executionPropertyMap);
            stageProperties.forEach((v1) -> {
                r1.put(v1);
            });
            int intValue2 = ((Integer) executionPropertyMap.get(ParallelismProperty.class).orElseThrow(() -> {
                return new RuntimeException("Parallelism property must be set for Stage");
            })).intValue();
            List<Integer> taskIndicesToExecute = getTaskIndicesToExecute(set, intValue2, random);
            DAGBuilder dAGBuilder2 = new DAGBuilder();
            ArrayList arrayList = new ArrayList(intValue2);
            for (int i = 0; i < intValue2; i++) {
                arrayList.add(new HashMap());
            }
            Iterator<IRVertex> it2 = set.iterator();
            while (it2.hasNext()) {
                SourceVertex actualVertexToPutIntoStage = getActualVertexToPutIntoStage(it2.next());
                if ((actualVertexToPutIntoStage instanceof SourceVertex) && !hashSet2.contains(actualVertexToPutIntoStage)) {
                    SourceVertex sourceVertex = actualVertexToPutIntoStage;
                    try {
                        List readables = sourceVertex.getReadables(intValue2);
                        for (int i2 = 0; i2 < intValue2; i2++) {
                            ((Map) arrayList.get(i2)).put(actualVertexToPutIntoStage.getId(), (Readable) readables.get(i2));
                        }
                        sourceVertex.clearInternalStates();
                    } catch (Exception e) {
                        throw new PhysicalPlanGenerationException(e);
                    }
                }
                dAGBuilder2.addVertex(actualVertexToPutIntoStage);
            }
            for (IRVertex iRVertex2 : set) {
                irdag.getIncomingEdgesOf(iRVertex2).forEach(iREdge -> {
                    if (((Integer) apply.get(iREdge.getSrc())).equals(apply.get(iRVertex2))) {
                        dAGBuilder2.connectVertices(new RuntimeEdge(iREdge.getId(), iREdge.getExecutionProperties(), getActualVertexToPutIntoStage((IRVertex) iREdge.getSrc()), getActualVertexToPutIntoStage((IRVertex) iREdge.getDst())));
                    } else {
                        hashSet.add(iREdge);
                    }
                });
            }
            if (!dAGBuilder2.isEmpty()) {
                Stage stage = new Stage(generateStageId, taskIndicesToExecute, dAGBuilder2.buildWithoutSourceSinkCheck(), executionPropertyMap, arrayList);
                dAGBuilder.addVertex(stage);
                hashMap.put(Integer.valueOf(intValue), stage);
            }
            hashSet2.addAll(set);
        }
        for (IREdge iREdge2 : hashSet) {
            Stage stage2 = (Stage) hashMap.get(apply.get(iREdge2.getSrc()));
            Stage stage3 = (Stage) hashMap.get(apply.get(iREdge2.getDst()));
            if (stage2 == null || stage3 == null) {
                Object[] objArr = new Object[2];
                objArr[0] = stage2 == null ? String.format(" source stage for %s", iREdge2.getSrc()) : "";
                objArr[1] = stage3 == null ? String.format(" destination stage for %s", iREdge2.getDst()) : "";
                throw new IllegalVertexOperationException(String.format("Stage not added to the builder:%s%s", objArr));
            }
            dAGBuilder.connectVertices(new StageEdge(iREdge2.getId(), iREdge2.getExecutionProperties(), getActualVertexToPutIntoStage((IRVertex) iREdge2.getSrc()), getActualVertexToPutIntoStage((IRVertex) iREdge2.getDst()), stage2, stage3));
        }
        return dAGBuilder.build();
    }

    private IRVertex getActualVertexToPutIntoStage(IRVertex iRVertex) {
        return iRVertex instanceof SamplingVertex ? ((SamplingVertex) iRVertex).getCloneOfOriginalVertex() : iRVertex;
    }

    private List<Integer> getTaskIndicesToExecute(Set<IRVertex> set, int i, Random random) {
        if (((Set) set.stream().map(iRVertex -> {
            return Boolean.valueOf(iRVertex instanceof SamplingVertex);
        }).collect(Collectors.toSet())).size() != 1) {
            throw new IllegalArgumentException("Must be either all sampling vertices, or none: " + set.toString());
        }
        if (!(set.iterator().next() instanceof SamplingVertex)) {
            return (List) IntStream.range(0, i).boxed().collect(Collectors.toList());
        }
        int ceil = (int) Math.ceil(i * ((Float) set.stream().map(iRVertex2 -> {
            return Float.valueOf(((SamplingVertex) iRVertex2).getDesiredSampleRate());
        }).reduce(BinaryOperator.minBy((v0, v1) -> {
            return v0.compareTo(v1);
        })).orElseThrow(() -> {
            return new IllegalArgumentException(set.toString());
        })).floatValue());
        List list = (List) IntStream.range(0, i).boxed().collect(Collectors.toList());
        Collections.shuffle(list, random);
        return new ArrayList(list.subList(0, ceil));
    }

    private void integrityCheck(Stage stage) {
        if (!stage.getPropertyValue(ParallelismProperty.class).isPresent()) {
            throw new RuntimeException("Parallelism property must be set for Stage");
        }
        if (!stage.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
            throw new RuntimeException("ScheduleGroup property must be set for Stage");
        }
        stage.getIRDAG().getVertices().forEach(iRVertex -> {
            if (!(iRVertex instanceof SourceVertex) && !(iRVertex instanceof OperatorVertex)) {
                throw new UnsupportedOperationException(iRVertex.toString());
            }
        });
    }
}
