package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:org/apache/flink/streaming/connectors/rabbitmq/QueueingConsumer.class */
class QueueingConsumer extends DefaultConsumer {
    private final BlockingQueue<Delivery> queue;
    private volatile ShutdownSignalException shutdown;
    private volatile ConsumerCancelledException cancelled;
    private static final Delivery POISON = new Delivery((Envelope) null, (AMQP.BasicProperties) null, (byte[]) null);

    public QueueingConsumer(Channel channel) {
        this(channel, Integer.MAX_VALUE);
    }

    public QueueingConsumer(Channel channel, int i) {
        super(channel);
        this.queue = new LinkedBlockingQueue(i);
    }

    private void checkShutdown() {
        if (this.shutdown != null) {
            throw Utility.fixStackTrace(this.shutdown);
        }
    }

    private Delivery handle(Delivery delivery) {
        if (delivery == POISON || (delivery == null && (this.shutdown != null || this.cancelled != null))) {
            if (delivery == POISON) {
                this.queue.add(POISON);
                if (this.shutdown == null && this.cancelled == null) {
                    throw new IllegalStateException("POISON in queue, but null shutdown and null cancelled. This should never happen, please report as a BUG");
                }
            }
            if (null != this.shutdown) {
                throw Utility.fixStackTrace(this.shutdown);
            }
            if (null != this.cancelled) {
                throw Utility.fixStackTrace(this.cancelled);
            }
        }
        return delivery;
    }

    public Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
        return handle(this.queue.take());
    }

    public Delivery nextDelivery(long j) throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
        return nextDelivery(j, TimeUnit.MILLISECONDS);
    }

    public Delivery nextDelivery(long j, TimeUnit timeUnit) throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
        return handle(this.queue.poll(j, timeUnit));
    }

    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        this.shutdown = shutdownSignalException;
        this.queue.add(POISON);
    }

    public void handleCancel(String str) throws IOException {
        this.cancelled = new ConsumerCancelledException();
        this.queue.add(POISON);
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        checkShutdown();
        this.queue.add(new Delivery(envelope, basicProperties, bArr));
    }
}
