/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQSink<IN>
extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
    private String queueName;
    private RMQConnectionConfig rmqConnectionConfig;
    private transient Connection connection;
    private transient Channel channel;
    private SerializationSchema<IN> schema;
    private boolean logFailuresOnly = false;

    public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
        this.rmqConnectionConfig = rmqConnectionConfig;
        this.queueName = queueName;
        this.schema = schema;
    }

    public void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    public void open(Configuration config) throws Exception {
        ConnectionFactory factory = this.rmqConnectionConfig.getConnectionFactory();
        try {
            this.connection = factory.newConnection();
            this.channel = this.connection.createChannel();
            if (this.channel == null) {
                throw new RuntimeException("None of RabbitMQ channels are available");
            }
            this.channel.queueDeclare(this.queueName, false, false, false, null);
        }
        catch (IOException e) {
            throw new RuntimeException("Error while creating the channel", e);
        }
    }

    public void invoke(IN value) {
        try {
            byte[] msg = this.schema.serialize(value);
            this.channel.basicPublish("", this.queueName, null, msg);
        }
        catch (IOException e) {
            if (this.logFailuresOnly) {
                LOG.error("Cannot send RMQ message {} at {}", new Object[]{this.queueName, this.rmqConnectionConfig.getHost(), e});
            }
            throw new RuntimeException("Cannot send RMQ message " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
    }

    public void close() {
        IOException t = null;
        try {
            this.channel.close();
        }
        catch (IOException e) {
            t = e;
        }
        try {
            this.connection.close();
        }
        catch (IOException e) {
            if (t != null) {
                LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", (Throwable)t);
            }
            t = e;
        }
        if (t != null) {
            throw new RuntimeException("Error while closing RMQ connection with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), t);
        }
    }
}

