package org.apache.nemo.compiler.optimizer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import net.jcip.annotations.NotThreadSafe;
import org.apache.nemo.common.Util;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CacheIDProperty;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.vertex.CachedSourceVertex;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.compiler.optimizer.pass.runtime.Message;
import org.apache.nemo.compiler.optimizer.policy.Policy;
import org.apache.nemo.compiler.optimizer.policy.XGBoostPolicy;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.ClientRPC;
import org.apache.reef.tang.annotations.Parameter;

@NotThreadSafe
/* loaded from: input_file:org/apache/nemo/compiler/optimizer/NemoOptimizer.class */
public final class NemoOptimizer implements Optimizer {
    private final String dagDirectory;
    private final Policy optimizationPolicy;
    private final String environmentTypeStr;
    private final String executorInfoContents;
    private final ClientRPC clientRPC;
    private final Map<UUID, Integer> cacheIdToParallelism = new HashMap();
    private int irDagCount = 0;

    @Inject
    private NemoOptimizer(@Parameter(JobConf.DAGDirectory.class) String str, @Parameter(JobConf.OptimizationPolicy.class) String str2, @Parameter(JobConf.EnvironmentType.class) String str3, @Parameter(JobConf.ExecutorJSONContents.class) String str4, ClientRPC clientRPC) {
        this.dagDirectory = str;
        this.environmentTypeStr = OptimizerUtils.filterEnvironmentTypeString(str3);
        this.executorInfoContents = str4;
        this.clientRPC = clientRPC;
        try {
            this.optimizationPolicy = (Policy) Class.forName(str2).newInstance();
            if (str2 == null) {
                throw new CompileTimeOptimizationException("A policy name should be specified.");
            }
        } catch (Exception e) {
            throw new CompileTimeOptimizationException(e);
        }
    }

    @Override // org.apache.nemo.compiler.optimizer.Optimizer
    public IRDAG optimizeAtCompileTime(IRDAG irdag) {
        StringBuilder append = new StringBuilder().append("ir-");
        int i = this.irDagCount;
        this.irDagCount = i + 1;
        String sb = append.append(i).append("-").toString();
        irdag.storeJSON(this.dagDirectory, sb, "IR before optimization");
        HashMap hashMap = new HashMap();
        IRDAG handleCaching = handleCaching(irdag, hashMap);
        if (!hashMap.isEmpty()) {
            handleCaching.storeJSON(this.dagDirectory, sb + "FilterCache", "IR after cache filtering");
        }
        beforeCompileTimeOptimization(irdag, this.optimizationPolicy);
        IRDAG runCompileTimeOptimization = this.optimizationPolicy.runCompileTimeOptimization(handleCaching, this.dagDirectory);
        runCompileTimeOptimization.storeJSON(this.dagDirectory, sb + this.optimizationPolicy.getClass().getSimpleName(), "IR optimized for " + this.optimizationPolicy.getClass().getSimpleName());
        hashMap.forEach((uuid, iREdge) -> {
            if (this.cacheIdToParallelism.containsKey(uuid)) {
                return;
            }
            this.cacheIdToParallelism.put(uuid, (Integer) runCompileTimeOptimization.getVertexById(iREdge.getDst().getId()).getPropertyValue(ParallelismProperty.class).orElseThrow(() -> {
                return new RuntimeException("No parallelism on an IR vertex.");
            }));
        });
        return runCompileTimeOptimization;
    }

    @Override // org.apache.nemo.compiler.optimizer.Optimizer
    public IRDAG optimizeAtRunTime(IRDAG irdag, Message message) {
        return this.optimizationPolicy.runRunTimeOptimizations(irdag, message);
    }

    private void beforeCompileTimeOptimization(IRDAG irdag, Policy policy) {
        irdag.recordExecutorInfo(Util.parseResourceSpecificationString(this.executorInfoContents));
        if (policy instanceof XGBoostPolicy) {
            this.clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder().setType(ControlMessage.DriverToClientMessageType.LaunchOptimization).setOptimizationType(ControlMessage.OptimizationType.XGBoost).setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(irdag.irDAGSummary() + this.environmentTypeStr).build()).build());
        }
    }

    private IRDAG handleCaching(IRDAG irdag, Map<UUID, IREdge> map) {
        irdag.topologicalDo(iRVertex -> {
            irdag.getIncomingEdgesOf(iRVertex).forEach(iREdge -> {
                iREdge.getPropertyValue(CacheIDProperty.class).ifPresent(uuid -> {
                    map.put(uuid, iREdge);
                });
            });
        });
        if (map.isEmpty()) {
            return irdag;
        }
        DAGBuilder dAGBuilder = new DAGBuilder();
        List list = (List) irdag.getVertices().stream().filter(iRVertex2 -> {
            return irdag.getOutgoingEdgesOf(iRVertex2).isEmpty();
        }).collect(Collectors.toList());
        Objects.requireNonNull(dAGBuilder);
        list.forEach((v1) -> {
            r1.addVertex(v1);
        });
        list.forEach(iRVertex3 -> {
            addNonCachedVerticesAndEdges(irdag, iRVertex3, dAGBuilder);
        });
        return new IRDAG(dAGBuilder.buildWithoutSourceCheck());
    }

    private void addNonCachedVerticesAndEdges(IRDAG irdag, IRVertex iRVertex, DAGBuilder<IRVertex, IREdge> dAGBuilder) {
        if (((Boolean) iRVertex.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)).booleanValue() && irdag.getIncomingEdgesOf(iRVertex).stream().filter(iREdge -> {
            return iREdge.getPropertyValue(CacheIDProperty.class).isPresent();
        }).anyMatch(iREdge2 -> {
            return this.cacheIdToParallelism.containsKey(iREdge2.getPropertyValue(CacheIDProperty.class).get());
        })) {
            dAGBuilder.removeVertex(iRVertex);
        } else {
            irdag.getIncomingEdgesOf(iRVertex).stream().forEach(iREdge3 -> {
                Optional findFirst = irdag.getOutgoingEdgesOf(iREdge3.getSrc()).stream().filter(iREdge3 -> {
                    return iREdge3.getPropertyValue(CacheIDProperty.class).isPresent();
                }).map(iREdge4 -> {
                    return (UUID) iREdge4.getPropertyValue(CacheIDProperty.class).get();
                }).findFirst();
                if (!findFirst.isPresent() || this.cacheIdToParallelism.get(findFirst.get()) == null) {
                    IRVertex iRVertex2 = (IRVertex) iREdge3.getSrc();
                    dAGBuilder.addVertex(iRVertex2);
                    dAGBuilder.connectVertices(iREdge3);
                    addNonCachedVerticesAndEdges(irdag, iRVertex2, dAGBuilder);
                    return;
                }
                CachedSourceVertex cachedSourceVertex = new CachedSourceVertex(this.cacheIdToParallelism.get(findFirst.get()).intValue());
                cachedSourceVertex.setPropertyPermanently(ParallelismProperty.of(this.cacheIdToParallelism.get(findFirst.get())));
                dAGBuilder.addVertex(cachedSourceVertex);
                IREdge iREdge5 = new IREdge((CommunicationPatternProperty.Value) iREdge3.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(() -> {
                    return new RuntimeException("No communication pattern on an ir edge");
                }), cachedSourceVertex, iRVertex);
                iREdge3.copyExecutionPropertiesTo(iREdge5);
                iREdge5.setProperty(CacheIDProperty.of((UUID) findFirst.get()));
                dAGBuilder.connectVertices(iREdge5);
            });
        }
    }
}
