package org.apache.spark.network;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import scala.Function2;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: Connection.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=d!B\u0001\u0003\u0001\u0011Q!a\u0005*fG\u0016Lg/\u001b8h\u0007>tg.Z2uS>t'BA\u0002\u0005\u0003\u001dqW\r^<pe.T!!\u0002\u0004\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u001dA\u0011AB1qC\u000eDWMC\u0001\n\u0003\ry'oZ\n\u0003\u0001-\u0001\"\u0001D\u0007\u000e\u0003\tI!A\u0004\u0002\u0003\u0015\r{gN\\3di&|g\u000e\u0003\u0005\u0011\u0001\t\u0005\t\u0015!\u0003\u0013\u0003!\u0019\u0007.\u00198oK2|6\u0001\u0001\t\u0003'ii\u0011\u0001\u0006\u0006\u0003+Y\t\u0001b\u00195b]:,Gn\u001d\u0006\u0003/a\t1A\\5p\u0015\u0005I\u0012\u0001\u00026bm\u0006L!a\u0007\u000b\u0003\u001bM{7m[3u\u0007\"\fgN\\3m\u0011!i\u0002A!A!\u0002\u0013q\u0012!C:fY\u0016\u001cGo\u001c:`!\t\u0019r$\u0003\u0002!)\tA1+\u001a7fGR|'\u000fC\u0003#\u0001\u0011\u00051%\u0001\u0004=S:LGO\u0010\u000b\u0004I\u00152\u0003C\u0001\u0007\u0001\u0011\u0015\u0001\u0012\u00051\u0001\u0013\u0011\u0015i\u0012\u00051\u0001\u001f\r\u0011A\u0003\u0001A\u0015\u0003\u000b%s'm\u001c=\u0014\u0005\u001dR\u0003CA\u0016/\u001b\u0005a#\"A\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u0005=b#AB!osJ+g\rC\u0003#O\u0011\u0005\u0011\u0007F\u00013!\t\u0019t%D\u0001\u0001\u0011\u001d)tE1A\u0005\u0002Y\n\u0001\"\\3tg\u0006<Wm]\u000b\u0002oA!\u0001(P 
C\u001b\u0005I$B\u0001\u001e<\u0003\u001diW\u000f^1cY\u0016T!\u0001\u0010\u0017\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002?s\t9\u0001*Y:i\u001b\u0006\u0004\bCA\u0016A\u0013\t\tEFA\u0002J]R\u0004\"\u0001D\"\n\u0005\u0011\u0013!!\u0004\"vM\u001a,'/T3tg\u0006<W\r\u0003\u0004GO\u0001\u0006IaN\u0001\n[\u0016\u001c8/Y4fg\u0002BQ\u0001S\u0014\u0005\u0002%\u000b\u0001bZ3u\u0007\",hn\u001b\u000b\u0003\u0015B\u00032aK&N\u0013\taEF\u0001\u0004PaRLwN\u001c\t\u0003\u00199K!a\u0014\u0002\u0003\u00195+7o]1hK\u000eCWO\\6\t\u000bE;\u0005\u0019\u0001*\u0002\r!,\u0017\rZ3s!\ta1+\u0003\u0002U\u0005\t\u0011R*Z:tC\u001e,7\t[;oW\"+\u0017\rZ3s\u0011\u00151v\u0005\"\u0001X\u0003I9W\r^'fgN\fw-\u001a$pe\u000eCWO\\6\u0015\u0005aK\u0006cA\u0016L\u0005\")!,\u0016a\u0001\u001b\u0006)1\r[;oW\")Al\nC\u0001;\u0006i!/Z7pm\u0016lUm]:bO\u0016$\"AX1\u0011\u0005-z\u0016B\u00011-\u0005\u0011)f.\u001b;\t\u000b\t\\\u0006\u0019A2\u0002\u000f5,7o]1hKB\u0011A\u0002Z\u0005\u0003K\n\u0011q!T3tg\u0006<W\rC\u0004h\u0001\u0001\u0007I\u0011\u00025\u0002/%tg-\u001a:sK\u0012\u0014V-\\8uK6\u000bg.Y4fe&#W#A5\u0011\u00051Q\u0017BA6\u0003\u0005M\u0019uN\u001c8fGRLwN\\'b]\u0006<WM]%e\u0011\u001di\u0007\u00011A\u0005\n9\f1$\u001b8gKJ\u0014X\r\u001a*f[>$X-T1oC\u001e,'/\u00133`I\u0015\fHC\u00010p\u0011\u001d\u0001H.!AA\u0002%\f1\u0001\u001f\u00132\u0011\u0019\u0011\b\u0001)Q\u0005S\u0006A\u0012N\u001c4feJ,GMU3n_R,W*\u00198bO\u0016\u0014\u0018\n\u001a\u0011)\u0005E$\bCA\u0016v\u0013\t1HF\u0001\u0005w_2\fG/\u001b7f\u0011\u0015A\b\u0001\"\u0011z\u0003q9W\r\u001e*f[>$XmQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0013\u0012$\u0012!\u001b\u0005\u0006w\u0002!I\u0001`\u0001\u001baJ|7-Z:t\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014\u0018\n\u001a\u000b\u0003=vDQ!\u0015>A\u0002IC\u0001b 
\u0001C\u0002\u0013\u0005\u0011\u0011A\u0001\u0006S:\u0014w\u000e_\u000b\u0002e!9\u0011Q\u0001\u0001!\u0002\u0013\u0011\u0014AB5oE>D\b\u0005C\u0005\u0002\n\u0001\u0011\r\u0011\"\u0001\u0002\f\u0005a\u0001.Z1eKJ\u0014UO\u001a4feV\u0011\u0011Q\u0002\t\u0005\u0003\u001f\t\t\"D\u0001\u0017\u0013\r\t\u0019B\u0006\u0002\u000b\u0005f$XMQ;gM\u0016\u0014\b\u0002CA\f\u0001\u0001\u0006I!!\u0004\u0002\u001b!,\u0017\rZ3s\u0005V4g-\u001a:!\u0011%\tY\u0002\u0001a\u0001\n\u0003\ti\"A\tp]J+7-Z5wK\u000e\u000bG\u000e\u001c2bG.,\"!a\b\u0011\r-\n\tcC2_\u0013\r\t\u0019\u0003\f\u0002\n\rVt7\r^5p]JB\u0011\"a\n\u0001\u0001\u0004%\t!!\u000b\u0002+=t'+Z2fSZ,7)\u00197mE\u0006\u001c7n\u0018\u0013fcR\u0019a,a\u000b\t\u0013A\f)#!AA\u0002\u0005}\u0001\u0002CA\u0018\u0001\u0001\u0006K!a\b\u0002%=t'+Z2fSZ,7)\u00197mE\u0006\u001c7\u000e\t\u0005\n\u0003g\u0001\u0001\u0019!C\u0001\u0003k\tAbY;se\u0016tGo\u00115v].,\u0012!\u0014\u0005\n\u0003s\u0001\u0001\u0019!C\u0001\u0003w\t\u0001cY;se\u0016tGo\u00115v].|F%Z9\u0015\u0007y\u000bi\u0004\u0003\u0005q\u0003o\t\t\u00111\u0001N\u0011\u001d\t\t\u0005\u0001Q!\n5\u000bQbY;se\u0016tGo\u00115v].\u0004\u0003bBA#\u0001\u0011\u0005\u0013qI\u0001\u0005e\u0016\fG\r\u0006\u0002\u0002JA\u00191&a\u0013\n\u0007\u00055CFA\u0004C_>dW-\u00198\t\u000f\u0005E\u0003\u0001\"\u0001\u0002T\u0005IqN\u001c*fG\u0016Lg/\u001a\u000b\u0004=\u0006U\u0003\u0002CA,\u0003\u001f\u0002\r!a\b\u0002\u0011\r\fG\u000e\u001c2bG.Dq!a\u0017\u0001\t\u0003\n9%A\u000bdQ\u0006tw-Z%oi\u0016\u0014Xm\u001d;G_J\u0014V-\u00193\t\u000f\u0005}\u0003\u0001\"\u0011\u0002H\u000512\r[1oO\u0016Le\u000e^3sKN$hi\u001c:Xe&$X\rC\u0004\u0002d\u0001!\t%!\u001a\u0002!I,w-[:uKJLe\u000e^3sKN$H#\u00010\t\u000f\u0005%\u0004\u0001\"\u0011\u0002f\u0005\u0011RO\u001c:fO&\u001cH/\u001a:J]R,'/Z:u\u0011\u001d\ti\u0007\u0001C!\u0003\u000f\nAC]3tKR4uN]2f%\u0016\u0014XmZ5ti\u0016\u0014\b")
/* loaded from: input_file:org/apache/spark/network/ReceivingConnection.class */
/**
 * A {@link Connection} that receives chunked messages from a socket channel.
 *
 * <p>Every chunk on the wire is preceded by a header of
 * {@code MessageChunkHeader.HEADER_SIZE} bytes. {@link #read()} first fills
 * {@link #headerBuffer()}, decodes the header, asks the {@link Inbox} for the
 * chunk to fill, and then reads the chunk payload. Once a message has been
 * completely received it is handed to the callback registered through
 * {@link #onReceive(Function2)}.
 */
public class ReceivingConnection extends Connection {
    /** Remote manager id taken from a chunk header; {@code null} until one has been inferred. */
    private volatile ConnectionManagerId inferredRemoteManagerId;
    /** Messages that are currently being assembled from chunks. */
    private final Inbox inbox;
    /** Accumulates the bytes of the chunk header currently being read. */
    private final ByteBuffer headerBuffer;
    /** Invoked with this connection and each received message; may be {@code null}. */
    private Function2<Connection, Message, BoxedUnit> onReceiveCallback;
    /** Chunk whose payload is currently being read, or {@code null} when a header is expected next. */
    private MessageChunk currentChunk;

    /* compiled from: Connection.scala */
    /* loaded from: input_file:org/apache/spark/network/ReceivingConnection$Inbox.class */
    /**
     * Holds the partially received messages of the enclosing connection,
     * keyed by (boxed) message id.
     */
    public class Inbox {
        private final HashMap<Object, BufferMessage> messages;
        public final /* synthetic */ ReceivingConnection $outer;

        /** Returns the map of in-flight messages, keyed by boxed message id. */
        public HashMap<Object, BufferMessage> messages() {
            return this.messages;
        }

        /**
         * Looks up the message the header belongs to (inserting the result of
         * the generated factory function when the id is not yet present) and
         * asks it for a chunk of {@code messageChunkHeader.chunkSize()} to
         * receive into.
         *
         * @param messageChunkHeader header of the chunk about to be received
         * @return whatever {@code BufferMessage.getChunkForReceiving} returns for that size
         */
        public Option<MessageChunk> getChunk(MessageChunkHeader messageChunkHeader) {
            BufferMessage target = messages().getOrElseUpdate(
                    BoxesRunTime.boxToInteger(messageChunkHeader.id()),
                    new ReceivingConnection$Inbox$$anonfun$1(this, messageChunkHeader));
            org$apache$spark$network$ReceivingConnection$Inbox$$$outer().logTrace(new ReceivingConnection$Inbox$$anonfun$getChunk$1(this, target));
            return target.getChunkForReceiving(messageChunkHeader.chunkSize());
        }

        /**
         * Finds the in-flight message whose id matches the chunk's header id.
         *
         * @param messageChunk chunk whose owning message is wanted
         * @return the owning message, or an empty option when the id is not in the inbox
         */
        public Option<BufferMessage> getMessageForChunk(MessageChunk messageChunk) {
            return messages().get(BoxesRunTime.boxToInteger(messageChunk.header().id()));
        }

        /**
         * Removes the entry for {@code message.id()} from the inbox.
         *
         * @param message message whose id should no longer be tracked
         */
        public void removeMessage(Message message) {
            // The map is keyed by the boxed id, so the key is passed as-is; a boxed
            // Integer can never be cast to the map type itself.
            messages().$minus$eq(BoxesRunTime.boxToInteger(message.id()));
        }

        /** Synthetic accessor for the enclosing connection. */
        public /* synthetic */ ReceivingConnection org$apache$spark$network$ReceivingConnection$Inbox$$$outer() {
            return this.$outer;
        }

        /**
         * Creates the {@link BufferMessage} described by the header, marks it
         * as started, stamps its start time, logs it at debug level and
         * registers it in the inbox under its id.
         *
         * @param messageChunkHeader header describing the new message
         * @return the newly created and registered message
         */
        public final BufferMessage org$apache$spark$network$ReceivingConnection$Inbox$$createNewMessage$1(MessageChunkHeader messageChunkHeader) {
            BufferMessage created = (BufferMessage) Message$.MODULE$.create(messageChunkHeader);
            created.started_$eq(true);
            created.startTime_$eq(System.currentTimeMillis());
            org$apache$spark$network$ReceivingConnection$Inbox$$$outer().logDebug(new ReceivingConnection$Inbox$$anonfun$org$apache$spark$network$ReceivingConnection$Inbox$$createNewMessage$1$1(this, created));
            messages().$plus$eq2(new Tuple2<>(BoxesRunTime.boxToInteger(created.id()), created));
            return created;
        }

        /**
         * @param receivingConnection the enclosing connection; must not be {@code null}
         * @throws NullPointerException if {@code receivingConnection} is {@code null}
         */
        public Inbox(ReceivingConnection receivingConnection) {
            if (receivingConnection == null) {
                throw new NullPointerException();
            }
            this.$outer = receivingConnection;
            this.messages = new HashMap<>();
        }
    }

    private ConnectionManagerId inferredRemoteManagerId() {
        return this.inferredRemoteManagerId;
    }

    private void inferredRemoteManagerId_$eq(ConnectionManagerId connectionManagerId) {
        this.inferredRemoteManagerId = connectionManagerId;
    }

    /**
     * Returns the manager id inferred from a received chunk header, falling
     * back to the superclass value while nothing has been inferred yet.
     */
    @Override // org.apache.spark.network.Connection
    public ConnectionManagerId getRemoteConnectionManagerId() {
        ConnectionManagerId inferred = inferredRemoteManagerId();
        if (inferred != null) {
            return inferred;
        }
        return super.getRemoteConnectionManagerId();
    }

    /**
     * Records the remote manager id derived from the header's address.
     * Does nothing when the header carries no address, when an id has already
     * been inferred, or when no id can be derived from the address.
     */
    private void processConnectionManagerId(MessageChunkHeader messageChunkHeader) {
        ConnectionManagerId alreadyInferred = inferredRemoteManagerId();
        if (messageChunkHeader.address() == null || alreadyInferred != null) {
            return;
        }
        ConnectionManagerId derived = ConnectionManagerId$.MODULE$.fromSocketAddress(messageChunkHeader.address());
        if (derived != null) {
            inferredRemoteManagerId_$eq(derived);
        }
    }

    /** Returns the inbox tracking partially received messages. */
    public Inbox inbox() {
        return this.inbox;
    }

    /** Returns the buffer used to accumulate chunk header bytes. */
    public ByteBuffer headerBuffer() {
        return this.headerBuffer;
    }

    /** Returns the receive callback, or {@code null} when none is registered. */
    public Function2<Connection, Message, BoxedUnit> onReceiveCallback() {
        return this.onReceiveCallback;
    }

    public void onReceiveCallback_$eq(Function2<Connection, Message, BoxedUnit> function2) {
        this.onReceiveCallback = function2;
    }

    /** Returns the chunk currently being filled, or {@code null} when a header is expected next. */
    public MessageChunk currentChunk() {
        return this.currentChunk;
    }

    public void currentChunk_$eq(MessageChunk messageChunk) {
        this.currentChunk = messageChunk;
    }

    /**
     * Reads as much as is currently available from the channel, alternating
     * between chunk headers and chunk payloads, and delivers every message
     * that becomes complete to the receive callback.
     *
     * @return {@code false} when the connection was closed by this call
     *         (end of stream, or any exception, which is also logged and
     *         passed to {@code callOnExceptionCallback}); {@code true} otherwise
     */
    @Override // org.apache.spark.network.Connection
    public boolean read() {
        // The handler below always returns, so one try around the loop is
        // equivalent to re-entering a try block on every iteration.
        try {
            while (true) {
                if (currentChunk() == null) {
                    // No chunk in progress: a header has to be read first.
                    if (channel().read(headerBuffer()) == -1) {
                        close();
                        return false;
                    }
                    if (headerBuffer().remaining() > 0) {
                        // Header still incomplete; resume on the next call.
                        return true;
                    }
                    headerBuffer().flip();
                    if (headerBuffer().remaining() != MessageChunkHeader$.MODULE$.HEADER_SIZE()) {
                        throw new Exception("Unexpected number of bytes (" + headerBuffer().remaining() + ") in the header");
                    }
                    MessageChunkHeader header = MessageChunkHeader$.MODULE$.create(headerBuffer());
                    // Make the buffer ready for the next header.
                    headerBuffer().clear();
                    processConnectionManagerId(header);
                    if (Message$.MODULE$.BUFFER_MESSAGE() != header.typ()) {
                        throw new Exception("Message of unknown type received");
                    }
                    if (header.totalSize() == 0) {
                        // Empty message: there is no payload to read, deliver it right away.
                        Function2<Connection, Message, BoxedUnit> emptyMessageCallback = onReceiveCallback();
                        if (emptyMessageCallback != null) {
                            emptyMessageCallback.mo8441apply(this, Message$.MODULE$.create(header));
                        }
                        currentChunk_$eq(null);
                        return true;
                    }
                    currentChunk_$eq((MessageChunk) inbox().getChunk(header).orNull(Predef$.MODULE$.conforms()));
                }
                if (currentChunk() == null) {
                    // The inbox had no chunk to offer for this header.
                    throw new Exception("No message chunk to receive data");
                }
                int bytesRead = channel().read(currentChunk().buffer());
                if (bytesRead == 0) {
                    // Nothing more available right now; resume on the next call.
                    return true;
                }
                if (bytesRead == -1) {
                    close();
                    return false;
                }
                if (currentChunk().buffer().remaining() == 0) {
                    // Chunk is full; check whether it completed its message.
                    BufferMessage owner = inbox().getMessageForChunk(currentChunk()).get();
                    if (owner.isCompletelyReceived()) {
                        owner.flip();
                        owner.finishTime_$eq(System.currentTimeMillis());
                        logDebug(new ReceivingConnection$$anonfun$read$3(this, owner));
                        Function2<Connection, Message, BoxedUnit> completedCallback = onReceiveCallback();
                        if (completedCallback != null) {
                            completedCallback.mo8441apply(this, owner);
                        }
                        inbox().removeMessage(owner);
                    }
                    // Either way the next thing on the wire is a header.
                    currentChunk_$eq(null);
                }
            }
        } catch (Exception e) {
            logWarning(new ReceivingConnection$$anonfun$read$4(this), e);
            callOnExceptionCallback(e);
            close();
            return false;
        }
    }

    /**
     * Registers the function to invoke with this connection and each received message.
     *
     * @param function2 the callback; replaces any previously registered one
     */
    public void onReceive(Function2<Connection, Message, BoxedUnit> function2) {
        onReceiveCallback_$eq(function2);
    }

    /** Always returns {@code true}. */
    @Override // org.apache.spark.network.Connection
    public boolean changeInterestForRead() {
        return true;
    }

    /**
     * Not supported on this connection type.
     *
     * @throws IllegalStateException always
     */
    @Override // org.apache.spark.network.Connection
    public boolean changeInterestForWrite() {
        throw new IllegalStateException("Unexpected invocation right now");
    }

    /** Sets the connection key interest to {@link SelectionKey#OP_READ}. */
    @Override // org.apache.spark.network.Connection
    public void registerInterest() {
        changeConnectionKeyInterest(SelectionKey.OP_READ);
    }

    /** Sets the connection key interest to {@code 0}, i.e. no operations. */
    @Override // org.apache.spark.network.Connection
    public void unregisterInterest() {
        changeConnectionKeyInterest(0);
    }

    /** Always returns {@code false}. */
    @Override // org.apache.spark.network.Connection
    public boolean resetForceReregister() {
        return false;
    }

    /**
     * Creates the connection, allocates a header buffer of
     * {@code MessageChunkHeader.HEADER_SIZE} bytes and registers the channel
     * with the selector for read events.
     *
     * @param socketChannel channel to receive from
     * @param selector selector the channel is registered with
     */
    public ReceivingConnection(SocketChannel socketChannel, Selector selector) {
        super(socketChannel, selector);
        this.inferredRemoteManagerId = null;
        this.inbox = new Inbox(this);
        this.headerBuffer = ByteBuffer.allocate(MessageChunkHeader$.MODULE$.HEADER_SIZE());
        this.onReceiveCallback = null;
        this.currentChunk = null;
        channel().register(selector(), SelectionKey.OP_READ);
    }
}
