package com.rabbitmq.qpid.protonj2.client.impl;

import com.rabbitmq.qpid.protonj2.client.StreamDelivery;
import com.rabbitmq.qpid.protonj2.client.StreamReceiver;
import com.rabbitmq.qpid.protonj2.client.StreamReceiverOptions;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.futures.ClientFuture;
import com.rabbitmq.qpid.protonj2.engine.IncomingDelivery;
import com.rabbitmq.qpid.protonj2.engine.Receiver;
import com.rabbitmq.qpid.protonj2.types.messaging.Released;
import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/rabbitmq/qpid/protonj2/client/impl/ClientStreamReceiver.class */
public final class ClientStreamReceiver extends ClientReceiverLinkType<StreamReceiver> implements StreamReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private ClientFuture<StreamReceiver> drainingFuture;
    private Future<?> drainingTimeout;
    private final StreamReceiverOptions options;
    private final Map<ClientFuture<StreamDelivery>, Future<?>> receiveRequests;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientStreamReceiver(ClientSession clientSession, StreamReceiverOptions streamReceiverOptions, String str, Receiver receiver) {
        super(clientSession, str, streamReceiverOptions, receiver);
        this.receiveRequests = new LinkedHashMap();
        this.options = streamReceiverOptions;
        if (streamReceiverOptions.creditWindow() > 0) {
            this.protonReceiver.addCredit(streamReceiverOptions.creditWindow());
        }
    }

    @Override // com.rabbitmq.qpid.protonj2.client.StreamReceiver
    public StreamDelivery receive() throws ClientException {
        return receive(-1L, TimeUnit.MILLISECONDS);
    }

    @Override // com.rabbitmq.qpid.protonj2.client.StreamReceiver
    public StreamDelivery receive(long j, TimeUnit timeUnit) throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                IncomingDelivery incomingDelivery = null;
                Iterator<IncomingDelivery> it = this.protonReceiver.unsettled().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    IncomingDelivery next = it.next();
                    if (next.getLinkedResource() == null) {
                        incomingDelivery = next;
                        break;
                    }
                }
                if (incomingDelivery == null) {
                    if (j == 0) {
                        createFuture.complete(null);
                        return;
                    } else {
                        this.receiveRequests.put(createFuture, j > 0 ? this.session.getScheduler().schedule(() -> {
                            this.receiveRequests.remove(createFuture);
                            createFuture.complete(null);
                        }, j, timeUnit) : null);
                        return;
                    }
                }
                createFuture.complete(new ClientStreamDelivery(this, incomingDelivery));
                if (this.options.creditWindow() > 0) {
                    this.executor.execute(() -> {
                        replenishCreditIfNeeded();
                    });
                }
            }
        });
        return (StreamDelivery) this.session.request(this, createFuture);
    }

    @Override // com.rabbitmq.qpid.protonj2.client.StreamReceiver
    public StreamDelivery tryReceive() throws ClientException {
        checkClosedOrFailed();
        return receive(0L, TimeUnit.MILLISECONDS);
    }

    @Override // com.rabbitmq.qpid.protonj2.client.StreamReceiver
    public StreamReceiver addCredit(int i) throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                if (this.options.creditWindow() != 0) {
                    createFuture.failed(new ClientIllegalStateException("Cannot add credit when a credit window has been configured"));
                    return;
                }
                if (this.protonReceiver.isDraining()) {
                    createFuture.failed(new ClientIllegalStateException("Cannot add credit while a drain is pending"));
                    return;
                }
                try {
                    this.protonReceiver.addCredit(i);
                    createFuture.complete(this);
                } catch (Exception e) {
                    createFuture.failed(ClientExceptionSupport.createNonFatalOrPassthrough(e));
                }
            }
        });
        return (StreamReceiver) this.session.request(this, createFuture);
    }

    @Override // com.rabbitmq.qpid.protonj2.client.StreamReceiver
    public Future<StreamReceiver> drain() throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                if (this.protonReceiver.isDraining()) {
                    createFuture.failed(new ClientIllegalStateException("StreamReceiver is already draining"));
                    return;
                }
                try {
                    if (this.protonReceiver.drain()) {
                        this.drainingFuture = createFuture;
                        this.drainingTimeout = this.session.scheduleRequestTimeout(this.drainingFuture, this.options.drainTimeout(), () -> {
                            return new ClientOperationTimedOutException("Timed out waiting for remote to respond to drain request");
                        });
                    } else {
                        createFuture.complete(this);
                    }
                } catch (Exception e) {
                    createFuture.failed(ClientExceptionSupport.createNonFatalOrPassthrough(e));
                }
            }
        });
        return createFuture;
    }

    @Override // com.rabbitmq.qpid.protonj2.client.StreamReceiver
    public long queuedDeliveries() throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                createFuture.complete(Integer.valueOf((int) this.protonReceiver.unsettled().stream().filter(incomingDelivery -> {
                    return incomingDelivery.getLinkedResource() == null;
                }).count()));
            }
        });
        return ((Integer) this.session.request(this, createFuture)).intValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamReceiverOptions receiverOptions() {
        return this.options;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    public StreamReceiver self() {
        return this;
    }

    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientReceiverLinkType
    protected void handleDeliveryRead(IncomingDelivery incomingDelivery) {
        LOG.trace("Delivery data was received: {}", incomingDelivery);
        if (incomingDelivery.getDefaultDeliveryState() == null) {
            incomingDelivery.setDefaultDeliveryState(Released.getInstance());
        }
        if (incomingDelivery.getLinkedResource() != null || this.receiveRequests.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<ClientFuture<StreamDelivery>, Future<?>>> it = this.receiveRequests.entrySet().iterator();
        Map.Entry<ClientFuture<StreamDelivery>, Future<?>> next = it.next();
        if (next.getValue() != null) {
            next.getValue().cancel(false);
        }
        try {
            next.getKey().complete(new ClientStreamDelivery(this, incomingDelivery));
            it.remove();
            if (this.options.creditWindow() > 0) {
                this.executor.execute(() -> {
                    replenishCreditIfNeeded();
                });
            }
        } catch (Throwable th) {
            it.remove();
            if (this.options.creditWindow() > 0) {
                this.executor.execute(() -> {
                    replenishCreditIfNeeded();
                });
            }
            throw th;
        }
    }

    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientReceiverLinkType
    protected void replenishCreditIfNeeded() {
        int creditWindow = this.options.creditWindow();
        if (creditWindow > 0) {
            int credit = this.protonReceiver.getCredit();
            if (credit <= creditWindow * 0.5d) {
                int count = credit + ((int) this.protonReceiver.unsettled().stream().filter(incomingDelivery -> {
                    return incomingDelivery.getLinkedResource() == null;
                }).count());
                if (count <= creditWindow * 0.7d) {
                    int i = creditWindow - count;
                    LOG.trace("Consumer granting additional credit: {}", Integer.valueOf(i));
                    try {
                        this.protonReceiver.addCredit(i);
                    } catch (Exception e) {
                        LOG.debug("Error caught during credit top-up", e);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientReceiverLinkType, com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    public void linkSpecificCleanupHandler(ClientException clientException) {
        this.session.closeAsync();
        this.receiveRequests.forEach((clientFuture, future) -> {
            if (future != null) {
                future.cancel(false);
            }
            if (clientException != null) {
                clientFuture.failed(clientException);
            } else {
                clientFuture.failed(new ClientResourceRemotelyClosedException("The Stream Receiver has closed"));
            }
        });
        this.protonReceiver.unsettled().forEach(incomingDelivery -> {
            if (incomingDelivery.getLinkedResource() != null) {
                try {
                    ((ClientStreamDelivery) incomingDelivery.getLinkedResource(ClientStreamDelivery.class)).handleReceiverClosed(this);
                } catch (Exception e) {
                }
            }
        });
        super.linkSpecificCleanupHandler(clientException);
    }

    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    protected void recreateLinkForReconnect() {
        int credit = this.protonReceiver.getCredit() + this.protonReceiver.unsettled().size();
        if (this.drainingFuture != null) {
            this.drainingFuture.complete(this);
            if (this.drainingTimeout != null) {
                this.drainingTimeout.cancel(false);
                this.drainingTimeout = null;
            }
        }
        this.protonReceiver.localCloseHandler(null);
        this.protonReceiver.localDetachHandler(null);
        this.protonReceiver.close2();
        this.protonReceiver = ClientReceiverBuilder.recreateReceiver(this.session, this.protonReceiver, this.options);
        this.protonReceiver.setLinkedResource(this);
        this.protonReceiver.addCredit(credit);
    }
}
