package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/rabbitmq/RMQSink.class */
/**
 * Sink that serializes every incoming record with a {@link SerializationSchema} and publishes
 * the resulting bytes to a RabbitMQ queue.
 *
 * <p>The connection factory, connection and channel are {@code transient}: they are not part of
 * the serialized sink and are created in {@link #open(Configuration)} and released in
 * {@link #close()}.
 *
 * @param <IN> type of the records consumed by this sink
 */
public class RMQSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);

    // Field names are kept as-is so the serialized form of the sink does not change.
    private final String QUEUE_NAME;
    private final String HOST_NAME;
    private transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Channel channel;
    private final SerializationSchema<IN, byte[]> scheme;

    /**
     * Creates a sink publishing to the given queue on the given RabbitMQ host.
     *
     * @param hostName host name handed to the RabbitMQ {@link ConnectionFactory}
     * @param queueName name of the queue that is declared and published to
     * @param serializationSchema converts each record into the message body
     */
    public RMQSink(String hostName, String queueName, SerializationSchema<IN, byte[]> serializationSchema) {
        this.HOST_NAME = hostName;
        this.QUEUE_NAME = queueName;
        this.scheme = serializationSchema;
    }

    /**
     * Opens a connection and a channel to the configured host and declares the target queue
     * (flags passed to {@code queueDeclare}: durable=false, exclusive=false, autoDelete=false,
     * no extra arguments).
     *
     * <p>If any step fails, whatever was already opened is closed again so that no connection
     * is leaked, and the failure is rethrown.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised by the RabbitMQ client
     */
    public void initializeConnection() {
        this.factory = new ConnectionFactory();
        this.factory.setHost(this.HOST_NAME);
        try {
            this.connection = this.factory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.queueDeclare(this.QUEUE_NAME, false, false, false, (Map<String, Object>) null);
        } catch (IOException e) {
            // Do not leave a half-initialized connection behind when channel creation or
            // queue declaration fails.
            discardConnection();
            throw new RuntimeException(
                    "Cannot open RMQ connection with " + this.QUEUE_NAME + " at " + this.HOST_NAME, e);
        }
    }

    /**
     * Serializes the record and publishes it with an empty exchange name, using the queue name
     * as routing key and no message properties.
     *
     * <p>An {@link IOException} raised while publishing is logged together with its cause and is
     * not rethrown, i.e. the record is dropped.
     *
     * @param in record to publish
     */
    public void invoke(IN in) {
        try {
            this.channel.basicPublish("", this.QUEUE_NAME, (AMQP.BasicProperties) null, this.scheme.serialize(in));
        } catch (IOException e) {
            // Pass the exception along so the cause and stack trace end up in the log.
            LOG.error("Cannot send RMQ message " + this.QUEUE_NAME + " at " + this.HOST_NAME, e);
        }
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>The connection close is attempted even if closing the channel fails. Resources that
     * were never opened are skipped, and the references are cleared up front so that calling
     * this method again does nothing.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while closing
     */
    private void closeChannel() {
        Channel openChannel = this.channel;
        Connection openConnection = this.connection;
        this.channel = null;
        this.connection = null;
        try {
            try {
                if (openChannel != null) {
                    openChannel.close();
                }
            } finally {
                // Runs regardless of the outcome above, otherwise the connection would leak.
                if (openConnection != null) {
                    openConnection.close();
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Error while closing RMQ connection with " + this.QUEUE_NAME + " at " + this.HOST_NAME, e);
        }
    }

    /**
     * Best-effort teardown used when initialization fails half-way: closes the connection, if
     * one was opened, and clears the channel and connection references.
     */
    private void discardConnection() {
        Connection partial = this.connection;
        this.channel = null;
        this.connection = null;
        if (partial == null) {
            return;
        }
        try {
            partial.close();
        } catch (Exception ignored) {
            // Intentionally ignored: the initialization failure reported by the caller is the
            // error that matters, a secondary close failure must not replace it.
            LOG.debug("Ignoring failure while discarding RMQ connection at " + this.HOST_NAME, ignored);
        }
    }

    /**
     * Establishes the RabbitMQ connection by delegating to {@link #initializeConnection()}.
     *
     * @param configuration not used by this sink
     */
    public void open(Configuration configuration) {
        initializeConnection();
    }

    /** Releases the RabbitMQ channel and connection. */
    public void close() {
        closeChannel();
    }
}
