package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/rabbitmq/RMQSink.class */
public class RMQSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);

    @Nullable
    protected final String queueName;
    private final RMQConnectionConfig rmqConnectionConfig;
    protected transient Connection connection;
    protected transient Channel channel;
    protected SerializationSchema<IN> schema;
    private boolean logFailuresOnly;

    @Nullable
    private final RMQSinkPublishOptions<IN> publishOptions;

    @Nullable
    private final SerializableReturnListener returnListener;

    private RMQSink(RMQConnectionConfig rMQConnectionConfig, @Nullable String str, SerializationSchema<IN> serializationSchema, @Nullable RMQSinkPublishOptions<IN> rMQSinkPublishOptions, @Nullable SerializableReturnListener serializableReturnListener) {
        this.logFailuresOnly = false;
        this.rmqConnectionConfig = rMQConnectionConfig;
        this.queueName = str;
        this.schema = serializationSchema;
        this.publishOptions = rMQSinkPublishOptions;
        this.returnListener = serializableReturnListener;
    }

    @PublicEvolving
    public RMQSink(RMQConnectionConfig rMQConnectionConfig, String str, SerializationSchema<IN> serializationSchema) {
        this(rMQConnectionConfig, str, serializationSchema, null, null);
    }

    @PublicEvolving
    public RMQSink(RMQConnectionConfig rMQConnectionConfig, SerializationSchema<IN> serializationSchema, RMQSinkPublishOptions<IN> rMQSinkPublishOptions) {
        this(rMQConnectionConfig, null, serializationSchema, rMQSinkPublishOptions, null);
    }

    @PublicEvolving
    public RMQSink(RMQConnectionConfig rMQConnectionConfig, SerializationSchema<IN> serializationSchema, RMQSinkPublishOptions<IN> rMQSinkPublishOptions, SerializableReturnListener serializableReturnListener) {
        this(rMQConnectionConfig, null, serializationSchema, rMQSinkPublishOptions, serializableReturnListener);
    }

    protected void setupQueue() throws IOException {
        if (this.queueName != null) {
            this.channel.queueDeclare(this.queueName, false, false, false, (Map) null);
        }
    }

    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    public void open(Configuration configuration) throws Exception {
        try {
            this.connection = this.rmqConnectionConfig.getConnectionFactory().newConnection();
            this.channel = this.connection.createChannel();
            if (this.channel == null) {
                throw new RuntimeException("None of RabbitMQ channels are available");
            }
            setupQueue();
            if (this.returnListener != null) {
                this.channel.addReturnListener(this.returnListener);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error while creating the channel", e);
        }
    }

    public void invoke(IN in) {
        try {
            byte[] serialize = this.schema.serialize(in);
            if (this.publishOptions == null) {
                this.channel.basicPublish("", this.queueName, (AMQP.BasicProperties) null, serialize);
            } else {
                boolean computeMandatory = this.publishOptions.computeMandatory(in);
                boolean computeImmediate = this.publishOptions.computeImmediate(in);
                Preconditions.checkState((this.returnListener == null && (computeMandatory || computeImmediate)) ? false : true, "Setting mandatory and/or immediate flags to true requires a ReturnListener.");
                this.channel.basicPublish(this.publishOptions.computeExchange(in), this.publishOptions.computeRoutingKey(in), computeMandatory, computeImmediate, this.publishOptions.computeProperties(in), serialize);
            }
        } catch (IOException e) {
            if (!this.logFailuresOnly) {
                throw new RuntimeException("Cannot send RMQ message " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
            }
            LOG.error("Cannot send RMQ message {} at {}", new Object[]{this.queueName, this.rmqConnectionConfig.getHost(), e});
        }
    }

    public void close() {
        Throwable th = null;
        try {
            if (this.channel != null) {
                this.channel.close();
            }
        } catch (IOException | TimeoutException e) {
            th = e;
        }
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (IOException e2) {
            if (th != null) {
                LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", th);
            }
            th = e2;
        }
        if (th != null) {
            throw new RuntimeException("Error while closing RMQ connection with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), th);
        }
    }
}
