/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQSource<OUT>
extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
implements ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
    private final RMQConnectionConfig rmqConnectionConfig;
    private final String queueName;
    private final boolean usesCorrelationId;
    protected DeserializationSchema<OUT> schema;
    protected transient Connection connection;
    protected transient Channel channel;
    protected transient QueueingConsumer consumer;
    protected transient boolean autoAck;
    private volatile transient boolean running;

    public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, DeserializationSchema<OUT> deserializationSchema) {
        this(rmqConnectionConfig, queueName, false, deserializationSchema);
    }

    public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName, boolean usesCorrelationId, DeserializationSchema<OUT> deserializationSchema) {
        super(String.class);
        this.rmqConnectionConfig = rmqConnectionConfig;
        this.queueName = queueName;
        this.usesCorrelationId = usesCorrelationId;
        this.schema = deserializationSchema;
    }

    protected ConnectionFactory setupConnectionFactory() throws Exception {
        return this.rmqConnectionConfig.getConnectionFactory();
    }

    protected void setupQueue() throws IOException {
        this.channel.queueDeclare(this.queueName, true, false, false, null);
    }

    public void open(Configuration config) throws Exception {
        super.open(config);
        ConnectionFactory factory = this.setupConnectionFactory();
        try {
            this.connection = factory.newConnection();
            this.channel = this.connection.createChannel();
            if (this.channel == null) {
                throw new RuntimeException("None of RabbitMQ channels are available");
            }
            this.setupQueue();
            this.consumer = new QueueingConsumer(this.channel);
            RuntimeContext runtimeContext = this.getRuntimeContext();
            if (runtimeContext instanceof StreamingRuntimeContext && ((StreamingRuntimeContext)runtimeContext).isCheckpointingEnabled()) {
                this.autoAck = false;
                this.channel.txSelect();
            } else {
                this.autoAck = true;
            }
            LOG.debug("Starting RabbitMQ source with autoAck status: " + this.autoAck);
            this.channel.basicConsume(this.queueName, this.autoAck, (Consumer)this.consumer);
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot create RMQ connection with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
        this.running = true;
    }

    public void close() throws Exception {
        super.close();
        try {
            this.connection.close();
        }
        catch (IOException e) {
            throw new RuntimeException("Error while closing RMQ connection with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        while (this.running) {
            QueueingConsumer.Delivery delivery = this.consumer.nextDelivery();
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                Object result = this.schema.deserialize(delivery.getBody());
                if (this.schema.isEndOfStream(result)) {
                    break;
                }
                if (!this.autoAck) {
                    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                    if (this.usesCorrelationId) {
                        String correlationId = delivery.getProperties().getCorrelationId();
                        Preconditions.checkNotNull((Object)correlationId, (String)"RabbitMQ source was instantiated with usesCorrelationId set to true but a message was received with correlation id set to null!");
                        if (!this.addId(correlationId)) {
                            continue;
                        }
                    }
                    this.sessionIds.add(deliveryTag);
                }
                ctx.collect(result);
            }
        }
    }

    public void cancel() {
        this.running = false;
    }

    protected void acknowledgeSessionIDs(List<Long> sessionIds) {
        try {
            for (long id : sessionIds) {
                this.channel.basicAck(id, false);
            }
            this.channel.txCommit();
        }
        catch (IOException e) {
            throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
        }
    }

    public TypeInformation<OUT> getProducedType() {
        return this.schema.getProducedType();
    }
}

