/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import lombok.Generated;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="rabbitmq", type=IOType.SOURCE, help="A simple connector to move messages from a RabbitMQ queue to a Pulsar topic", configClass=RabbitMQSourceConfig.class)
public class RabbitMQSource
extends PushSource<byte[]> {
    private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
    private Connection rabbitMQConnection;
    private Channel rabbitMQChannel;
    private RabbitMQSourceConfig rabbitMQSourceConfig;

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.rabbitMQSourceConfig = RabbitMQSourceConfig.load(config, sourceContext);
        this.rabbitMQSourceConfig.validate();
        ConnectionFactory connectionFactory = this.rabbitMQSourceConfig.createConnectionFactory();
        this.rabbitMQConnection = connectionFactory.newConnection(this.rabbitMQSourceConfig.getConnectionName());
        logger.info("A new connection to {}:{} has been opened successfully.", (Object)this.rabbitMQConnection.getAddress().getCanonicalHostName(), (Object)this.rabbitMQConnection.getPort());
        this.rabbitMQChannel = this.rabbitMQConnection.createChannel();
        if (this.rabbitMQSourceConfig.isPassive()) {
            this.rabbitMQChannel.queueDeclarePassive(this.rabbitMQSourceConfig.getQueueName());
        } else {
            this.rabbitMQChannel.queueDeclare(this.rabbitMQSourceConfig.getQueueName(), false, false, false, null);
        }
        logger.info("Setting channel.basicQos({}, {}).", (Object)this.rabbitMQSourceConfig.getPrefetchCount(), (Object)this.rabbitMQSourceConfig.isPrefetchGlobal());
        this.rabbitMQChannel.basicQos(this.rabbitMQSourceConfig.getPrefetchCount(), this.rabbitMQSourceConfig.isPrefetchGlobal());
        RabbitMQConsumer consumer = new RabbitMQConsumer(this, this.rabbitMQChannel);
        this.rabbitMQChannel.basicConsume(this.rabbitMQSourceConfig.getQueueName(), (Consumer)consumer);
        logger.info("A consumer for queue {} has been successfully started.", (Object)this.rabbitMQSourceConfig.getQueueName());
    }

    public void close() throws Exception {
        this.rabbitMQChannel.close();
        this.rabbitMQConnection.close();
    }

    private class RabbitMQConsumer
    extends DefaultConsumer {
        private RabbitMQSource source;

        public RabbitMQConsumer(RabbitMQSource source, Channel channel) {
            super(channel);
            this.source = source;
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            this.source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
            long deliveryTag = envelope.getDeliveryTag();
            this.getChannel().basicAck(deliveryTag, true);
        }
    }

    private static class RabbitMQRecord
    implements Record<byte[]> {
        private final Optional<String> key;
        private final byte[] value;

        @Generated
        public RabbitMQRecord(Optional<String> key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        @Generated
        public Optional<String> getKey() {
            return this.key;
        }

        @Generated
        public byte[] getValue() {
            return this.value;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RabbitMQRecord)) {
                return false;
            }
            RabbitMQRecord other = (RabbitMQRecord)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Optional<String> this$key = this.getKey();
            Optional<String> other$key = other.getKey();
            if (this$key == null ? other$key != null : !((Object)this$key).equals(other$key)) {
                return false;
            }
            return Arrays.equals(this.getValue(), other.getValue());
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof RabbitMQRecord;
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Optional<String> $key = this.getKey();
            result = result * 59 + ($key == null ? 43 : ((Object)$key).hashCode());
            result = result * 59 + Arrays.hashCode(this.getValue());
            return result;
        }

        @Generated
        public String toString() {
            return "RabbitMQSource.RabbitMQRecord(key=" + this.getKey() + ", value=" + Arrays.toString(this.getValue()) + ")";
        }
    }
}

