package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nemo.common.KeyExtractor;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.common.ir.edge.executionproperty.KeyDecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.KeyEncoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.utility.TaskSizeSplitterVertex;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.Annotates;
import org.apache.nemo.runtime.common.plan.StagePartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Annotates({EnableDynamicTaskSizingProperty.class, PartitionerProperty.class, SubPartitionSetProperty.class, ParallelismProperty.class})
/* loaded from: input_file:org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SamplingTaskSizingPass.class */
public final class SamplingTaskSizingPass extends ReshapingPass {
    private static final Logger LOG = LoggerFactory.getLogger(SamplingTaskSizingPass.class.getName());
    private static final int PARTITIONER_PROPERTY_FOR_SMALL_JOB = 1024;
    private static final int PARTITIONER_PROPERTY_FOR_MEDIUM_JOB = 2048;
    private static final int PARTITIONER_PROPERTY_FOR_LARGE_JOB = 4096;
    private final StagePartitioner stagePartitioner;

    public SamplingTaskSizingPass() {
        super(SamplingTaskSizingPass.class);
        this.stagePartitioner = new StagePartitioner();
    }

    @Override // java.util.function.Function
    public IRDAG apply(IRDAG irdag) {
        boolean isDTSEnabledByJobSize = isDTSEnabledByJobSize(irdag);
        if (!isDTSEnabledByJobSize) {
            return irdag;
        }
        irdag.topologicalDo(iRVertex -> {
            iRVertex.setProperty(EnableDynamicTaskSizingProperty.of(Boolean.valueOf(isDTSEnabledByJobSize)));
        });
        int partitionerPropertyByJobSize = getPartitionerPropertyByJobSize(irdag);
        Map apply = this.stagePartitioner.apply(irdag);
        HashMap hashMap = new HashMap();
        apply.forEach((iRVertex2, num) -> {
            if (!hashMap.containsKey(num)) {
                hashMap.put(num, new HashSet());
            }
            ((Set) hashMap.get(num)).add(iRVertex2);
        });
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        irdag.topologicalDo(iRVertex3 -> {
            for (IREdge iREdge : irdag.getIncomingEdgesOf(iRVertex3)) {
                if (isAppropriateForInsertingSplitterVertex(irdag, iRVertex3, iREdge, apply, hashMap)) {
                    hashSet.add((Integer) apply.get(iRVertex3));
                    hashSet2.add(iREdge);
                }
            }
        });
        irdag.topologicalDo(iRVertex4 -> {
            for (IREdge iREdge : irdag.getIncomingEdgesOf(iRVertex4)) {
                if (hashSet2.contains(iREdge)) {
                    hashSet2.remove(iREdge);
                    iREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Type.HASH, partitionerPropertyByJobSize));
                    hashSet2.add(iREdge);
                }
            }
        });
        List topologicalSort = irdag.getTopologicalSort();
        Collections.reverse(topologicalSort);
        Iterator it = topologicalSort.iterator();
        while (it.hasNext()) {
            for (IREdge iREdge : irdag.getOutgoingEdgesOf((IRVertex) it.next())) {
                if (hashSet2.contains(iREdge)) {
                    Set<IRVertex> set = (Set) hashMap.get(apply.get(iREdge.getDst()));
                    HashSet hashSet3 = new HashSet();
                    for (IRVertex iRVertex5 : set) {
                        Iterator it2 = ((Set) irdag.getOutgoingEdgesOf(iRVertex5).stream().map((v0) -> {
                            return v0.getDst();
                        }).collect(Collectors.toSet())).iterator();
                        while (it2.hasNext()) {
                            if (!set.contains((IRVertex) it2.next())) {
                                hashSet3.add(iRVertex5);
                            }
                        }
                    }
                    Set<IRVertex> set2 = (Set) set.stream().filter(iRVertex6 -> {
                        if (!irdag.getOutgoingEdgesOf(iRVertex6).isEmpty()) {
                            Stream map = irdag.getOutgoingEdgesOf(iRVertex6).stream().map((v0) -> {
                                return v0.getDst();
                            });
                            Objects.requireNonNull(set);
                            if (map.anyMatch((v1) -> {
                                return r1.contains(v1);
                            })) {
                                return false;
                            }
                        }
                        return true;
                    }).collect(Collectors.toSet());
                    Stream map = set.stream().flatMap(iRVertex7 -> {
                        return irdag.getIncomingEdgesOf(iRVertex7).stream();
                    }).map((v0) -> {
                        return v0.getSrc();
                    });
                    Objects.requireNonNull(set);
                    if (map.allMatch((v1) -> {
                        return r1.contains(v1);
                    })) {
                        break;
                    }
                    insertSplitterVertex(irdag, set, Collections.singleton(iREdge.getDst()), hashSet3, set2, partitionerPropertyByJobSize);
                }
            }
        }
        return irdag;
    }

    private boolean isDTSEnabledByJobSize(IRDAG irdag) {
        return irdag.getInputSize().longValue() >= 1073741824;
    }

    private int getPartitionerPropertyByJobSize(IRDAG irdag) {
        long longValue = irdag.getInputSize().longValue() / 1073741824;
        return (1 > longValue || longValue >= 10) ? (10 > longValue || longValue >= 100) ? PARTITIONER_PROPERTY_FOR_LARGE_JOB : PARTITIONER_PROPERTY_FOR_MEDIUM_JOB : PARTITIONER_PROPERTY_FOR_SMALL_JOB;
    }

    private boolean isAppropriateForInsertingSplitterVertex(IRDAG irdag, IRVertex iRVertex, IREdge iREdge, Map<IRVertex, Integer> map, Map<Integer, Set<IRVertex>> map2) {
        if (!CommunicationPatternProperty.Value.SHUFFLE.equals(iREdge.getPropertyValue(CommunicationPatternProperty.class).get()) || irdag.getIncomingEdgesOf(iRVertex).size() > 1 || irdag.getOutgoingEdgesOf(iREdge.getSrc()).size() > 1) {
            return false;
        }
        Set<IRVertex> set = map2.get(map.get(iRVertex));
        Set set2 = (Set) set.stream().flatMap(iRVertex2 -> {
            return irdag.getOutgoingEdgesOf(iRVertex2).stream();
        }).filter(iREdge2 -> {
            return !set.contains(iREdge2.getDst());
        }).collect(Collectors.toSet());
        if (set2.isEmpty()) {
            return true;
        }
        Iterator it = set2.iterator();
        while (it.hasNext()) {
            if (CommunicationPatternProperty.Value.ONE_TO_ONE.equals(((IREdge) it.next()).getPropertyValue(CommunicationPatternProperty.class).get())) {
                return false;
            }
        }
        return true;
    }

    private void insertSplitterVertex(IRDAG irdag, Set<IRVertex> set, Set<IRVertex> set2, Set<IRVertex> set3, Set<IRVertex> set4, int i) {
        TaskSizeSplitterVertex taskSizeSplitterVertex = new TaskSizeSplitterVertex("Splitter" + set2.iterator().next().getId(), set, set2, set3, set4, (Set) set.stream().flatMap(iRVertex -> {
            return irdag.getIncomingEdgesOf(iRVertex).stream();
        }).filter(iREdge -> {
            return set.contains(iREdge.getSrc());
        }).collect(Collectors.toSet()), i);
        taskSizeSplitterVertex.setMaxNumberOfIterations(2);
        irdag.insert(taskSizeSplitterVertex);
        taskSizeSplitterVertex.printLogs();
    }

    private IREdge changeOneToOneEdgeToShuffleEdge(IREdge iREdge, IREdge iREdge2, int i) {
        if (!CommunicationPatternProperty.Value.ONE_TO_ONE.equals(iREdge.getPropertyValue(CommunicationPatternProperty.class).get()) || !CommunicationPatternProperty.Value.SHUFFLE.equals(iREdge2.getPropertyValue(CommunicationPatternProperty.class).get())) {
            return iREdge;
        }
        iREdge.setProperty(CommunicationPatternProperty.of(CommunicationPatternProperty.Value.SHUFFLE));
        iREdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.PULL));
        iREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Type.HASH, i));
        iREdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LOCAL_FILE_STORE));
        if (!iREdge.getPropertyValue(KeyExtractorProperty.class).isPresent()) {
            iREdge.setProperty(KeyExtractorProperty.of((KeyExtractor) iREdge2.getPropertyValue(KeyExtractorProperty.class).get()));
        }
        if (!iREdge.getPropertyValue(KeyEncoderProperty.class).isPresent()) {
            iREdge.setProperty(KeyEncoderProperty.of((EncoderFactory) iREdge2.getPropertyValue(KeyEncoderProperty.class).get()));
        }
        if (!iREdge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
            iREdge.setProperty(KeyDecoderProperty.of((DecoderFactory) iREdge2.getPropertyValue(KeyDecoderProperty.class).get()));
        }
        return iREdge;
    }
}
