package com.ning.metrics.collector.realtime.amq;

import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.realtime.EventQueueSession;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.ConnectionFailedException;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/ning/metrics/collector/realtime/amq/ActiveMQSession.class */
class ActiveMQSession implements EventQueueSession {
    private static final Logger logger = Logger.getLogger(ActiveMQSession.class);
    private final CollectorConfig config;
    private final ActiveMQConnection connection;
    private final String topic;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Object sessionMonitor = new Object();
    private TopicSession session = null;
    private TopicPublisher publisher;

    public ActiveMQSession(CollectorConfig collectorConfig, ActiveMQConnection activeMQConnection, String str) {
        this.config = collectorConfig;
        this.connection = activeMQConnection;
        this.topic = str;
        reinit();
    }

    @Override // com.ning.metrics.collector.realtime.EventQueueSession
    public void close() {
        if (this.isRunning.get()) {
            synchronized (this.sessionMonitor) {
                this.isRunning.set(false);
                if (this.publisher != null) {
                    try {
                        this.publisher.close();
                    } catch (JMSException e) {
                        logger.warn(String.format("Got error while trying to close a producer for topic %s", this.topic), e);
                    }
                    this.publisher = null;
                }
                if (this.session != null) {
                    try {
                        this.session.close();
                    } catch (JMSException e2) {
                        logger.warn(String.format("Got error while trying to close a session for topic %s", this.topic), e2);
                    }
                    this.session = null;
                }
            }
        }
    }

    private void reinit() {
        synchronized (this.sessionMonitor) {
            close();
            while (!this.isRunning.get()) {
                try {
                    this.session = this.connection.createTopicSession();
                    this.publisher = this.session.createPublisher(this.session.createTopic(this.topic));
                    this.publisher.setDeliveryMode(1);
                    this.publisher.setTimeToLive(this.config.getMessagesTTLMilliseconds());
                    this.isRunning.set(true);
                } catch (JMSException e) {
                    logger.debug(String.format("Got error while trying to get a session for topic %s", this.topic));
                }
            }
        }
    }

    private boolean shouldReinit(JMSException jMSException) {
        return (jMSException instanceof AlreadyClosedException) || (jMSException instanceof IllegalStateException) || (jMSException instanceof ConnectionFailedException) || (jMSException.getCause() instanceof IOException);
    }

    @Override // com.ning.metrics.collector.realtime.EventQueueSession
    public void send(Object obj) {
        if (this.isRunning.get()) {
            try {
                this.publisher.send(this.session.createTextMessage(obj.toString()));
            } catch (JMSException e) {
                if (shouldReinit(e)) {
                    reinit();
                } else {
                    logger.debug(String.format("Got error while trying to send a message to topic %s", this.topic));
                }
            }
        }
    }
}
