package com.ning.metrics.collector.realtime.amq;

import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.realtime.EventQueueSession;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.ConnectionFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/metrics/collector/realtime/amq/ActiveMQSession.class */
/**
 * An {@link EventQueueSession} that publishes events to a single JMS topic.
 *
 * <p>The session owns one {@link TopicSession} and one {@link TopicPublisher}. Both are
 * created in the constructor and are torn down and recreated whenever a send fails with an
 * error that {@link #shouldReinit(JMSException)} classifies as a broken connection.
 *
 * <p>Every change to the session/publisher pair happens while holding {@code sessionMonitor}.
 * {@link #send(Object)} does not take that lock; it works on a snapshot of the volatile
 * {@code publisher} field so that a concurrent {@link #close()} cannot cause a
 * {@link NullPointerException}.
 */
public class ActiveMQSession implements EventQueueSession {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQSession.class);
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Numeric value of {@code javax.jms.DeliveryMode.NON_PERSISTENT}. */
    private static final int NON_PERSISTENT_DELIVERY = 1;

    /** Pause between two failed attempts to open a session, so a dead broker does not cause a busy spin. */
    private static final long RETRY_BACKOFF_MILLIS = 100L;

    private final Timer timer;
    private final CollectorConfig config;
    private final ActiveMQConnection connection;
    private final String topic;
    private final AtomicBoolean useBytesMessage;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Object sessionMonitor = new Object();

    // Written only while holding sessionMonitor; volatile because send()/createMessage() read them without it.
    private volatile TopicPublisher publisher;
    private volatile TopicSession session = null;

    /**
     * Creates the session and blocks until a JMS session and publisher for the topic exist.
     *
     * @param collectorConfig    configuration; supplies the time-to-live applied to published messages
     * @param activeMQConnection connection used to create topic sessions
     * @param str                name of the topic to publish to (also used as the timer name)
     * @param atomicBoolean      when {@code true} at send time, events are sent as UTF-8
     *                           {@link BytesMessage}s; otherwise as text messages
     */
    public ActiveMQSession(CollectorConfig collectorConfig, ActiveMQConnection activeMQConnection, String str, AtomicBoolean atomicBoolean) {
        this.config = collectorConfig;
        this.connection = activeMQConnection;
        this.topic = str;
        this.useBytesMessage = atomicBoolean;
        this.timer = Metrics.newTimer(ActiveMQSession.class, str, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
        reinit();
    }

    /**
     * @return the configuration this session was created with
     */
    @Override // com.ning.metrics.collector.realtime.EventQueueSession
    public CollectorConfig getConfig() {
        return this.config;
    }

    /**
     * Closes the publisher and the session if the session is currently running.
     *
     * <p>Errors raised while closing are logged and otherwise ignored. Calling this on a
     * session that is not running is a no-op.
     */
    @Override // com.ning.metrics.collector.realtime.EventQueueSession
    public void close() {
        // Checked outside the lock on purpose: a close() issued while another thread is
        // still (re)creating the session returns immediately instead of waiting for it.
        if (!this.isRunning.get()) {
            return;
        }
        synchronized (this.sessionMonitor) {
            this.isRunning.set(false);
            closePublisher(this.publisher);
            this.publisher = null;
            closeSession(this.session);
            this.session = null;
        }
    }

    /**
     * Closes a publisher, logging (not propagating) any JMS error.
     *
     * @param target publisher to close; {@code null} is ignored
     */
    private void closePublisher(TopicPublisher target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (JMSException e) {
            logger.warn(String.format("Got error while trying to close a producer for topic %s", this.topic), e);
        }
    }

    /**
     * Closes a session, logging (not propagating) any JMS error.
     *
     * @param target session to close; {@code null} is ignored
     */
    private void closeSession(TopicSession target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (JMSException e) {
            logger.warn(String.format("Got error while trying to close a session for topic %s", this.topic), e);
        }
    }

    /**
     * Tears down the current session (if running) and blocks until a new one is open.
     */
    private void reinit() {
        long startMillis = System.currentTimeMillis();
        synchronized (this.sessionMonitor) {
            close();
            openUntilRunning();
        }
        long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
        logger.info(String.format("Recreated topic [%s] in %d seconds", this.topic, elapsedSeconds));
    }

    /**
     * Reinitializes only if {@code failedPublisher} is still the current publisher.
     *
     * <p>When several sender threads hit the same broken connection, only the first one to get
     * the lock rebuilds the session; the others see a different (or cleared) publisher and
     * return, instead of each tearing down the session the previous thread just created.
     *
     * @param failedPublisher the publisher the failed send was attempted on
     */
    private void reinitIfCurrent(TopicPublisher failedPublisher) {
        synchronized (this.sessionMonitor) {
            if (this.publisher != failedPublisher) {
                return;
            }
            reinit();
        }
    }

    /**
     * Retries until a session and a configured publisher have been created, then marks the
     * session as running. Must be called while holding {@code sessionMonitor}.
     *
     * <p>The new session and publisher are published to the fields only once fully configured;
     * anything created by a failed attempt is closed before the next attempt so that retries do
     * not leak sessions. If the thread is interrupted while waiting between attempts, retrying
     * continues and the interrupt flag is restored before returning.
     */
    private void openUntilRunning() {
        boolean interrupted = false;
        try {
            while (!this.isRunning.get()) {
                TopicSession newSession = null;
                TopicPublisher newPublisher = null;
                try {
                    newSession = this.connection.createTopicSession();
                    newPublisher = newSession.createPublisher(newSession.createTopic(this.topic));
                    newPublisher.setDeliveryMode(NON_PERSISTENT_DELIVERY);
                    newPublisher.setTimeToLive(this.config.getMessagesTTLMilliseconds());
                    this.session = newSession;
                    this.publisher = newPublisher;
                    this.isRunning.set(true);
                } catch (JMSException e) {
                    logger.debug(String.format("Got error while trying to get a session for topic %s", this.topic), e);
                    // Release whatever this attempt managed to create before trying again.
                    closePublisher(newPublisher);
                    closeSession(newSession);
                    try {
                        Thread.sleep(RETRY_BACKOFF_MILLIS);
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Decides whether a send failure means the session must be rebuilt.
     *
     * @param jMSException the error raised by the failed send
     * @return {@code true} for {@link AlreadyClosedException}, {@code javax.jms.IllegalStateException},
     *         {@link ConnectionFailedException}, or any error caused by an {@link IOException}
     */
    private boolean shouldReinit(JMSException jMSException) {
        return (jMSException instanceof AlreadyClosedException) || (jMSException instanceof IllegalStateException) || (jMSException instanceof ConnectionFailedException) || (jMSException.getCause() instanceof IOException);
    }

    /**
     * Publishes one event to the topic and records the send duration in the timer.
     *
     * <p>The event is silently dropped when the session is not running. A failed send is not
     * retried: if the failure indicates a broken connection the session is rebuilt for
     * subsequent sends, otherwise the error is logged at debug level.
     *
     * @param obj the event; its {@code toString()} form is the message payload (must not be {@code null})
     */
    @Override // com.ning.metrics.collector.realtime.EventQueueSession
    public void send(Object obj) {
        if (!this.isRunning.get()) {
            return;
        }
        // Snapshot: close() may null the field between the check above and the send below.
        TopicPublisher currentPublisher = this.publisher;
        if (currentPublisher == null) {
            return;
        }
        try {
            long startNanos = System.nanoTime();
            currentPublisher.send(createMessage(obj));
            this.timer.update(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
        } catch (JMSException e) {
            if (shouldReinit(e)) {
                reinitIfCurrent(currentPublisher);
            } else {
                logger.debug(String.format("Got error while trying to send a message to topic %s", this.topic), e);
            }
        }
    }

    /**
     * Builds the JMS message for an event.
     *
     * @param obj the event; its {@code toString()} form is the payload (must not be {@code null})
     * @return a {@link BytesMessage} holding the UTF-8 bytes when {@code useBytesMessage} is set,
     *         otherwise a text message
     * @throws JMSException if the message cannot be created, including when the session has
     *                      been closed concurrently
     */
    protected Message createMessage(Object obj) throws JMSException {
        String payload = obj.toString();
        TopicSession currentSession = this.session;
        if (currentSession == null) {
            // In this file IllegalStateException is javax.jms.IllegalStateException (a JMSException),
            // the same error class a closed JMS session reports.
            throw new IllegalStateException(String.format("Session for topic %s is closed", this.topic));
        }
        if (!this.useBytesMessage.get()) {
            return currentSession.createTextMessage(payload);
        }
        BytesMessage bytesMessage = currentSession.createBytesMessage();
        bytesMessage.writeBytes(payload.getBytes(UTF8));
        return bytesMessage;
    }
}
