package org.apache.seatunnel.connectors.seatunnel.rabbitmq.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.io.IOException;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/QueueingConsumer.class */
/**
 * RabbitMQ consumer that forwards every callback into a {@link Handover}.
 *
 * <p>Each message received through {@link #handleDelivery} is wrapped in a {@link Delivery} and
 * produced to the handover. When the consumer is cancelled or a shutdown signal arrives, a
 * sentinel {@code POISON} delivery (all components {@code null}) is produced instead, so the
 * reader of the handover can observe the termination.
 *
 * <p>The callbacks are invoked by the RabbitMQ client, so the state shared between them is kept
 * in {@code volatile} fields.
 */
public class QueueingConsumer extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(QueueingConsumer.class);

    /** Sentinel delivery produced on cancel/shutdown; every component is {@code null}. */
    private static final Delivery POISON = new Delivery(null, null, null);

    private final Handover<Delivery> handover;

    /** Non-null once a shutdown signal has been received; further deliveries are rejected. */
    private volatile ShutdownSignalException shutdown;

    /** Recorded when the consumer is cancelled; this class only writes it and never reads it. */
    private volatile ConsumerCancelledException cancelled;

    /**
     * Creates a consumer bound to {@code channel} that hands deliveries over to {@code handover}.
     *
     * @param channel the channel this consumer is associated with
     * @param handover destination for received deliveries
     */
    public QueueingConsumer(Channel channel, Handover<Delivery> handover) {
        this(channel, Integer.MAX_VALUE, handover);
    }

    /**
     * Creates a consumer bound to {@code channel} that hands deliveries over to {@code handover}.
     *
     * @param channel the channel this consumer is associated with
     * @param capacity accepted for signature compatibility only; this class ignores the value
     * @param handover destination for received deliveries
     */
    public QueueingConsumer(Channel channel, int capacity, Handover<Delivery> handover) {
        super(channel);
        this.handover = handover;
    }

    /**
     * Throws the recorded shutdown signal, if any.
     *
     * @throws ShutdownSignalException if a shutdown signal was previously received
     */
    private void checkShutdown() {
        // Read the volatile field once so the null check and the throw see the same value.
        final ShutdownSignalException signal = this.shutdown;
        if (signal != null) {
            throw ((ShutdownSignalException) Utility.fixStackTrace(signal));
        }
    }

    /**
     * Produces {@code delivery} to the handover, converting the handover's failures into the
     * {@link IOException} that the consumer callbacks declare.
     *
     * @param delivery the element to hand over
     * @param action short description of the calling operation, used in the error message
     * @throws IOException if the thread is interrupted or the handover is closed
     */
    private void produceToHandover(Delivery delivery, String action) throws IOException {
        try {
            this.handover.produce(delivery);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while " + action, e);
        } catch (Handover.ClosedException e) {
            throw new IOException("Handover was closed while " + action, e);
        }
    }

    /**
     * Records the shutdown signal and produces the {@code POISON} sentinel to the handover.
     *
     * @param consumerTag the consumer tag associated with the consumer
     * @param sig the shutdown signal that was received
     * @throws RabbitmqConnectorException if the sentinel cannot be produced
     */
    @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        this.shutdown = sig;
        try {
            this.handover.produce(POISON);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw new RabbitmqConnectorException(RabbitmqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
        } catch (Handover.ClosedException e) {
            throw new RabbitmqConnectorException(RabbitmqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
        }
    }

    /**
     * Records the cancellation and produces the {@code POISON} sentinel to the handover.
     *
     * @param consumerTag the consumer tag associated with the consumer
     * @throws IOException if the sentinel cannot be produced
     */
    @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
    public void handleCancel(String consumerTag) throws IOException {
        this.cancelled = new ConsumerCancelledException();
        produceToHandover(POISON, "signalling consumer cancellation");
    }

    /**
     * Wraps the received message in a {@link Delivery} and produces it to the handover.
     *
     * @param consumerTag the consumer tag associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body
     * @throws ShutdownSignalException if a shutdown signal was already received
     * @throws IOException if the delivery cannot be produced
     */
    @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        checkShutdown();
        produceToHandover(new Delivery(envelope, properties, body), "handing over a delivery");
    }
}
