package org.apache.flink.streaming.connectors.activemq;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/activemq/AMQSink.class */
public class AMQSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
    private final ActiveMQConnectionFactory connectionFactory;
    private final String destinationName;
    private final SerializationSchema<IN> serializationSchema;
    private final boolean persistentDelivery;
    private final DestinationType destinationType;
    private boolean logFailuresOnly = false;
    private transient MessageProducer producer;
    private transient Session session;
    private transient Connection connection;

    public AMQSink(AMQSinkConfig<IN> aMQSinkConfig) {
        this.connectionFactory = aMQSinkConfig.getConnectionFactory();
        this.destinationName = aMQSinkConfig.getDestinationName();
        this.serializationSchema = aMQSinkConfig.getSerializationSchema();
        this.persistentDelivery = aMQSinkConfig.isPersistentDelivery();
        this.destinationType = aMQSinkConfig.getDestinationType();
    }

    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 1);
        this.producer = this.session.createProducer(AMQUtil.getDestination(this.session, this.destinationType, this.destinationName));
        this.producer.setDeliveryMode(getDeliveryMode());
    }

    private int getDeliveryMode() {
        return this.persistentDelivery ? 2 : 1;
    }

    public void invoke(IN in) {
        try {
            byte[] serialize = this.serializationSchema.serialize(in);
            BytesMessage createBytesMessage = this.session.createBytesMessage();
            createBytesMessage.writeBytes(serialize);
            this.producer.send(createBytesMessage);
        } catch (JMSException e) {
            if (!this.logFailuresOnly) {
                throw new RuntimeException("Failed to send message to ActiveMQ", e);
            }
            LOG.error("Failed to send message to ActiveMQ", e);
        }
    }

    public void close() {
        RuntimeException runtimeException = null;
        try {
            this.session.close();
        } catch (JMSException e) {
            if (this.logFailuresOnly) {
                LOG.error("Failed to close ActiveMQ session", e);
            } else {
                runtimeException = new RuntimeException("Failed to close ActiveMQ session", e);
            }
        }
        try {
            this.connection.close();
        } catch (JMSException e2) {
            if (this.logFailuresOnly) {
                LOG.error("Failed to close ActiveMQ connection", e2);
            } else {
                runtimeException = runtimeException == null ? new RuntimeException("Failed to close ActiveMQ session", e2) : runtimeException;
            }
        }
        if (runtimeException != null) {
            throw runtimeException;
        }
    }
}
