package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/rabbitmq/RMQSource.class */
/**
 * Source that consumes messages from a RabbitMQ queue and emits the records obtained by
 * passing each message body through the deserialization schema handed to the constructor.
 *
 * <p>The connection, channel and consumer are created in {@link #open(Configuration)}.
 * The connection is closed when {@link #run(Collector)} terminates (normally or with an
 * exception) or when {@link #cancel()} is called, whichever happens first.
 *
 * @param <OUT> type of the records produced by the deserialization schema
 */
public class RMQSource<OUT> extends ConnectorSource<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
    // Field names are kept as-is: they are part of the serialized form of this class.
    private final String QUEUE_NAME;
    private final String HOST_NAME;
    private transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Channel channel;
    private transient QueueingConsumer consumer;
    private transient QueueingConsumer.Delivery delivery;
    // Written by cancel(), read by the loop in run(); volatile so the change is visible.
    private volatile boolean isRunning;
    // Most recently deserialized record.
    OUT out;

    /**
     * Creates a source reading from the given queue on the given RabbitMQ host.
     * No connection is opened here; see {@link #open(Configuration)}.
     *
     * @param hostName host passed to the RabbitMQ {@link ConnectionFactory}
     * @param queueName name of the queue that is declared and consumed
     * @param deserializationSchema schema handed to the superclass and used to turn message
     *        bodies into records
     */
    public RMQSource(String hostName, String queueName, DeserializationSchema<OUT> deserializationSchema) {
        super(deserializationSchema);
        this.isRunning = false;
        this.HOST_NAME = hostName;
        this.QUEUE_NAME = queueName;
    }

    /**
     * Opens the connection and channel, declares the queue (non-durable, non-exclusive,
     * no auto-delete, no extra arguments) and registers an auto-acknowledging consumer.
     *
     * @throws RuntimeException if any of these steps fails with an {@link IOException};
     *         a connection that was already opened is closed again before throwing
     */
    private void initializeConnection() {
        this.factory = new ConnectionFactory();
        this.factory.setHost(this.HOST_NAME);
        try {
            this.connection = this.factory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.queueDeclare(this.QUEUE_NAME, false, false, false, null);
            this.consumer = new QueueingConsumer(this.channel);
            this.channel.basicConsume(this.QUEUE_NAME, true, this.consumer);
        } catch (IOException e) {
            // Do not leak a connection that was opened before a later step failed.
            closeConnectionQuietly();
            throw new RuntimeException("Cannot create RMQ connection with " + this.QUEUE_NAME + " at " + this.HOST_NAME, e);
        }
    }

    /**
     * Closes the connection if one exists and is still open.
     *
     * <p>Synchronized because {@link #cancel()} and the cleanup at the end of
     * {@link #run(Collector)} may both try to close the connection; the guard makes the
     * second attempt a no-op instead of closing an already closed connection.
     *
     * @throws IOException if closing the connection fails
     */
    private synchronized void closeConnection() throws IOException {
        Connection current = this.connection;
        if (current != null && current.isOpen()) {
            current.close();
        }
    }

    /** Closes the connection as best-effort cleanup, logging instead of throwing on failure. */
    private void closeConnectionQuietly() {
        try {
            closeConnection();
        } catch (IOException e) {
            LOG.warn("Error while closing RMQ connection with {} at {}",
                    new Object[] {this.QUEUE_NAME, this.HOST_NAME, e});
        }
    }

    /**
     * Receives messages until {@link #cancel()} is called or the schema reports end of
     * stream, emitting every other deserialized record to the collector. The connection is
     * closed once, after the loop has ended.
     *
     * @param collector receives the deserialized records
     * @throws Exception if receiving a message fails while the source has not been
     *         cancelled, or if deserialization or collection throws
     */
    public void run(Collector<OUT> collector) throws Exception {
        this.isRunning = true;
        try {
            while (this.isRunning) {
                try {
                    this.delivery = this.consumer.nextDelivery();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for code further up the stack.
                        Thread.currentThread().interrupt();
                    }
                    if (!this.isRunning) {
                        // cancel() closed the connection to unblock the pending receive.
                        break;
                    }
                    LOG.error("Cannot recieve RMQ message {} at {}",
                            new Object[] {this.QUEUE_NAME, this.HOST_NAME, e});
                    // Continuing would reprocess the previous delivery (or hit null).
                    throw e;
                }
                this.out = (OUT) this.schema.deserialize(this.delivery.getBody());
                if (this.schema.isEndOfStream(this.out)) {
                    break;
                }
                collector.collect(this.out);
            }
        } finally {
            // Close once, after the loop; logging only so a close failure cannot mask
            // an exception thrown from the loop body.
            closeConnectionQuietly();
        }
    }

    /**
     * Establishes the RabbitMQ connection, channel and consumer.
     *
     * @param configuration not used by this method
     * @throws Exception declared by the signature; connection failures surface as
     *         {@link RuntimeException}
     */
    public void open(Configuration configuration) throws Exception {
        initializeConnection();
    }

    /**
     * Stops the receive loop and closes the connection. Does nothing beyond clearing the
     * running flag if no connection exists or it is already closed.
     *
     * @throws RuntimeException if closing the connection fails with an {@link IOException}
     */
    public void cancel() {
        this.isRunning = false;
        try {
            closeConnection();
        } catch (IOException e) {
            throw new RuntimeException("Error while closing RMQ connection with " + this.QUEUE_NAME + " at " + this.HOST_NAME, e);
        }
    }
}
