package org.apache.qpid.proton.hawtdispatch.api;

import java.util.LinkedList;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.engine.impl.ReceiverImpl;
import org.apache.qpid.proton.hawtdispatch.impl.Defer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;

/* loaded from: input_file:WEB-INF/lib/proton-hawtdispatch-0.3.0-fuse-2.jar:org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.class */
public class AmqpReceiver extends AmqpLink {
    final AmqpSession parent;
    final ReceiverImpl receiver;
    ByteArrayOutputStream current = new ByteArrayOutputStream();
    LinkedList<MessageDelivery> inbound = new LinkedList<>();
    Defer deferedDrain = new Defer() { // from class: org.apache.qpid.proton.hawtdispatch.api.AmqpReceiver.2
        @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
        public void run() {
            AmqpReceiver.this.drainInbound();
        }
    };
    int resumed = 0;
    AmqpDeliveryListener deliveryListener;

    public AmqpReceiver(AmqpSession amqpSession, ReceiverImpl receiverImpl, QoS qoS) {
        this.parent = amqpSession;
        this.receiver = receiverImpl;
        attach();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.proton.hawtdispatch.api.AmqpEndpointBase
    public ReceiverImpl getEndpoint() {
        return this.receiver;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.proton.hawtdispatch.api.AmqpEndpointBase
    public AmqpSession getParent() {
        return this.parent;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.proton.hawtdispatch.api.AmqpLink
    public void processDelivery(Delivery delivery) {
        int recv;
        if (!delivery.isReadable()) {
            System.out.println("it was not readable!");
            return;
        }
        if (this.current == null) {
            this.current = new ByteArrayOutputStream();
        }
        byte[] bArr = new byte[4096];
        while (true) {
            recv = this.receiver.recv(bArr, 0, bArr.length);
            if (recv <= 0) {
                break;
            } else {
                this.current.write(bArr, 0, recv);
            }
        }
        if (recv == 0) {
            return;
        }
        this.receiver.advance();
        Buffer buffer = this.current.toBuffer();
        this.current = null;
        onMessage(delivery, buffer);
    }

    protected void onMessage(Delivery delivery, Buffer buffer) {
        MessageDelivery messageDelivery = new MessageDelivery(buffer) { // from class: org.apache.qpid.proton.hawtdispatch.api.AmqpReceiver.1
            @Override // org.apache.qpid.proton.hawtdispatch.api.MessageDelivery
            AmqpLink link() {
                return AmqpReceiver.this;
            }

            @Override // org.apache.qpid.proton.hawtdispatch.api.MessageDelivery
            public void settle() {
                if (!this.delivery.isSettled()) {
                    this.delivery.disposition(new Accepted());
                    this.delivery.settle();
                }
                AmqpReceiver.this.drain();
            }
        };
        messageDelivery.delivery = (DeliveryImpl) delivery;
        delivery.setContext(messageDelivery);
        this.inbound.add(messageDelivery);
        drainInbound();
    }

    public void drain() {
        defer(this.deferedDrain);
    }

    public void resume() {
        this.resumed++;
    }

    public void suspend() {
        this.resumed--;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drainInbound() {
        while (this.deliveryListener != null && !this.inbound.isEmpty() && this.resumed > 0) {
            this.deliveryListener.onMessageDelivery(this.inbound.removeFirst());
            this.receiver.flow(1);
        }
    }

    public AmqpDeliveryListener getDeliveryListener() {
        return this.deliveryListener;
    }

    public void setDeliveryListener(AmqpDeliveryListener amqpDeliveryListener) {
        this.deliveryListener = amqpDeliveryListener;
        drainInbound();
    }
}
