package org.apache.pulsar.connect.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.pulsar.connect.core.PushSource;
import org.apache.pulsar.connect.core.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/connect/rabbitmq/RabbitMQSource.class */
/**
 * Push source that consumes messages from a RabbitMQ queue and forwards each
 * message body, wrapped in a {@link Record}, to the consume function registered
 * through {@link #setConsumer(Function)}.
 *
 * <p>Lifecycle: call {@link #setConsumer(Function)} first, then {@link #open(Map)},
 * and finally {@link #close()}. The consume function is captured when
 * {@link #open(Map)} registers the RabbitMQ consumer, so a function set after
 * {@code open} is not picked up by the already-running consumer.
 */
public class RabbitMQSource implements PushSource<byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);

    /** Function that receives every record read from the queue. */
    private Function<Record<byte[]>, CompletableFuture<Void>> consumer;
    /** Connection to the broker; {@code null} until {@link #open(Map)} succeeds. */
    private Connection rabbitMQConnection;
    /** Channel used for consuming; {@code null} until {@link #open(Map)} succeeds. */
    private Channel rabbitMQChannel;
    /** Configuration loaded from the map handed to {@link #open(Map)}. */
    private RabbitMQConfig rabbitMQConfig;

    /**
     * RabbitMQ consumer callback that wraps each delivered body in a
     * {@link RabbitMQRecord} and hands it to the consume function.
     *
     * <p>Declared {@code static} because it never touches the enclosing source
     * instance; everything it needs is passed to the constructor.
     */
    private static class RabbitMQConsumer extends DefaultConsumer {
        private final Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction;

        /**
         * @param function function that receives every delivered record
         * @param channel  channel this consumer is registered on
         */
        public RabbitMQConsumer(Function<Record<byte[]>, CompletableFuture<Void>> function, Channel channel) {
            super(channel);
            this.consumeFunction = function;
        }

        /**
         * Forwards the delivered body to the consume function and then
         * acknowledges the delivery.
         *
         * @param consumerTag tag of the consumer the delivery belongs to
         * @param envelope    delivery metadata; supplies the delivery tag used for the ack
         * @param properties  AMQP message properties (not used)
         * @param body        raw message payload
         * @throws IOException if the acknowledgement cannot be sent
         */
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            this.consumeFunction.apply(new RabbitMQRecord(body));
            // The consumer is registered via basicConsume(queue, callback), which uses
            // explicit acknowledgements. Without this ack every delivery would stay
            // unacknowledged on the broker and be redelivered after a reconnect.
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        }
    }

    /** Immutable {@link Record} carrying the raw body of one RabbitMQ message. */
    private static class RabbitMQRecord implements Record<byte[]> {
        private final byte[] data;

        /**
         * @param bArr raw message payload; stored as-is, not copied
         */
        public RabbitMQRecord(byte[] bArr) {
            this.data = bArr;
        }

        /**
         * Returns the message payload. Implements {@code Record#getValue()}; the
         * method must keep this exact name to satisfy the interface.
         *
         * @return the raw message payload passed to the constructor
         */
        public byte[] getValue() {
            return this.data;
        }
    }

    /**
     * Registers the function that will receive every record read from the queue.
     * Must be called before {@link #open(Map)}.
     *
     * @param function function invoked once per delivered message
     */
    public void setConsumer(Function<Record<byte[]>, CompletableFuture<Void>> function) {
        this.consumer = function;
    }

    /**
     * Loads the configuration, connects to the broker, declares the queue and
     * starts consuming from it.
     *
     * <p>If anything fails after the connection has been established, the
     * connection is closed again before the exception is rethrown, so a failed
     * {@code open} does not leak a broker connection.
     *
     * @param map connector configuration; must yield a non-null AMQ URI and queue name
     * @throws IllegalArgumentException if the AMQ URI or the queue name is missing
     * @throws Exception if connecting, declaring the queue or starting the consumer fails
     */
    public void open(Map<String, Object> map) throws Exception {
        this.rabbitMQConfig = RabbitMQConfig.load(map);
        if (this.rabbitMQConfig.getAmqUri() == null || this.rabbitMQConfig.getQueueName() == null) {
            throw new IllegalArgumentException("Required property not set.");
        }
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(this.rabbitMQConfig.getAmqUri());
        Connection connection = connectionFactory.newConnection(this.rabbitMQConfig.getConnectionName());
        try {
            this.rabbitMQConnection = connection;
            logger.info("A new connection to {}:{} has been opened successfully.", connection.getAddress().getCanonicalHostName(), connection.getPort());
            Channel channel = connection.createChannel();
            this.rabbitMQChannel = channel;
            String queueName = this.rabbitMQConfig.getQueueName();
            // Non-durable, non-exclusive, non-auto-delete queue without extra arguments.
            channel.queueDeclare(queueName, false, false, false, null);
            channel.basicConsume(queueName, new RabbitMQConsumer(this.consumer, channel));
            logger.info("A consumer for queue {} has been successfully started.", queueName);
        } catch (Exception e) {
            // Roll back: drop the half-initialised state and release the connection
            // (closing a connection also closes the channels opened on it).
            this.rabbitMQChannel = null;
            this.rabbitMQConnection = null;
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>Safe to call when {@link #open(Map)} was never called or did not succeed,
     * and safe to call more than once: the fields are cleared before closing, so
     * later calls find nothing left to close. The connection is closed even if
     * closing the channel throws.
     *
     * @throws Exception if closing the channel or the connection fails
     */
    public void close() throws Exception {
        Channel channel = this.rabbitMQChannel;
        Connection connection = this.rabbitMQConnection;
        this.rabbitMQChannel = null;
        this.rabbitMQConnection = null;
        try {
            if (channel != null) {
                channel.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
