package org.apache.nemo.runtime.executor.bytetransfer;

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import org.apache.nemo.runtime.common.comm.ControlMessage;

/* loaded from: input_file:org/apache/nemo/runtime/executor/bytetransfer/FrameDecoder.class */
final class FrameDecoder extends ByteToMessageDecoder {
    private static final int HEADER_LENGTH = 9;
    private final ContextManager contextManager;
    private long controlBodyBytesToRead = 0;
    private long dataBodyBytesToRead = 0;
    private ByteInputContext inputContext;
    private boolean isLastFrame;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FrameDecoder(ContextManager contextManager) {
        this.contextManager = contextManager;
    }

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws InvalidProtocolBufferException {
        boolean onFrameStarted;
        do {
            if (this.controlBodyBytesToRead > 0) {
                onFrameStarted = onControlBodyAdded(byteBuf, list);
            } else if (this.dataBodyBytesToRead > 0) {
                onDataBodyAdded(byteBuf);
                onFrameStarted = byteBuf.readableBytes() > 0;
            } else {
                onFrameStarted = onFrameStarted(channelHandlerContext, byteBuf);
            }
        } while (onFrameStarted);
    }

    private boolean onFrameStarted(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        if (!$assertionsDisabled && this.controlBodyBytesToRead != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.dataBodyBytesToRead != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.inputContext != null) {
            throw new AssertionError();
        }
        if (byteBuf.readableBytes() < HEADER_LENGTH) {
            return false;
        }
        byte readByte = byteBuf.readByte();
        int readInt = byteBuf.readInt();
        long readUnsignedInt = byteBuf.readUnsignedInt();
        if (readUnsignedInt < 0) {
            throw new IllegalStateException(String.format("Frame length is negative: %d", Long.valueOf(readUnsignedInt)));
        }
        if ((readByte & 8) == 0) {
            this.controlBodyBytesToRead = readUnsignedInt;
            return true;
        }
        this.dataBodyBytesToRead = readUnsignedInt;
        ControlMessage.ByteTransferDataDirection byteTransferDataDirection = (readByte & 4) == 0 ? ControlMessage.ByteTransferDataDirection.INITIATOR_SENDS_DATA : ControlMessage.ByteTransferDataDirection.INITIATOR_RECEIVES_DATA;
        boolean z = (readByte & 2) != 0;
        this.isLastFrame = (readByte & 1) != 0;
        this.inputContext = this.contextManager.getInputContext(byteTransferDataDirection, readInt);
        if (this.inputContext == null) {
            throw new IllegalStateException(String.format("Transport context for %s:%d was not found between the localaddress %s and the remote address %s", byteTransferDataDirection, Integer.valueOf(readInt), channelHandlerContext.channel().localAddress(), channelHandlerContext.channel().remoteAddress()));
        }
        if (z) {
            this.inputContext.onNewStream();
        }
        if (this.dataBodyBytesToRead != 0) {
            return true;
        }
        onDataFrameEnd();
        return true;
    }

    private boolean onControlBodyAdded(ByteBuf byteBuf, List list) throws InvalidProtocolBufferException {
        byte[] bArr;
        int i;
        if (!$assertionsDisabled && this.controlBodyBytesToRead <= 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.dataBodyBytesToRead != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.inputContext != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.controlBodyBytesToRead > 2147483647L) {
            throw new AssertionError();
        }
        if (byteBuf.readableBytes() < this.controlBodyBytesToRead) {
            return false;
        }
        if (byteBuf.hasArray()) {
            bArr = byteBuf.array();
            i = byteBuf.arrayOffset() + byteBuf.readerIndex();
        } else {
            bArr = new byte[(int) this.controlBodyBytesToRead];
            byteBuf.getBytes(byteBuf.readerIndex(), bArr, 0, (int) this.controlBodyBytesToRead);
            i = 0;
        }
        list.add((ControlMessage.ByteTransferContextSetupMessage) ControlMessage.ByteTransferContextSetupMessage.PARSER.parseFrom(bArr, i, (int) this.controlBodyBytesToRead));
        byteBuf.skipBytes((int) this.controlBodyBytesToRead);
        this.controlBodyBytesToRead = 0L;
        return true;
    }

    private void onDataBodyAdded(ByteBuf byteBuf) {
        if (!$assertionsDisabled && this.controlBodyBytesToRead != 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.dataBodyBytesToRead <= 0) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.inputContext == null) {
            throw new AssertionError();
        }
        long min = Math.min(this.dataBodyBytesToRead, byteBuf.readableBytes());
        if (!$assertionsDisabled && min > 2147483647L) {
            throw new AssertionError();
        }
        this.inputContext.onByteBuf(byteBuf.readSlice((int) min).retain());
        this.dataBodyBytesToRead -= min;
        if (this.dataBodyBytesToRead == 0) {
            onDataFrameEnd();
        }
    }

    private void onDataFrameEnd() {
        if (this.isLastFrame) {
            this.inputContext.onContextClose();
        }
        this.inputContext = null;
    }

    static {
        $assertionsDisabled = !FrameDecoder.class.desiredAssertionStatus();
    }
}
