/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.sparkreceiver;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.sparkreceiver.HasOffset;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RabbitMqReceiverWithOffset
extends Receiver<String>
implements HasOffset {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMqReceiverWithOffset.class);
    private static final int MAX_PREFETCH_COUNT = 65535;
    private final String rabbitmqUrl;
    private final String streamName;
    private final long totalMessagesNumber;
    private long startOffset;
    private long recordsProcessed = 0L;
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private transient Connection connection;
    private transient Channel channel;

    RabbitMqReceiverWithOffset(String uri, String streamName, long totalMessagesNumber) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.rabbitmqUrl = uri;
        this.streamName = streamName;
        this.totalMessagesNumber = totalMessagesNumber;
    }

    public void setStartOffset(Long startOffset) {
        this.startOffset = startOffset != null ? startOffset : 0L;
    }

    public Long getEndOffset() {
        return Long.MAX_VALUE;
    }

    public void setCheckpoint(Long recordsProcessed) {
        this.recordsProcessed = recordsProcessed;
    }

    public void onStart() {
        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build()).submit(this::receive);
    }

    public void onStop() {
    }

    private void receive() {
        try {
            LOG.info("Starting receiver with offset {}", (Object)this.startOffset);
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.rabbitmqUrl);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setConnectionTimeout(600000);
            connectionFactory.setNetworkRecoveryInterval(5000);
            connectionFactory.setRequestedHeartbeat(60);
            connectionFactory.setTopologyRecoveryEnabled(true);
            connectionFactory.setRequestedChannelMax(0);
            connectionFactory.setRequestedFrameMax(0);
            this.connection = connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.queueDeclare(this.streamName, true, false, false, Collections.singletonMap("x-queue-type", "stream"));
            this.channel.basicQos(Math.min(65535, (int)this.totalMessagesNumber));
            TestConsumer testConsumer = new TestConsumer(this.channel, arg_0 -> ((RabbitMqReceiverWithOffset)this).store(arg_0), this.isStopped);
            this.channel.basicConsume(this.streamName, false, Collections.singletonMap("x-stream-offset", this.startOffset), (com.rabbitmq.client.Consumer)testConsumer);
        }
        catch (Exception e) {
            LOG.error("Can not basic consume", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    public void stop(String message) {
        LOG.info(message);
        this.isStopped.set(true);
        super.stop(message);
        try {
            if (this.recordsProcessed != 0L) {
                LOG.info("Try to multiple ack on {}", (Object)this.recordsProcessed);
                this.channel.basicAck(this.recordsProcessed, true);
            }
            this.channel.abort();
            this.connection.close();
            LOG.info("RabbitMQ channel and connection were closed");
        }
        catch (Exception e) {
            LOG.error("Exception during stopping of the RabbitMQ receiver", (Throwable)e);
        }
    }

    public void stop(String message, Throwable error) {
        LOG.error(message, error);
        this.isStopped.set(true);
        super.stop(message, error);
        try {
            if (this.recordsProcessed != 0L) {
                LOG.info("Try to multiple ack on {}", (Object)this.recordsProcessed);
                this.channel.basicAck(this.recordsProcessed, true);
            }
            this.channel.abort();
            this.connection.close();
            LOG.info("Closed RabbitMQ channel and connection");
        }
        catch (Exception e) {
            LOG.error("Can't close RabbitMQ channel and connection", (Throwable)e);
        }
    }

    static class TestConsumer
    extends DefaultConsumer {
        private final Consumer<String> messageConsumer;
        private final AtomicBoolean isReceiverStopped;
        private final Channel channel;

        public TestConsumer(Channel channel, Consumer<String> messageConsumer, AtomicBoolean isStopped) {
            super(channel);
            this.channel = channel;
            this.isReceiverStopped = isStopped;
            this.messageConsumer = messageConsumer;
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            try {
                String sMessage = new String(body, StandardCharsets.UTF_8);
                if (this.channel.isOpen() && !this.isReceiverStopped.get()) {
                    this.messageConsumer.accept(sMessage);
                }
            }
            catch (Exception e) {
                LOG.error("Can't read from RabbitMQ: {}", (Object)e.getMessage());
            }
        }
    }
}

