package com.rabbitmq.qpid.protonj2.client.impl;

import com.rabbitmq.qpid.protonj2.buffer.ProtonCompositeBuffer;
import com.rabbitmq.qpid.protonj2.client.DeliveryState;
import com.rabbitmq.qpid.protonj2.client.StreamDelivery;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.futures.ClientFuture;
import com.rabbitmq.qpid.protonj2.engine.IncomingDelivery;
import com.rabbitmq.qpid.protonj2.engine.Scheduler;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineFailedException;
import com.rabbitmq.qpid.protonj2.engine.util.StringUtils;
import com.rabbitmq.qpid.protonj2.types.messaging.Accepted;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/rabbitmq/qpid/protonj2/client/impl/ClientStreamDelivery.class */
public final class ClientStreamDelivery extends ClientDeliverable<ClientStreamDelivery, ClientStreamReceiver> implements StreamDelivery {
    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamDelivery.class);
    // Receiver this delivery belongs to; assigned from the constructor argument.
    private final ClientStreamReceiver receiver;
    // Engine-side delivery; holds the value returned by setLinkedResource(this) in the constructor.
    private final IncomingDelivery protonDelivery;
    // Created on the first call to message(); remains null when only the raw stream was requested.
    private ClientStreamReceiverMessage message;
    // Created by whichever of message() or rawInputStream() is called first.
    private RawDeliveryInputStream rawInputStream;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/qpid/protonj2/client/impl/ClientStreamDelivery$RawDeliveryInputStream.class */
    public class RawDeliveryInputStream extends InputStream {
        /** Sentinel held by {@link #markIndex} while no mark is active. */
        private static final int INVALID_MARK = -1;

        /** Accumulates delivery bytes pulled from the engine until the stream consumer reads them. */
        private final ProtonCompositeBuffer buffer;
        /** Scheduler obtained from the receiver's session; engine-side work is submitted to it. */
        private final Scheduler executor;
        /** Set to {@code true} by the first call to {@code close()}. */
        private final AtomicBoolean closed = new AtomicBoolean();
        /** Future a blocked reader is waiting on, or {@code null} when no read is parked. */
        private ClientFuture<Integer> readRequest;
        /** Limit passed to the most recent {@code mark(int)}; {@code 0} when no mark is active. */
        private int markLimit;
        /** Buffer read offset captured by {@code mark(int)}, or {@link #INVALID_MARK}. */
        private int markIndex = INVALID_MARK;

        public RawDeliveryInputStream() {
            this.executor = ClientStreamDelivery.this.receiver.session().getScheduler();
            this.buffer = ClientStreamDelivery.this.receiver.session().connection().getEngine().configuration().getBufferAllocator().composite().convertToReadOnly();
        }

        /**
         * Closes this stream. The mark is always cleared; the remaining work is done only
         * by the first caller.
         *
         * <p>A task is submitted to the scheduler that discards any unread delivery bytes
         * via {@code readAll()}, closes the buffer, completes a parked read request with
         * {@code -1} and then completes the close future. The calling thread waits for
         * that future through {@code session().request(...)}. Any exception raised while
         * submitting or waiting is logged at debug level and not propagated. Afterwards
         * {@code super.close()} and {@code buffer.close()} run once on the calling thread.
         *
         * @throws IOException if {@code super.close()} throws
         */
        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.markLimit = 0;
            this.markIndex = -1;

            // Only the caller that flips the flag performs the shutdown work.
            if (!this.closed.compareAndSet(false, true)) {
                return;
            }

            final ClientFuture<Void> closeRequest =
                    ClientStreamDelivery.this.receiver.session().getFutureFactory().createFuture();

            try {
                this.executor.execute(() -> {
                    try {
                        // Discard whatever the delivery still holds.
                        ClientStreamDelivery.this.protonDelivery.readAll();
                    } catch (EngineFailedException ignored) {
                        // Deliberately ignored: close is best-effort once the engine has failed.
                    }
                    try {
                        this.buffer.close();
                    } catch (Exception ignored) {
                        // Deliberately ignored: nothing useful can be done if the release fails.
                    }
                    // Wake a parked reader with end-of-stream.
                    if (this.readRequest != null) {
                        this.readRequest.complete(-1);
                        this.readRequest = null;
                    }
                    closeRequest.complete(null);
                });

                ClientStreamDelivery.this.receiver.session().request(ClientStreamDelivery.this.receiver, closeRequest);
            } catch (Exception error) {
                ClientStreamDelivery.LOG.debug("Ignoring error on RawInputStream close: ", error);
            } finally {
                super.close();
                try {
                    // Closed here as well so the buffer is released even if the task above never ran.
                    this.buffer.close();
                } catch (Exception ignored) {
                    // Deliberately ignored: see above.
                }
            }
        }

        /**
         * Reports that this stream supports {@code mark} and {@code reset}.
         *
         * @return always {@code true}
         */
        @Override // java.io.InputStream
        public boolean markSupported() {
            return true;
        }

        /**
         * Records the buffer's current read offset as the mark position and stores the
         * given limit.
         *
         * @param i stored as the mark limit; {@code tryReleaseReadBuffers()} drops the mark once the
         *          read offset has moved past the mark position by more than this value
         */
        @Override // java.io.InputStream
        public synchronized void mark(int i) {
            this.markIndex = this.buffer.getReadOffset();
            this.markLimit = i;
        }

        /**
         * Moves the buffer's read offset back to the marked position and clears the mark.
         * When no mark is active the call does nothing.
         *
         * @throws IOException declared by the overridden method; not thrown directly here
         */
        @Override // java.io.InputStream
        public synchronized void reset() throws IOException {
            if (this.markIndex == -1) {
                return;
            }

            this.buffer.setReadOffset(this.markIndex);
            this.markLimit = 0;
            this.markIndex = -1;
        }

        /**
         * Returns the number of bytes that can currently be read.
         *
         * <p>If the local buffer already holds readable bytes, that count is returned
         * directly. Otherwise a task is submitted to the scheduler that appends any bytes
         * available on the delivery to the buffer and reports the buffer's readable byte
         * count; the calling thread waits for that result.
         *
         * @return the readable byte count
         * @throws IOException if the stream or receiver is closed, or if submitting or
         *         awaiting the request fails
         */
        @Override // java.io.InputStream
        public int available() throws IOException {
            checkStreamStateIsValid();

            if (!this.buffer.isReadable()) {
                final ClientFuture<Integer> pending =
                        ClientStreamDelivery.this.receiver.session().getFutureFactory().createFuture();

                try {
                    this.executor.execute(() -> {
                        if (ClientStreamDelivery.this.protonDelivery.available() > 0) {
                            this.buffer.append(ClientStreamDelivery.this.protonDelivery.readAll());
                        }
                        pending.complete(this.buffer.getReadableBytes());
                    });

                    return ClientStreamDelivery.this.receiver.session()
                            .request(ClientStreamDelivery.this.receiver, pending);
                } catch (Exception cause) {
                    throw new IOException("Error reading requested data", cause);
                }
            }

            return this.buffer.getReadableBytes();
        }

        /**
         * Reads a single byte, requesting more delivery data while the buffer is empty.
         *
         * @return the byte as a value in {@code 0..255}, or {@code -1} once
         *         {@code requestMoreData()} reports a negative result
         * @throws IOException if the stream or receiver is closed, or a data request fails
         */
        @Override // java.io.InputStream
        public int read() throws IOException {
            checkStreamStateIsValid();

            // Keep asking for data until something is readable or the source is exhausted.
            while (!this.buffer.isReadable()) {
                if (requestMoreData() < 0) {
                    return -1;
                }
            }

            final int value = this.buffer.readByte() & 255;
            tryReleaseReadBuffers();
            return value;
        }

        /**
         * Reads up to {@code i2} bytes into {@code bArr} starting at index {@code i}.
         * The loop keeps requesting delivery data until the full count has been copied or
         * {@code requestMoreData()} reports a negative result.
         *
         * @param bArr destination array
         * @param i    index in {@code bArr} of the first byte written
         * @param i2   number of bytes requested
         * @return the number of bytes copied; {@code 0} when {@code i2} is not positive;
         *         {@code -1} when the source is exhausted before any byte was copied
         * @throws IOException if the stream or receiver is closed, or a data request fails
         * @throws IndexOutOfBoundsException if the range is not within {@code bArr}
         */
        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            checkStreamStateIsValid();
            Objects.checkFromIndexSize(i, i2, bArr.length);

            if (i2 <= 0) {
                return 0;
            }

            int copied = 0;
            while (copied < i2) {
                if (!this.buffer.isReadable()) {
                    if (requestMoreData() < 0) {
                        // Report what was gathered so far, or end-of-stream if nothing was.
                        return copied > 0 ? copied : -1;
                    }
                    continue;
                }

                final int chunk = Math.min(this.buffer.getReadableBytes(), i2 - copied);
                this.buffer.readBytes(bArr, i + copied, chunk);
                copied += chunk;
                tryReleaseReadBuffers();
            }
            return copied;
        }

        /**
         * Skips up to {@code j} bytes by advancing the buffer's read offset, requesting
         * more delivery data while the buffer is empty.
         *
         * @param j number of bytes to skip
         * @return the number of bytes actually skipped; {@code 0} when {@code j} is not positive
         * @throws IOException if the stream or receiver is closed, or a data request fails
         */
        @Override // java.io.InputStream
        public long skip(long j) throws IOException {
            checkStreamStateIsValid();

            if (j <= 0) {
                return 0L;
            }

            long remaining = j;
            while (remaining > 0) {
                if (this.buffer.isReadable()) {
                    final int readable = this.buffer.getReadableBytes();
                    // The narrowing cast is safe: it is only taken when remaining <= readable.
                    final int step = readable < remaining ? readable : (int) remaining;
                    this.buffer.mo51advanceReadOffset(step);
                    remaining -= step;
                    tryReleaseReadBuffers();
                } else if (requestMoreData() < 0) {
                    break;
                }
            }
            return j - remaining;
        }

        /**
         * Verifies the stream is still usable and then delegates to the inherited
         * {@code transferTo}.
         *
         * @param outputStream destination passed through to the superclass implementation
         * @return the value returned by the superclass implementation
         * @throws IOException if {@code checkStreamStateIsValid()} rejects the stream state or the
         *         superclass implementation fails
         */
        @Override // java.io.InputStream
        public long transferTo(OutputStream outputStream) throws IOException {
            checkStreamStateIsValid();
            return super.transferTo(outputStream);
        }

        /**
         * Drops the mark and closes the buffer components below the current read offset
         * once the read offset has moved past the mark position by more than the mark
         * limit. With no active mark (index {@code -1}, limit {@code 0}) the condition
         * holds for every non-negative read offset.
         */
        private void tryReleaseReadBuffers() {
            final int distanceFromMark = this.buffer.getReadOffset() - this.markIndex;
            if (distanceFromMark <= this.markLimit) {
                return;
            }

            this.markLimit = 0;
            this.markIndex = -1;

            try {
                this.buffer.splitComponentsFloor(this.buffer.getReadOffset()).close();
            } catch (Exception ignored) {
            }
        }

        /**
         * Reacts to new data on the delivery.
         *
         * <p>After the stream has been closed the incoming bytes are read and discarded.
         * Otherwise, if a reader is parked on {@code readRequest}, it is completed with the
         * buffer's readable byte count when the delivery had bytes available, or with
         * {@code -1} when the delivery is no longer partial.
         *
         * <p>The parked request is cleared only after it has been completed. If the
         * delivery has no bytes available but is still partial, the request stays in
         * place so a later callback can complete it; clearing it in that case would leave
         * the waiting reader with a future nothing references any more.
         *
         * @param incomingDelivery the delivery that signalled the read event
         */
        private void handleDeliveryRead(IncomingDelivery incomingDelivery) {
            if (this.closed.get()) {
                incomingDelivery.readAll();
                return;
            }
            if (this.readRequest == null || this.readRequest.isComplete()) {
                return;
            }

            if (incomingDelivery.available() > 0) {
                this.buffer.append(ClientStreamDelivery.this.protonDelivery.readAll());
                this.readRequest.complete(this.buffer.getReadableBytes());
                this.readRequest = null;
            } else if (!incomingDelivery.isPartial()) {
                this.readRequest.complete(-1);
                this.readRequest = null;
            }
            // Otherwise: nothing to hand over yet and more transfers are expected, so keep waiting.
        }

        /**
         * Fails a parked read request with a {@link ClientDeliveryAbortedException};
         * does nothing when no request is parked.
         *
         * @param incomingDelivery the aborted delivery (not used here)
         */
        private void handleDeliveryAborted(IncomingDelivery incomingDelivery) {
            final ClientFuture<Integer> parked = this.readRequest;
            if (parked == null) {
                return;
            }
            parked.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
        }

        /**
         * Fails a parked read request with a {@link ClientResourceRemotelyClosedException};
         * does nothing when no request is parked.
         *
         * @param clientStreamReceiver the receiver that was closed (not used here)
         */
        private void handleReceiverClosed(ClientStreamReceiver clientStreamReceiver) {
            final ClientFuture<Integer> parked = this.readRequest;
            if (parked == null) {
                return;
            }
            parked.failed(new ClientResourceRemotelyClosedException("The receiver link has been remotely closed."));
        }

        /**
         * Asks the scheduler for more delivery bytes and waits for the outcome.
         *
         * <p>The submitted task resolves the request as follows: it fails it when the link
         * is locally closed or detached; completes it with the buffer's readable byte
         * count after appending available delivery bytes; fails it when the delivery was
         * aborted; parks it in {@code readRequest} when the delivery is still partial; and
         * otherwise completes it with {@code -1}.
         *
         * @return the value the request was completed with
         * @throws IOException wrapping any exception raised while submitting or awaiting the request
         */
        private int requestMoreData() throws IOException {
            final ClientFuture<Integer> pending =
                    ClientStreamDelivery.this.receiver.session().getFutureFactory().createFuture();

            try {
                this.executor.execute(() -> {
                    final IncomingDelivery delivery = ClientStreamDelivery.this.protonDelivery;

                    if (delivery.getLink().isLocallyClosedOrDetached()) {
                        pending.failed(new ClientException("Cannot read from delivery due to link having been closed"));
                    } else if (delivery.available() > 0) {
                        this.buffer.append(delivery.readAll());
                        pending.complete(this.buffer.getReadableBytes());
                    } else if (delivery.isAborted()) {
                        pending.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
                    } else if (delivery.isPartial()) {
                        // Nothing to hand over yet: leave the request for a delivery callback to resolve.
                        this.readRequest = pending;
                    } else {
                        pending.complete(-1);
                    }
                });

                return ClientStreamDelivery.this.receiver.session()
                        .request(ClientStreamDelivery.this.receiver, pending);
            } catch (Exception cause) {
                throw new IOException("Error reading requested data", cause);
            }
        }

        /**
         * Rejects use of the stream after it has been closed or after the underlying
         * receiver has closed.
         *
         * @throws IOException if this stream was closed, or if the receiver reports closed
         *         (the receiver's failure cause is attached in that case)
         */
        private void checkStreamStateIsValid() throws IOException {
            final boolean streamClosed = this.closed.get();
            if (streamClosed) {
                throw new IOException("The InputStream has been explicitly closed");
            }

            final ClientStreamReceiver owner = ClientStreamDelivery.this.receiver;
            if (owner.isClosed()) {
                throw new IOException("Underlying receiver has closed", owner.getFailureCause());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Binds this delivery to its receiver and to the engine-side delivery.
     *
     * @param clientStreamReceiver the receiver this delivery belongs to
     * @param incomingDelivery     the engine delivery; this object is registered as its linked
     *                             resource and as the target of its read and aborted handlers
     */
    public ClientStreamDelivery(ClientStreamReceiver clientStreamReceiver, IncomingDelivery incomingDelivery) {
        super(clientStreamReceiver, incomingDelivery);
        this.receiver = clientStreamReceiver;
        this.protonDelivery = incomingDelivery.setLinkedResource(this);
        // Runs before the handlers are installed: accepts right away when auto-accept is on and
        // the delivery is already neither settled nor partial.
        autoAcceptDeliveryIfNecessary();
        // Route later read and abort events on the engine delivery to this object.
        incomingDelivery.deliveryReadHandler(this::handleDeliveryRead).deliveryAbortedHandler(this::handleDeliveryAborted);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Returns this instance, typed as {@code ClientStreamDelivery}.
     *
     * @return {@code this}
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable
    public ClientStreamDelivery self() {
        return this;
    }

    /**
     * Returns the receiver this delivery was created with.
     *
     * @return the owning {@code ClientStreamReceiver}
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamDelivery
    public ClientStreamReceiver receiver() {
        return this.receiver;
    }

    /**
     * Reports whether the engine delivery is flagged as aborted.
     *
     * @return the value of {@code protonDelivery.isAborted()}
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamDelivery
    public boolean aborted() {
        return this.protonDelivery.isAborted();
    }

    /**
     * Reports whether the engine delivery is no longer partial.
     *
     * @return {@code true} when {@code protonDelivery.isPartial()} is {@code false}
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamDelivery
    public boolean completed() {
        return !this.protonDelivery.isPartial();
    }

    /**
     * Returns the message view of this delivery, creating it (and the raw stream that
     * backs it) on first use. Later calls return the same instance.
     *
     * @return the message bound to this delivery
     * @throws ClientIllegalStateException if the raw input stream was requested before
     *         any message was created
     * @throws ClientException declared by the interface method
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamDelivery
    public ClientStreamReceiverMessage message() throws ClientException {
        if (this.message != null) {
            return this.message;
        }
        if (this.rawInputStream != null) {
            throw new ClientIllegalStateException("Cannot access Delivery Message API after requesting an InputStream");
        }

        final RawDeliveryInputStream stream = new RawDeliveryInputStream();
        this.rawInputStream = stream;
        this.message = new ClientStreamReceiverMessage(this.receiver, this, stream);
        return this.message;
    }

    /**
     * Returns the delivery annotations of the message as a string-keyed map, obtained
     * by passing the annotations' value (or {@code null} when the message has none) to
     * {@code StringUtils.toStringKeyedMap}.
     *
     * @return the result of {@code StringUtils.toStringKeyedMap}
     * @throws ClientIllegalStateException if the raw input stream was requested before
     *         any message was created
     * @throws ClientException if accessing the message or its annotations fails
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamDelivery
    public Map<String, Object> annotations() throws ClientException {
        if (this.rawInputStream != null && this.message == null) {
            throw new ClientIllegalStateException("Cannot access Delivery Annotations API after requesting an InputStream");
        }

        final ClientStreamReceiverMessage streamMessage = message();
        return StringUtils.toStringKeyedMap(
                streamMessage.deliveryAnnotations() != null ? streamMessage.deliveryAnnotations().getValue() : null);
    }

    /**
     * Returns the raw byte stream of this delivery, creating it on first use. Later
     * calls return the same instance.
     *
     * @return the raw input stream
     * @throws ClientIllegalStateException if the message view was already requested
     * @throws ClientException declared by the interface method
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamDelivery
    public InputStream rawInputStream() throws ClientException {
        if (this.message != null) {
            throw new ClientIllegalStateException("Cannot access Delivery InputStream API after requesting an Message");
        }

        RawDeliveryInputStream stream = this.rawInputStream;
        if (stream == null) {
            stream = new RawDeliveryInputStream();
            this.rawInputStream = stream;
        }
        return stream;
    }

    /**
     * Forwards a delivery read event to the raw stream, if one exists, and afterwards
     * always re-evaluates whether the delivery should be auto-accepted.
     *
     * @param incomingDelivery the delivery that signalled the read event
     */
    void handleDeliveryRead(IncomingDelivery incomingDelivery) {
        final RawDeliveryInputStream stream = this.rawInputStream;
        try {
            if (stream != null) {
                stream.handleDeliveryRead(incomingDelivery);
            }
        } finally {
            autoAcceptDeliveryIfNecessary();
        }
    }

    /**
     * Forwards a delivery aborted event to the raw stream, if one exists, and afterwards
     * always asks the receiver to apply a settled disposition with no delivery state.
     *
     * <p>The disposition is best-effort: a failure is logged at trace level and not
     * propagated, so it cannot mask an exception raised by the stream callback.
     *
     * @param incomingDelivery the delivery that was aborted
     */
    void handleDeliveryAborted(IncomingDelivery incomingDelivery) {
        try {
            if (this.rawInputStream != null) {
                this.rawInputStream.handleDeliveryAborted(incomingDelivery);
            }
        } finally {
            try {
                this.receiver.disposition(incomingDelivery, null, true);
            } catch (Exception error) {
                LOG.trace("Caught error while attempting to settle the aborted delivery.", error);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Forwards a receiver closed event to the raw stream when one has been created;
     * otherwise does nothing.
     *
     * @param clientStreamReceiver the receiver that was closed
     */
    public void handleReceiverClosed(ClientStreamReceiver clientStreamReceiver) {
        final RawDeliveryInputStream stream = this.rawInputStream;
        if (stream == null) {
            return;
        }
        stream.handleReceiverClosed(clientStreamReceiver);
    }

    /**
     * Applies an {@code Accepted} disposition when the receiver is configured for
     * auto-accept and the delivery is neither settled nor partial. The settle flag is
     * taken from the receiver's auto-settle option. Failures are logged at trace level
     * and not propagated.
     */
    private void autoAcceptDeliveryIfNecessary() {
        final boolean shouldAccept = this.receiver.receiverOptions().autoAccept()
                && !this.protonDelivery.isSettled()
                && !this.protonDelivery.isPartial();

        if (shouldAccept) {
            try {
                this.receiver.disposition(this.protonDelivery, Accepted.getInstance(), this.receiver.receiverOptions().autoSettle());
            } catch (Exception error) {
                LOG.trace("Caught error while attempting to auto accept the fully read delivery.", error);
            }
        }
    }

    /**
     * Bridge method (marked bridge/synthetic in the decompiled output) exposing
     * {@code settle()} with the {@link StreamDelivery} return type; delegates to the
     * superclass and casts the result.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable, com.rabbitmq.qpid.protonj2.client.Delivery
    public /* bridge */ /* synthetic */ StreamDelivery settle() throws ClientException {
        return (StreamDelivery) super.settle();
    }

    /**
     * Bridge method (marked bridge/synthetic in the decompiled output) exposing
     * {@code disposition(DeliveryState, boolean)} with the {@link StreamDelivery}
     * return type; delegates to the superclass and casts the result.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable, com.rabbitmq.qpid.protonj2.client.Delivery
    public /* bridge */ /* synthetic */ StreamDelivery disposition(DeliveryState deliveryState, boolean z) throws ClientException {
        return (StreamDelivery) super.disposition(deliveryState, z);
    }

    /**
     * Bridge method (marked bridge/synthetic in the decompiled output) exposing
     * {@code modified(boolean, boolean)} with the {@link StreamDelivery} return type;
     * delegates to the superclass and casts the result.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable, com.rabbitmq.qpid.protonj2.client.Delivery
    public /* bridge */ /* synthetic */ StreamDelivery modified(boolean z, boolean z2) throws ClientException {
        return (StreamDelivery) super.modified(z, z2);
    }

    /**
     * Bridge method (marked bridge/synthetic in the decompiled output) exposing
     * {@code reject(String, String)} with the {@link StreamDelivery} return type;
     * delegates to the superclass and casts the result.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable, com.rabbitmq.qpid.protonj2.client.Delivery
    public /* bridge */ /* synthetic */ StreamDelivery reject(String str, String str2) throws ClientException {
        return (StreamDelivery) super.reject(str, str2);
    }

    /**
     * Bridge method (marked bridge/synthetic in the decompiled output) exposing
     * {@code release()} with the {@link StreamDelivery} return type; delegates to the
     * superclass and casts the result.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable, com.rabbitmq.qpid.protonj2.client.Delivery
    public /* bridge */ /* synthetic */ StreamDelivery release() throws ClientException {
        return (StreamDelivery) super.release();
    }

    /**
     * Bridge method (marked bridge/synthetic in the decompiled output) exposing
     * {@code accept()} with the {@link StreamDelivery} return type; delegates to the
     * superclass and casts the result.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable, com.rabbitmq.qpid.protonj2.client.Delivery
    public /* bridge */ /* synthetic */ StreamDelivery accept() throws ClientException {
        return (StreamDelivery) super.accept();
    }
}
