package org.apache.druid.firehose.rabbitmq;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.ConfigurableConnection;
import net.jodah.lyra.retry.RetryPolicy;
import net.jodah.lyra.util.Duration;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.class */
public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>> {
    private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
    private final RabbitMQFirehoseConfig config;
    private final JacksonifiedConnectionFactory connectionFactory;

    /* loaded from: input_file:org/apache/druid/firehose/rabbitmq/RabbitMQFirehoseFactory$QueueingConsumer.class */
    private static class QueueingConsumer extends DefaultConsumer {
        private final BlockingQueue<QueueingConsumer.Delivery> _queue;

        public QueueingConsumer(Channel channel) {
            this(channel, new LinkedBlockingQueue());
        }

        public QueueingConsumer(Channel channel, BlockingQueue<QueueingConsumer.Delivery> blockingQueue) {
            super(channel);
            this._queue = blockingQueue;
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            this._queue.clear();
        }

        public void handleCancel(String str) {
            this._queue.clear();
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            this._queue.add(new QueueingConsumer.Delivery(envelope, basicProperties, bArr));
        }

        public QueueingConsumer.Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
            return this._queue.take();
        }
    }

    @JsonCreator
    public RabbitMQFirehoseFactory(@JsonProperty("connection") JacksonifiedConnectionFactory jacksonifiedConnectionFactory, @JsonProperty("config") RabbitMQFirehoseConfig rabbitMQFirehoseConfig, @JsonProperty("connectionFactory") JacksonifiedConnectionFactory jacksonifiedConnectionFactory2) throws Exception {
        this.connectionFactory = jacksonifiedConnectionFactory == null ? jacksonifiedConnectionFactory2 == null ? JacksonifiedConnectionFactory.makeDefaultConnectionFactory() : jacksonifiedConnectionFactory2 : jacksonifiedConnectionFactory;
        this.config = rabbitMQFirehoseConfig == null ? RabbitMQFirehoseConfig.makeDefaultConfig() : rabbitMQFirehoseConfig;
    }

    @JsonProperty
    public RabbitMQFirehoseConfig getConfig() {
        return this.config;
    }

    @JsonProperty("connection")
    public JacksonifiedConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public Firehose connect(final InputRowParser<ByteBuffer> inputRowParser, File file) throws IOException {
        ConnectionOptions connectionOptions = new ConnectionOptions(this.connectionFactory);
        Config withRecoveryPolicy = new Config().withRecoveryPolicy(new RetryPolicy().withMaxRetries(this.config.getMaxRetries()).withRetryInterval(Duration.seconds(this.config.getRetryIntervalSeconds())).withMaxDuration(Duration.seconds(this.config.getMaxDurationSeconds())));
        String queue = this.config.getQueue();
        String exchange = this.config.getExchange();
        String routingKey = this.config.getRoutingKey();
        boolean isDurable = this.config.isDurable();
        boolean isExclusive = this.config.isExclusive();
        boolean isAutoDelete = this.config.isAutoDelete();
        final ConfigurableConnection create = Connections.create(connectionOptions, withRecoveryPolicy);
        create.addShutdownListener(new ShutdownListener() { // from class: org.apache.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.1
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                RabbitMQFirehoseFactory.log.warn(shutdownSignalException, "Connection closed!", new Object[0]);
            }
        });
        final Channel createChannel = create.createChannel();
        createChannel.queueDeclare(queue, isDurable, isExclusive, isAutoDelete, (Map) null);
        createChannel.queueBind(queue, exchange, routingKey);
        createChannel.addShutdownListener(new ShutdownListener() { // from class: org.apache.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.2
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                RabbitMQFirehoseFactory.log.warn(shutdownSignalException, "Channel closed!", new Object[0]);
            }
        });
        final QueueingConsumer queueingConsumer = new QueueingConsumer(createChannel);
        createChannel.basicConsume(queue, false, queueingConsumer);
        return new Firehose() { // from class: org.apache.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.3
            private InputRow nextRow;
            private long lastDeliveryTag;
            private Iterator<InputRow> nextIterator = Collections.emptyIterator();

            public boolean hasMore() {
                this.nextRow = null;
                try {
                    if (this.nextIterator.hasNext()) {
                        this.nextRow = this.nextIterator.next();
                        return true;
                    }
                    QueueingConsumer.Delivery nextDelivery = queueingConsumer.nextDelivery();
                    if (nextDelivery == null) {
                        return false;
                    }
                    this.lastDeliveryTag = nextDelivery.getEnvelope().getDeliveryTag();
                    this.nextIterator = inputRowParser.parseBatch(ByteBuffer.wrap(nextDelivery.getBody())).iterator();
                    if (!this.nextIterator.hasNext()) {
                        return false;
                    }
                    this.nextRow = this.nextIterator.next();
                    return true;
                } catch (InterruptedException e) {
                    RabbitMQFirehoseFactory.log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen.", new Object[0]);
                    return false;
                }
            }

            @Nullable
            public InputRow nextRow() {
                if (this.nextRow != null) {
                    return this.nextRow;
                }
                RabbitMQFirehoseFactory.log.wtf("I have nothing in delivery. Method hasMore() should have returned false.", new Object[0]);
                return null;
            }

            public Runnable commit() {
                return new Runnable() { // from class: org.apache.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.3.1
                    final long deliveryTag;

                    {
                        this.deliveryTag = AnonymousClass3.this.lastDeliveryTag;
                    }

                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            RabbitMQFirehoseFactory.log.info("Acknowledging delivery of messages up to tag: " + this.deliveryTag, new Object[0]);
                            createChannel.basicAck(this.deliveryTag, true);
                        } catch (IOException e) {
                            RabbitMQFirehoseFactory.log.error(e, "Unable to acknowledge message reception to message queue.", new Object[0]);
                        }
                    }
                };
            }

            public void close() throws IOException {
                RabbitMQFirehoseFactory.log.info("Closing connection to RabbitMQ", new Object[0]);
                createChannel.close();
                create.close();
            }
        };
    }
}
