package org.apache.nemo.runtime.executor.data;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.bytetransfer.ByteInputContext;
import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
import org.apache.nemo.runtime.executor.bytetransfer.ByteTransfer;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executor-side manager of pipe transfers.
 *
 * <p>For reading, it asks the master which executor holds a pipe and then opens a byte input
 * context towards that executor (see {@link #read(int, RuntimeEdge, int)}). For writing, it
 * records the byte output contexts handed to {@link #onOutputContext(ByteOutputContext)} in a
 * {@link PipeContainer}, keyed by (runtime edge id, source task index), and serves them back
 * through {@link #getOutputContexts(RuntimeEdge, long)}.
 */
@ThreadSafe
public final class PipeManagerWorker {
    private static final Logger LOG = LoggerFactory.getLogger(PipeManagerWorker.class);

    /**
     * Listener id used both to look up the message sender and as the listener id of every
     * control message built by this class.
     * NOTE(review): this literal presumably mirrors a shared constant declared elsewhere in the
     * project; if so, reference that constant instead - confirm.
     */
    private static final String MASTER_LISTENER_ID = "PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID";

    private final String executorId;
    private final SerializerManager serializerManager;
    private final ByteTransfer byteTransfer;
    private final PipeContainer pipeContainer = new PipeContainer();
    private final PersistentConnectionToMasterMap toMaster;

    /**
     * Injectable constructor.
     *
     * @param str the id of the executor this worker runs in
     * @param byteTransfer used to open input contexts towards other executors
     * @param serializerManager source of per-edge serializers
     * @param persistentConnectionToMasterMap provides the message sender towards the master
     */
    @Inject
    private PipeManagerWorker(@Parameter(JobConf.ExecutorId.class) String str, ByteTransfer byteTransfer, SerializerManager serializerManager, PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
        this.executorId = str;
        this.byteTransfer = byteTransfer;
        this.serializerManager = serializerManager;
        this.toMaster = persistentConnectionToMasterMap;
    }

    /**
     * Asynchronously opens the pipe of the given edge and source task for reading.
     *
     * <p>A {@code RequestPipeLoc} message is sent to the master; once the reply arrives, an input
     * context is opened towards the executor named in the reply, and its input streams are
     * wrapped in an iterator that uses the serializer registered for the edge id.
     *
     * @param i the source task index
     * @param runtimeEdge the edge to read from; must be a {@link StageEdge}, because the number
     *        of pipes to wait for is derived from stage-edge properties
     * @param i2 the destination task index
     * @return a future of the iterator over the incoming data; it completes exceptionally with
     *         a {@link RuntimeException} if the reply is not a {@code PipeLocInfo} message, or
     *         with an {@link IllegalStateException} if the reply names no executor
     */
    public CompletableFuture<DataUtil.IteratorWithNumBytes> read(int i, RuntimeEdge runtimeEdge, int i2) {
        final int srcTaskIndex = i;
        final int dstTaskIndex = i2;
        final String runtimeEdgeId = runtimeEdge.getId();

        final ControlMessage.Message locationRequest = ControlMessage.Message.newBuilder()
            .setId(RuntimeIdManager.generateMessageId())
            .setListenerId(MASTER_LISTENER_ID)
            .setType(ControlMessage.MessageType.RequestPipeLoc)
            .setRequestPipeLocMsg(ControlMessage.RequestPipeLocationMessage.newBuilder()
                .setExecutorId(this.executorId)
                .setRuntimeEdgeId(runtimeEdgeId)
                .setSrcTaskIndex(srcTaskIndex)
                .build())
            .build();

        return this.toMaster.getMessageSender(MASTER_LISTENER_ID)
            .request(locationRequest)
            .thenCompose(response -> {
                if (response.getType() != ControlMessage.MessageType.PipeLocInfo) {
                    throw new RuntimeException("Response message type mismatch!");
                }
                if (!response.getPipeLocInfoMsg().hasExecutorId()) {
                    throw new IllegalStateException("PipeLocInfo response for runtime edge " + runtimeEdgeId
                        + " (source task index " + srcTaskIndex + ") does not name an executor");
                }
                return openInputIterator(response.getPipeLocInfoMsg().getExecutorId(),
                    runtimeEdgeId, srcTaskIndex, dstTaskIndex, runtimeEdge);
            });
    }

    /** Opens an input context towards {@code remoteExecutorId} and wraps its streams in a deserializing iterator. */
    private CompletableFuture<DataUtil.IteratorWithNumBytes> openInputIterator(String remoteExecutorId, String runtimeEdgeId, int srcTaskIndex, int dstTaskIndex, RuntimeEdge runtimeEdge) {
        final byte[] descriptor = buildContextDescriptor(runtimeEdgeId, srcTaskIndex, dstTaskIndex, runtimeEdge);
        // NOTE(review): the trailing 'true' presumably flags the context as a pipe transfer - confirm
        // against ByteTransfer#newInputContext.
        return this.byteTransfer.newInputContext(remoteExecutorId, descriptor, true).thenApply(inputContext -> {
            return new DataUtil.InputStreamIterator(inputContext.getInputStreams(), this.serializerManager.getSerializer(runtimeEdgeId));
        });
    }

    /** Serializes the transfer context descriptor (edge id, both task indices, number of pipes to wait for). */
    private byte[] buildContextDescriptor(String runtimeEdgeId, int srcTaskIndex, int dstTaskIndex, RuntimeEdge runtimeEdge) {
        return ControlMessage.PipeTransferContextDescriptor.newBuilder()
            .setRuntimeEdgeId(runtimeEdgeId)
            .setSrcTaskIndex(srcTaskIndex)
            .setDstTaskIndex(dstTaskIndex)
            .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
            .build()
            .toByteArray();
    }

    /**
     * Sends a {@code PipeInit} message to the master carrying the given edge id and source task
     * index together with this executor's id.
     *
     * @param str the runtime edge id
     * @param j the source task index
     */
    public void notifyMaster(String str, long j) {
        final ControlMessage.Message pipeInit = ControlMessage.Message.newBuilder()
            .setId(RuntimeIdManager.generateMessageId())
            .setListenerId(MASTER_LISTENER_ID)
            .setType(ControlMessage.MessageType.PipeInit)
            .setPipeInitMsg(ControlMessage.PipeInitMessage.newBuilder()
                .setRuntimeEdgeId(str)
                .setSrcTaskIndex(j)
                .setExecutorId(this.executorId)
                .build())
            .build();
        this.toMaster.getMessageSender(MASTER_LISTENER_ID).send(pipeInit);
    }

    /**
     * Returns the output contexts registered for the given edge and source task, creating the
     * pipe list for that key first if it does not exist yet.
     * NOTE(review): whether the lookup waits until all expected pipes have been registered is
     * decided by {@link PipeContainer} - confirm there.
     *
     * @param runtimeEdge the edge being written to; must be a {@link StageEdge}
     * @param j the source task index
     * @return the output contexts held by the pipe container for (edge id, source task index)
     */
    public List<ByteOutputContext> getOutputContexts(RuntimeEdge runtimeEdge, long j) {
        final Pair<String, Long> pipeKey = Pair.of(runtimeEdge.getId(), Long.valueOf(j));
        this.pipeContainer.putPipeListIfAbsent(pipeKey, getNumOfPipeToWait(runtimeEdge));
        return this.pipeContainer.getPipes(pipeKey);
    }

    /**
     * Looks up the serializer registered for the given runtime edge id.
     *
     * @param str the runtime edge id
     * @return the serializer returned by the serializer manager for that id
     */
    public Serializer getSerializer(String str) {
        return this.serializerManager.getSerializer(str);
    }

    /**
     * Registers an output context under the (edge id, source task index) key and destination
     * task index found in its context descriptor.
     *
     * @param byteOutputContext the output context to register
     * @throws InvalidProtocolBufferException if the context descriptor cannot be parsed
     * @throws ArithmeticException if the destination task index or the number of pipes to wait
     *         for in the descriptor does not fit in an {@code int}
     */
    public void onOutputContext(ByteOutputContext byteOutputContext) throws InvalidProtocolBufferException {
        final ControlMessage.PipeTransferContextDescriptor descriptor =
            ControlMessage.PipeTransferContextDescriptor.parseFrom(byteOutputContext.getContextDescriptor());

        // The descriptor carries these as 64-bit values; fail fast instead of silently truncating
        // an out-of-range value into a wrong pipe index or pipe count.
        final int dstTaskIndex = Math.toIntExact(descriptor.getDstTaskIndex());
        final int numPipeToWait = Math.toIntExact(descriptor.getNumPipeToWait());

        final Pair<String, Long> pipeKey = Pair.of(descriptor.getRuntimeEdgeId(), Long.valueOf(descriptor.getSrcTaskIndex()));
        this.pipeContainer.putPipeListIfAbsent(pipeKey, numPipeToWait);
        this.pipeContainer.putPipe(pipeKey, dstTaskIndex, byteOutputContext);
    }

    /**
     * Not supported: always throws.
     *
     * @param byteInputContext ignored
     * @throws UnsupportedOperationException always
     */
    public void onInputContext(ByteInputContext byteInputContext) throws InvalidProtocolBufferException {
        throw new UnsupportedOperationException();
    }

    /**
     * Computes how many pipes belong to one source task of the given edge: 1 for a
     * {@code OneToOne} communication pattern, otherwise the parallelism of the destination vertex.
     *
     * @param runtimeEdge the edge; must be a {@link StageEdge} (a {@link ClassCastException} is
     *        thrown otherwise)
     * @return the number of pipes to wait for
     * @throws IllegalStateException if the destination vertex has no parallelism property or the
     *         edge has no communication pattern property
     */
    private int getNumOfPipeToWait(RuntimeEdge runtimeEdge) {
        final StageEdge stageEdge = (StageEdge) runtimeEdge;

        // Both properties are resolved up front, in this order, so a missing parallelism is
        // reported even for one-to-one edges.
        final int dstParallelism = stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
            .orElseThrow(() -> new IllegalStateException(
                "Destination vertex of runtime edge " + stageEdge.getId() + " has no ParallelismProperty"));
        final CommunicationPatternProperty.Value commPattern = stageEdge.getPropertyValue(CommunicationPatternProperty.class)
            .orElseThrow(() -> new IllegalStateException(
                "Runtime edge " + stageEdge.getId() + " has no CommunicationPatternProperty"));

        return commPattern == CommunicationPatternProperty.Value.OneToOne ? 1 : dstParallelism;
    }
}
