package org.apache.beam.sdk.io.sparkreceiver;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.class */
class RabbitMqReceiverWithOffset extends Receiver<String> implements HasOffset {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMqReceiverWithOffset.class);
    private static final int MAX_PREFETCH_COUNT = 65535;
    private final String rabbitmqUrl;
    private final String streamName;
    private final long totalMessagesNumber;
    private long startOffset;
    private long recordsProcessed;
    private final AtomicBoolean isStopped;
    private transient Connection connection;
    private transient Channel channel;

    /* loaded from: input_file:org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset$TestConsumer.class */
    static class TestConsumer extends DefaultConsumer {
        private final Consumer<String> messageConsumer;
        private final AtomicBoolean isReceiverStopped;
        private final Channel channel;

        public TestConsumer(Channel channel, Consumer<String> consumer, AtomicBoolean atomicBoolean) {
            super(channel);
            this.channel = channel;
            this.isReceiverStopped = atomicBoolean;
            this.messageConsumer = consumer;
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            try {
                String str2 = new String(bArr, StandardCharsets.UTF_8);
                if (this.channel.isOpen() && !this.isReceiverStopped.get()) {
                    this.messageConsumer.accept(str2);
                }
            } catch (Exception e) {
                RabbitMqReceiverWithOffset.LOG.error("Can't read from RabbitMQ: {}", e.getMessage());
            }
        }
    }

    RabbitMqReceiverWithOffset(String str, String str2, long j) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.recordsProcessed = 0L;
        this.isStopped = new AtomicBoolean(false);
        this.rabbitmqUrl = str;
        this.streamName = str2;
        this.totalMessagesNumber = j;
    }

    public void setStartOffset(Long l) {
        this.startOffset = l != null ? l.longValue() : 0L;
    }

    public Long getEndOffset() {
        return Long.MAX_VALUE;
    }

    public void setCheckpoint(Long l) {
        this.recordsProcessed = l.longValue();
    }

    public void onStart() {
        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build()).submit(this::receive);
    }

    public void onStop() {
    }

    private void receive() {
        try {
            LOG.info("Starting receiver with offset {}", Long.valueOf(this.startOffset));
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.rabbitmqUrl);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setConnectionTimeout(600000);
            connectionFactory.setNetworkRecoveryInterval(5000);
            connectionFactory.setRequestedHeartbeat(60);
            connectionFactory.setTopologyRecoveryEnabled(true);
            connectionFactory.setRequestedChannelMax(0);
            connectionFactory.setRequestedFrameMax(0);
            this.connection = connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.queueDeclare(this.streamName, true, false, false, Collections.singletonMap("x-queue-type", "stream"));
            this.channel.basicQos(Math.min(MAX_PREFETCH_COUNT, (int) this.totalMessagesNumber));
            this.channel.basicConsume(this.streamName, false, Collections.singletonMap("x-stream-offset", Long.valueOf(this.startOffset)), new TestConsumer(this.channel, (v1) -> {
                store(v1);
            }, this.isStopped));
        } catch (Exception e) {
            LOG.error("Can not basic consume", e);
            throw new RuntimeException(e);
        }
    }

    public void stop(String str) {
        LOG.info(str);
        this.isStopped.set(true);
        super.stop(str);
        try {
            if (this.recordsProcessed != 0) {
                LOG.info("Try to multiple ack on {}", Long.valueOf(this.recordsProcessed));
                this.channel.basicAck(this.recordsProcessed, true);
            }
            this.channel.abort();
            this.connection.close();
            LOG.info("RabbitMQ channel and connection were closed");
        } catch (Exception e) {
            LOG.error("Exception during stopping of the RabbitMQ receiver", e);
        }
    }

    public void stop(String str, Throwable th) {
        LOG.error(str, th);
        this.isStopped.set(true);
        super.stop(str, th);
        try {
            if (this.recordsProcessed != 0) {
                LOG.info("Try to multiple ack on {}", Long.valueOf(this.recordsProcessed));
                this.channel.basicAck(this.recordsProcessed, true);
            }
            this.channel.abort();
            this.connection.close();
            LOG.info("Closed RabbitMQ channel and connection");
        } catch (Exception e) {
            LOG.error("Can't close RabbitMQ channel and connection", e);
        }
    }
}
