package org.apache.nemo.compiler.optimizer.pass.runtime;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nemo.common.HashRange;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.exception.RuntimeOptimizationException;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.class */
/**
 * Runtime pass that re-partitions the examined edges and resets the parallelism of their
 * destination vertices, using the parallelism value carried in the run-time message under
 * the {@code "opt.parallelism"} key.
 *
 * <p>The pass only acts when the destination vertex of the first examined edge has
 * {@link EnableDynamicTaskSizingProperty} set to {@code true}; otherwise the DAG is returned
 * untouched.
 */
public final class DynamicTaskSizingRuntimePass extends RunTimePass<Map<String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
    private static final int PARTITIONER_PROPERTY_FOR_SMALL_JOB = 1024;
    private static final int PARTITIONER_PROPERTY_FOR_MEDIUM_JOB = 2048;
    private static final int PARTITIONER_PROPERTY_FOR_LARGE_JOB = 4096;
    private static final int LOWER_BOUND_SMALL_JOB_GB = 1;
    private static final int LOWER_BOUND_MEDIUM_JOB_GB = 10;
    private static final int LOWER_BOUND_LARGE_JOB_GB = 100;
    /** Number of bytes in one GB (2^30), used to convert the DAG input size. */
    private static final long BYTES_PER_GB = 1L << 30;
    /** Key under which the message carries the requested parallelism. */
    private final String mapKey = "opt.parallelism";

    /**
     * Applies dynamic task sizing to the edges named in {@code message}.
     *
     * <p>The examined edge set is first widened with every incoming edge of the examined
     * edges' destination vertices. Each widened edge then receives a fresh
     * {@link SubPartitionSetProperty}, and its destination vertex a matching
     * {@link ParallelismProperty}.
     *
     * @param irdag   the DAG to optimize; its edges and vertices are modified in place.
     * @param message the run-time message holding the examined edges and the
     *                {@code "opt.parallelism"} value.
     * @return the same {@code irdag} instance.
     * @throws RuntimeOptimizationException if the message has no parallelism entry, a required
     *                                      edge property is missing, or the job is too small.
     * @throws IllegalArgumentException     if a shuffle edge's partitioner property disagrees
     *                                      with the one derived from the job size, or the
     *                                      requested parallelism is out of range.
     */
    @Override
    public IRDAG apply(IRDAG irdag, Message<Map<String, Long>> message) {
        final Set<IREdge> examinedEdges = message.getExaminedEdges();
        final Set<?> dstVertices = examinedEdges.stream().map(IREdge::getDst).collect(Collectors.toSet());
        // Pull in every incoming edge of the affected destination vertices.
        irdag.topologicalDo(vertex -> {
            if (dstVertices.contains(vertex)) {
                examinedEdges.addAll(irdag.getIncomingEdgesOf(vertex));
            }
        });
        LOG.info("Examined edges {}", examinedEdges.stream().map(IREdge::getId).collect(Collectors.toList()));

        // Nothing to resize; also avoids NoSuchElementException from iterator().next() below.
        if (examinedEdges.isEmpty()) {
            LOG.warn("No examined edges in the message; returning the DAG unchanged");
            return irdag;
        }

        final boolean dynamicTaskSizingEnabled = examinedEdges.iterator().next().getDst()
            .getPropertyValue(EnableDynamicTaskSizingProperty.class).orElse(false);
        if (!dynamicTaskSizingEnabled) {
            return irdag;
        }

        final Map<String, Long> messageValue = message.getMessageValue();
        LOG.info("messageValue {}", messageValue);
        final Long requestedParallelism = messageValue.get(mapKey);
        if (requestedParallelism == null) {
            throw new RuntimeOptimizationException(
                String.format("Message value does not contain the key '%s'", mapKey));
        }

        final int partitionerProperty = getPartitionerProperty(irdag);
        for (final IREdge edge : examinedEdges) {
            final Object pattern = edge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(() -> {
                return new RuntimeOptimizationException("CommunicationPattern Property of edge is missing.");
            });
            if (!CommunicationPatternProperty.Value.SHUFFLE.equals(pattern)) {
                continue;
            }
            final Pair<?, ?> partitioner = (Pair<?, ?>) edge.getPropertyValue(PartitionerProperty.class)
                .orElseThrow(() -> {
                    return new RuntimeOptimizationException("Partitioner Property of shuffle edge is missing.");
                });
            final int edgePartitions = (Integer) partitioner.right();
            if (edgePartitions != partitionerProperty) {
                throw new IllegalArgumentException(String.format(
                    "Partitioner property of edge %s is %d but %d is expected for this job size",
                    edge.getId(), edgePartitions, partitionerProperty));
            }
        }

        // Validate on the long value before narrowing: 0 would divide by zero, and anything above
        // partitionerProperty would yield a step of 0 and a never-ending range loop.
        if (requestedParallelism <= 0 || requestedParallelism > partitionerProperty) {
            throw new IllegalArgumentException(String.format(
                "Parallelism must be between 1 and %d, but was %d", partitionerProperty, requestedParallelism));
        }
        final int growingFactor = partitionerProperty / requestedParallelism.intValue();

        // NOTE(review): when partitionerProperty is not a multiple of the requested parallelism,
        // the last generated range ends past partitionerProperty and the number of ranges differs
        // from the parallelism computed below - confirm that callers only send exact divisors.
        for (final IREdge edge : examinedEdges) {
            setSubPartitionSetProperty(edge, growingFactor, partitionerProperty);
        }
        for (final IREdge edge : examinedEdges) {
            setDstVertexParallelismProperty(edge, growingFactor, partitionerProperty);
        }
        return irdag;
    }

    /**
     * Picks the partitioner property that corresponds to the input size of the DAG.
     *
     * @param irdag the DAG whose input size (in bytes) is inspected.
     * @return 1024 for jobs below 10 GB, 2048 for jobs below 100 GB, 4096 otherwise.
     * @throws RuntimeOptimizationException if the input size is below 1 GB.
     */
    private int getPartitionerProperty(IRDAG irdag) {
        final long jobSizeInGb = irdag.getInputSize().longValue() / BYTES_PER_GB;
        if (jobSizeInGb < LOWER_BOUND_SMALL_JOB_GB) {
            throw new RuntimeOptimizationException(String.format("Job size must be greater than %d GB to run DynamicTaskSizingRuntimePass", Integer.valueOf(LOWER_BOUND_SMALL_JOB_GB)));
        }
        if (jobSizeInGb < LOWER_BOUND_MEDIUM_JOB_GB) {
            return PARTITIONER_PROPERTY_FOR_SMALL_JOB;
        }
        if (jobSizeInGb < LOWER_BOUND_LARGE_JOB_GB) {
            return PARTITIONER_PROPERTY_FOR_MEDIUM_JOB;
        }
        return PARTITIONER_PROPERTY_FOR_LARGE_JOB;
    }

    /**
     * Replaces the edge's {@link SubPartitionSetProperty} with consecutive hash ranges of width
     * {@code growingFactor}, starting at the begin of the edge's current first range and
     * continuing while the range begin is below {@code partitionerProperty}.
     *
     * <p>An edge whose current sub-partition set is empty is left untouched.
     *
     * @param edge                the edge to update.
     * @param growingFactor       width of each generated range; must be positive.
     * @param partitionerProperty exclusive upper bound for the begin of a generated range.
     * @throws RuntimeOptimizationException if the edge has no {@link SubPartitionSetProperty}.
     * @throws IllegalArgumentException     if {@code growingFactor} is not positive.
     */
    private void setSubPartitionSetProperty(IREdge edge, int growingFactor, int partitionerProperty) {
        if (growingFactor <= 0) {
            // A non-positive step would keep the loop below from ever reaching its bound.
            throw new IllegalArgumentException("Range width must be positive, but was " + growingFactor);
        }
        final List<KeyRange> keyRanges = edge.getPropertyValue(SubPartitionSetProperty.class).orElseThrow(() -> {
            return new RuntimeOptimizationException("SubPartitionSet Property of edge is missing.");
        });
        if (keyRanges.isEmpty()) {
            return;
        }
        final int start = (Integer) keyRanges.get(0).rangeBeginInclusive();
        final ArrayList<KeyRange> partitionSet = new ArrayList<>();
        for (int begin = start; begin < partitionerProperty; begin += growingFactor) {
            partitionSet.add(HashRange.of(begin, begin + growingFactor));
        }
        edge.setPropertyPermanently(SubPartitionSetProperty.of(partitionSet));
    }

    /**
     * Sets the parallelism of the edge's destination vertex to
     * {@code (partitionerProperty - start) / growingFactor}, where {@code start} is the begin of
     * the first range in the edge's {@link SubPartitionSetProperty}.
     *
     * <p>An edge whose sub-partition set is empty is skipped, mirroring
     * {@code setSubPartitionSetProperty}, instead of failing on {@code get(0)}.
     *
     * @param edge                the edge whose destination vertex is updated.
     * @param growingFactor       width of one sub-partition range; must be non-zero.
     * @param partitionerProperty total number of partitions of the edge.
     * @throws RuntimeOptimizationException if the edge has no {@link SubPartitionSetProperty}.
     */
    private void setDstVertexParallelismProperty(IREdge edge, int growingFactor, int partitionerProperty) {
        final List<KeyRange> keyRanges = edge.getPropertyValue(SubPartitionSetProperty.class).orElseThrow(() -> {
            return new RuntimeOptimizationException("SubpartitionSet Property of the edge is missing.");
        });
        if (keyRanges.isEmpty()) {
            return;
        }
        final int start = (Integer) keyRanges.get(0).rangeBeginInclusive();
        final int parallelism = (partitionerProperty - start) / growingFactor;
        edge.getDst().setPropertyPermanently(ParallelismProperty.of(Integer.valueOf(parallelism)));
    }
}
