package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Resource;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
import com.rabbitmq.qpid.protonj2.client.Delivery;
import com.rabbitmq.qpid.protonj2.client.DeliveryMode;
import com.rabbitmq.qpid.protonj2.client.DeliveryState;
import com.rabbitmq.qpid.protonj2.client.Message;
import com.rabbitmq.qpid.protonj2.client.Receiver;
import com.rabbitmq.qpid.protonj2.client.ReceiverOptions;
import com.rabbitmq.qpid.protonj2.client.Session;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIOException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.impl.ClientReceiver;
import com.rabbitmq.qpid.protonj2.client.util.DeliveryQueue;
import com.rabbitmq.qpid.protonj2.engine.EventHandler;
import com.rabbitmq.qpid.protonj2.engine.Scheduler;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonLinkCreditState;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonReceiver;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpConsumer.class */
public final class AmqpConsumer extends ResourceBase implements Consumer {
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConsumer.class);
    private volatile ClientReceiver nativeReceiver;
    private final AtomicBoolean closed;
    private volatile Future<?> receiveLoop;
    private final int initialCredits;
    private final Consumer.MessageHandler messageHandler;
    private final Long id;
    private final String address;
    private final String queue;
    private final Map<String, Object> filters;
    private final Map<String, Object> linkProperties;
    private final ConsumerBuilder.SubscriptionListener subscriptionListener;
    private final AmqpConnection connection;
    private final AtomicReference<PauseStatus> pauseStatus;
    private final AtomicReference<CountDownLatch> echoedFlowAfterPauseLatch;
    private final MetricsCollector metricsCollector;
    private final SessionHandler sessionHandler;
    private final AtomicLong unsettledMessageCount;
    private final Runnable replenishCreditOperation;
    private ProtonReceiver protonReceiver;
    private volatile Scheduler protonExecutor;
    private DeliveryQueue protonDeliveryQueue;
    private ProtonSessionIncomingWindow sessionWindow;
    private ProtonLinkCreditState creditState;

    /* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpConsumer$DeliveryContext.class */
    private static class DeliveryContext implements Consumer.Context {
        private final AtomicBoolean settled = new AtomicBoolean(false);
        private final Delivery delivery;
        private final Scheduler protonExecutor;
        private final MetricsCollector metricsCollector;
        private final AtomicLong unsettledMessageCount;
        private final Runnable replenishCreditOperation;

        private DeliveryContext(Delivery delivery, Scheduler scheduler, MetricsCollector metricsCollector, AtomicLong atomicLong, Runnable runnable) {
            this.delivery = delivery;
            this.protonExecutor = scheduler;
            this.metricsCollector = metricsCollector;
            this.unsettledMessageCount = atomicLong;
            this.replenishCreditOperation = runnable;
        }

        @Override // com.rabbitmq.client.amqp.Consumer.Context
        public void accept() {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.accepted(), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED);
                } catch (ClientIOException | ClientIllegalStateException | RejectedExecutionException e) {
                    AmqpConsumer.LOGGER.debug("message accept failed: {}", e.getMessage());
                } catch (ClientException e2) {
                    throw ExceptionUtils.convert(e2);
                }
            }
        }

        @Override // com.rabbitmq.client.amqp.Consumer.Context
        public void discard() {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.rejected("", ""), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
                } catch (ClientIOException | ClientIllegalStateException | RejectedExecutionException e) {
                    AmqpConsumer.LOGGER.debug("message discard failed: {}", e.getMessage());
                } catch (ClientException e2) {
                    throw ExceptionUtils.convert(e2);
                }
            }
        }

        @Override // com.rabbitmq.client.amqp.Consumer.Context
        public void discard(Map<String, Object> map) {
            Map<String, Object> emptyMap;
            if (this.settled.compareAndSet(false, true)) {
                if (map == null) {
                    try {
                        emptyMap = Collections.emptyMap();
                    } catch (ClientIOException | ClientIllegalStateException | RejectedExecutionException e) {
                        AmqpConsumer.LOGGER.debug("message discard (modified) failed: {}", e.getMessage());
                        return;
                    } catch (ClientException e2) {
                        throw ExceptionUtils.convert(e2);
                    }
                } else {
                    emptyMap = map;
                }
                Map<String, Object> map2 = emptyMap;
                Utils.checkMessageAnnotations(map2);
                this.protonExecutor.execute(this.replenishCreditOperation);
                this.delivery.disposition(DeliveryState.modified(true, true, map2), true);
                this.unsettledMessageCount.decrementAndGet();
                this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
            }
        }

        @Override // com.rabbitmq.client.amqp.Consumer.Context
        public void requeue() {
            if (this.settled.compareAndSet(false, true)) {
                try {
                    this.protonExecutor.execute(this.replenishCreditOperation);
                    this.delivery.disposition(DeliveryState.released(), true);
                    this.unsettledMessageCount.decrementAndGet();
                    this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
                } catch (ClientIOException | ClientIllegalStateException | RejectedExecutionException e) {
                    AmqpConsumer.LOGGER.debug("message requeue failed: {}", e.getMessage());
                } catch (ClientException e2) {
                    throw ExceptionUtils.convert(e2);
                }
            }
        }

        @Override // com.rabbitmq.client.amqp.Consumer.Context
        public void requeue(Map<String, Object> map) {
            Map<String, Object> emptyMap;
            if (this.settled.compareAndSet(false, true)) {
                if (map == null) {
                    try {
                        emptyMap = Collections.emptyMap();
                    } catch (ClientIOException | ClientIllegalStateException | RejectedExecutionException e) {
                        AmqpConsumer.LOGGER.debug("message requeue (modified) failed: {}", e.getMessage());
                        return;
                    } catch (ClientException e2) {
                        throw ExceptionUtils.convert(e2);
                    }
                } else {
                    emptyMap = map;
                }
                Map<String, Object> map2 = emptyMap;
                Utils.checkMessageAnnotations(map2);
                this.protonExecutor.execute(this.replenishCreditOperation);
                this.delivery.disposition(DeliveryState.modified(false, false, map2), true);
                this.unsettledMessageCount.decrementAndGet();
                this.metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpConsumer$PauseStatus.class */
    public enum PauseStatus {
        UNPAUSED,
        PAUSING,
        PAUSED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AmqpConsumer(AmqpConsumerBuilder amqpConsumerBuilder) {
        super(amqpConsumerBuilder.listeners());
        this.closed = new AtomicBoolean(false);
        this.pauseStatus = new AtomicReference<>(PauseStatus.UNPAUSED);
        this.echoedFlowAfterPauseLatch = new AtomicReference<>();
        this.unsettledMessageCount = new AtomicLong(0L);
        this.replenishCreditOperation = this::replenishCreditIfNeeded;
        this.id = Long.valueOf(ID_SEQUENCE.getAndIncrement());
        this.initialCredits = amqpConsumerBuilder.initialCredits();
        this.messageHandler = amqpConsumerBuilder.connection().observationCollector().subscribe(amqpConsumerBuilder.queue(), amqpConsumerBuilder.messageHandler());
        DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
        addressBuilder.queue(amqpConsumerBuilder.queue());
        this.address = addressBuilder.address();
        this.queue = amqpConsumerBuilder.queue();
        this.filters = Map.copyOf(amqpConsumerBuilder.filters());
        this.linkProperties = Map.copyOf(amqpConsumerBuilder.properties());
        this.subscriptionListener = (ConsumerBuilder.SubscriptionListener) Optional.ofNullable(amqpConsumerBuilder.subscriptionListener()).orElse(AmqpConsumerBuilder.NO_OP_SUBSCRIPTION_LISTENER);
        this.connection = amqpConsumerBuilder.connection();
        this.sessionHandler = this.connection.createSessionHandler();
        this.nativeReceiver = createNativeReceiver(this.sessionHandler.session(), this.address, this.linkProperties, this.filters, this.subscriptionListener);
        initStateFromNativeReceiver(this.nativeReceiver);
        this.metricsCollector = this.connection.metricsCollector();
        startReceivingLoop();
        state(Resource.State.OPEN);
        this.metricsCollector.openConsumer();
    }

    @Override // com.rabbitmq.client.amqp.Consumer
    public void pause() {
        if (this.pauseStatus.compareAndSet(PauseStatus.UNPAUSED, PauseStatus.PAUSING)) {
            try {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                this.echoedFlowAfterPauseLatch.set(countDownLatch);
                this.protonExecutor.execute(this::doPause);
                try {
                    if (countDownLatch.await(10L, TimeUnit.SECONDS)) {
                        this.pauseStatus.set(PauseStatus.PAUSED);
                    } else {
                        LOGGER.warn("Did not receive echoed flow to pause receiver");
                        this.pauseStatus.set(PauseStatus.UNPAUSED);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } catch (Exception e2) {
                this.pauseStatus.set(PauseStatus.UNPAUSED);
            }
        }
    }

    @Override // com.rabbitmq.client.amqp.Consumer
    public void unpause() {
        checkOpen();
        if (this.pauseStatus.compareAndSet(PauseStatus.PAUSED, PauseStatus.UNPAUSED)) {
            try {
                this.nativeReceiver.addCredit(this.initialCredits);
            } catch (ClientException e) {
                throw ExceptionUtils.convert(e);
            }
        }
    }

    @Override // com.rabbitmq.client.amqp.Consumer
    public long unsettledMessageCount() {
        return this.unsettledMessageCount.get();
    }

    @Override // com.rabbitmq.client.amqp.Consumer, java.lang.AutoCloseable
    public void close() {
        close(null);
    }

    private ClientReceiver createNativeReceiver(Session session, String str, Map<String, Object> map, Map<String, Object> map2, ConsumerBuilder.SubscriptionListener subscriptionListener) {
        try {
            LinkedHashMap linkedHashMap = new LinkedHashMap(map2);
            ConsumerBuilder.StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(linkedHashMap);
            subscriptionListener.preSubscribe(() -> {
                return streamOptions;
            });
            ReceiverOptions properties = new ReceiverOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE).autoAccept(false).autoSettle(false).creditWindow(0).properties(map);
            if (!linkedHashMap.isEmpty()) {
                properties.sourceOptions().filters(linkedHashMap);
            }
            return (ClientReceiver) ExceptionUtils.wrapGet(session.openReceiver(str, properties).openFuture());
        } catch (ClientException e) {
            throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", str);
        }
    }

    private Runnable createReceiveTask(Receiver receiver, Consumer.MessageHandler messageHandler) {
        return () -> {
            try {
                receiver.addCredit(this.initialCredits);
                while (!Thread.currentThread().isInterrupted()) {
                    Delivery receive = receiver.receive(100L, TimeUnit.MILLISECONDS);
                    if (receive != null) {
                        this.unsettledMessageCount.incrementAndGet();
                        this.metricsCollector.consume();
                        messageHandler.handle(new DeliveryContext(receive, this.protonExecutor, this.metricsCollector, this.unsettledMessageCount, this.replenishCreditOperation), new AmqpMessage((Message<byte[]>) receive.message()));
                    }
                }
            } catch (ClientConnectionRemotelyClosedException e) {
            } catch (ClientLinkRemotelyClosedException e2) {
                if (ExceptionUtils.notFound(e2) || ExceptionUtils.resourceDeleted(e2)) {
                    close(ExceptionUtils.convert(e2));
                }
            } catch (ClientException e3) {
                (this.closed.get() ? str -> {
                    LOGGER.debug(str, e3);
                } : str2 -> {
                    LOGGER.warn(str2, e3);
                }).accept("Error while polling AMQP receiver");
            } catch (Exception e4) {
                LOGGER.warn("Unexpected error in consumer loop", e4);
            }
        };
    }

    private void startReceivingLoop() {
        this.receiveLoop = this.connection.environment().consumerExecutorService().submit(createReceiveTask(this.nativeReceiver, this.messageHandler));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recoverAfterConnectionFailure() {
        this.nativeReceiver = (ClientReceiver) RetryUtils.callAndMaybeRetry(() -> {
            return createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address, this.linkProperties, this.filters, this.subscriptionListener);
        }, (Predicate<Exception>) exc -> {
            boolean z = (exc instanceof AmqpException.AmqpResourceClosedException) && exc.getMessage().contains("stream queue") && exc.getMessage().contains("does not have a running replica on the local node");
            LOGGER.debug("Retrying receiver creation on consumer recovery: {}", Boolean.valueOf(z));
            return z;
        }, (List<Duration>) List.of(Duration.ofSeconds(1L), Duration.ofSeconds(2L), Duration.ofSeconds(3L), BackOffDelayPolicy.TIMEOUT), "Create AMQP receiver to address '%s'", this.address);
        initStateFromNativeReceiver(this.nativeReceiver);
        this.pauseStatus.set(PauseStatus.UNPAUSED);
        this.unsettledMessageCount.set(0L);
        startReceivingLoop();
    }

    private void close(Throwable th) {
        if (this.closed.compareAndSet(false, true)) {
            state(Resource.State.CLOSING, th);
            this.connection.removeConsumer(this);
            if (this.receiveLoop != null) {
                this.receiveLoop.cancel(true);
            }
            try {
                this.nativeReceiver.close();
                this.sessionHandler.close();
            } catch (Exception e) {
                LOGGER.warn("Error while closing receiver", e);
            }
            state(Resource.State.CLOSED, th);
            this.metricsCollector.closeConsumer();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long id() {
        return this.id.longValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String queue() {
        return this.queue;
    }

    private void initStateFromNativeReceiver(ClientReceiver clientReceiver) {
        try {
            Scheduler executor = clientReceiver.executor();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            executor.execute(() -> {
                this.protonReceiver = (ProtonReceiver) clientReceiver.protonReceiver();
                this.creditState = this.protonReceiver.getCreditState();
                this.sessionWindow = this.protonReceiver.sessionWindow();
                this.protonDeliveryQueue = clientReceiver.deliveryQueue();
                EventHandler<com.rabbitmq.qpid.protonj2.engine.Receiver> linkCreditUpdatedHandler = this.protonReceiver.linkCreditUpdatedHandler();
                this.protonReceiver.creditStateUpdateHandler(receiver -> {
                    linkCreditUpdatedHandler.handle(receiver);
                    CountDownLatch andSet = this.echoedFlowAfterPauseLatch.getAndSet(null);
                    if (andSet != null) {
                        andSet.countDown();
                    }
                });
                this.protonExecutor = executor;
                countDownLatch.countDown();
            });
            if (countDownLatch.await(10L, TimeUnit.SECONDS)) {
            } else {
                throw new AmqpException("Could not initialize consumer internal state", new Object[0]);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void replenishCreditIfNeeded() {
        if (pausedOrPausing() || state() != Resource.State.OPEN) {
            return;
        }
        int i = this.initialCredits;
        int credit = this.protonReceiver.getCredit();
        if (credit <= i * 0.5d) {
            int size = credit + this.protonDeliveryQueue.size();
            if (size <= i * 0.7d) {
                try {
                    this.protonReceiver.addCredit(i - size);
                } catch (Exception e) {
                    LOGGER.debug("Error caught during credit top-up", e);
                }
            }
        }
    }

    private void doPause() {
        this.creditState.updateCredit(0);
        this.creditState.updateEcho(true);
        this.sessionWindow.writeFlow(this.protonReceiver);
    }

    boolean pausedOrPausing() {
        return this.pauseStatus.get() != PauseStatus.UNPAUSED;
    }
}
