package org.apache.nemo.runtime.executor.datatransfer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.nemo.common.HashRange;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.UnsupportedCommPatternException;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.DataUtil;

/* loaded from: input_file:org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.class */
public final class BlockInputReader implements InputReader {
    private final BlockManagerWorker blockManagerWorker;
    private final int dstTaskIndex;
    private final IRVertex srcVertex;
    private final RuntimeEdge runtimeEdge;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockInputReader(int i, IRVertex iRVertex, RuntimeEdge runtimeEdge, BlockManagerWorker blockManagerWorker) {
        this.dstTaskIndex = i;
        this.srcVertex = iRVertex;
        this.runtimeEdge = runtimeEdge;
        this.blockManagerWorker = blockManagerWorker;
    }

    @Override // org.apache.nemo.runtime.executor.datatransfer.InputReader
    public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
        Optional propertyValue = this.runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
        if (((CommunicationPatternProperty.Value) propertyValue.get()).equals(CommunicationPatternProperty.Value.OneToOne)) {
            return Collections.singletonList(readOneToOne());
        }
        if (((CommunicationPatternProperty.Value) propertyValue.get()).equals(CommunicationPatternProperty.Value.BroadCast)) {
            return readBroadcast();
        }
        if (((CommunicationPatternProperty.Value) propertyValue.get()).equals(CommunicationPatternProperty.Value.Shuffle)) {
            return readDataInRange();
        }
        throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
    }

    @Override // org.apache.nemo.runtime.executor.datatransfer.InputReader
    public IRVertex getSrcIrVertex() {
        return this.srcVertex;
    }

    private String generateWildCardBlockId(int i) {
        Optional propertyValue = this.runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
        return (!propertyValue.isPresent() || ((DuplicateEdgeGroupPropertyValue) propertyValue.get()).getGroupSize() <= 1) ? RuntimeIdManager.generateBlockIdWildcard(this.runtimeEdge.getId(), i) : RuntimeIdManager.generateBlockIdWildcard(((DuplicateEdgeGroupPropertyValue) propertyValue.get()).getRepresentativeEdgeId(), i);
    }

    private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
        return this.blockManagerWorker.readBlock(generateWildCardBlockId(this.dstTaskIndex), this.runtimeEdge.getId(), (DataStoreProperty.Value) this.runtimeEdge.getPropertyValue(DataStoreProperty.class).get(), HashRange.all());
    }

    private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
        int sourceParallelism = InputReader.getSourceParallelism(this);
        Optional propertyValue = this.runtimeEdge.getPropertyValue(DataStoreProperty.class);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < sourceParallelism; i++) {
            arrayList.add(this.blockManagerWorker.readBlock(generateWildCardBlockId(i), this.runtimeEdge.getId(), (DataStoreProperty.Value) propertyValue.get(), HashRange.all()));
        }
        return arrayList;
    }

    private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
        if (!$assertionsDisabled && !(this.runtimeEdge instanceof StageEdge)) {
            throw new AssertionError();
        }
        Optional propertyValue = this.runtimeEdge.getPropertyValue(DataStoreProperty.class);
        this.runtimeEdge.getTaskIdxToKeyRange().get(Integer.valueOf(this.dstTaskIndex));
        KeyRange keyRange = (KeyRange) this.runtimeEdge.getTaskIdxToKeyRange().get(Integer.valueOf(this.dstTaskIndex));
        if (keyRange == null) {
            throw new BlockFetchException(new Throwable("The hash range to read is not assigned to " + this.dstTaskIndex + "'th task"));
        }
        int sourceParallelism = InputReader.getSourceParallelism(this);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < sourceParallelism; i++) {
            arrayList.add(this.blockManagerWorker.readBlock(generateWildCardBlockId(i), this.runtimeEdge.getId(), (DataStoreProperty.Value) propertyValue.get(), keyRange));
        }
        return arrayList;
    }

    static {
        $assertionsDisabled = !BlockInputReader.class.desiredAssertionStatus();
    }
}
