package com.ning.metrics.collector.events.processing;

import com.google.inject.Inject;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.serialization.util.Managed;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/ning/metrics/collector/events/processing/ActiveMQSenderImpl.class */
/**
 * {@link ActiveMQSender} that publishes text messages to ActiveMQ topics over a
 * {@code tcp://host:port} connection.
 *
 * <p>The connection and session are established lazily by a background task that runs
 * once per second: whenever a reconnection has been requested (at construction time, or
 * after a failed send), the task discards the current connection and tries to create a
 * new one. While no session is available, {@link #sendMessage(String, String)} drops the
 * message and returns {@code false}.
 *
 * <p>NOTE(review): JMS {@code Session} objects are not designed for concurrent use by
 * several threads; this class does not serialize calls to {@code sendMessage} — confirm
 * that callers do, or that the provider tolerates it.
 */
public class ActiveMQSenderImpl implements ActiveMQSender {
    private static final Logger logger = Logger.getLogger(ActiveMQSenderImpl.class);

    /** Exclusive upper bound, in milliseconds, of the random pause taken before each reconnection attempt. */
    private static final int MAX_RECONNECT_JITTER_MILLIS = 2000;

    /** Delay, in seconds, between two runs of the background reconnection check. */
    private static final long RECONNECT_CHECK_DELAY_SECONDS = 1L;

    /** When {@code true}, no new connection is created even if a reconnection was requested. */
    private final AtomicBoolean stopReconnecting = new AtomicBoolean(false);

    /** Set by senders when the connection looks broken; cleared once a new session exists. */
    private final AtomicBoolean shouldReconnect = new AtomicBoolean(false);

    // Atomic counters: a plain "volatile int" incremented with ++ is not an atomic update.
    private final AtomicInteger connectionRetries = new AtomicInteger(0);
    private final AtomicInteger sessionRetries = new AtomicInteger(0);

    private final ActiveMQConnectionFactory connectionFactory;
    private final int messagesTTLMilliseconds;

    /** Single-threaded scheduler running the reconnection check; stopped by {@link #shutdown()}. */
    private final ScheduledThreadPoolExecutor reconnectExecutor;

    private volatile Session session;
    private volatile Connection connection;

    /**
     * Creates a sender from the collector configuration (ActiveMQ host, port and message TTL).
     *
     * @param collectorConfig configuration providing the ActiveMQ endpoint and message TTL
     */
    @Inject
    public ActiveMQSenderImpl(CollectorConfig collectorConfig) {
        this(collectorConfig.getActiveMQHost(), collectorConfig.getActiveMQPort(), collectorConfig.getMessagesTTLMilliseconds());
    }

    /**
     * Creates a sender for the given endpoint, requests an initial connection, registers a
     * JVM shutdown hook that calls {@link #shutdown()}, and starts the background
     * reconnection task.
     *
     * @param str ActiveMQ host name, used to build the {@code tcp://host:port} broker URL
     * @param i   ActiveMQ port
     * @param i2  time-to-live, in milliseconds, applied to every message sent
     */
    public ActiveMQSenderImpl(String str, int i, int i2) {
        this.connectionFactory = new ActiveMQConnectionFactory("tcp://" + str + ":" + i);
        this.messagesTTLMilliseconds = i2;
        // Assigned before the hook and the task are registered, since both end up using it.
        this.reconnectExecutor = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());

        // The first run of the background task establishes the initial connection.
        requestReconnection();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    ActiveMQSenderImpl.this.shutdown();
                } catch (JMSException e) {
                    ActiveMQSenderImpl.logger.error("Unable to shutdown the ActiveMQ connection", e);
                }
            }
        });

        this.reconnectExecutor.scheduleWithFixedDelay(new Runnable() {
            private final Random random = new Random();

            @Override
            public void run() {
                if (!ActiveMQSenderImpl.this.shouldReconnect.get()) {
                    return;
                }
                try {
                    // Random pause so that reconnection attempts are not all issued at the same instant.
                    Thread.sleep(this.random.nextInt(MAX_RECONNECT_JITTER_MILLIS));
                    ActiveMQSenderImpl.this.discardCurrentConnection();
                    ActiveMQSenderImpl.this.createConnectionAndSession();
                } catch (InterruptedException e) {
                    ActiveMQSenderImpl.logger.debug("Interrupted sleeping on reconnect", e);
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    // An exception escaping a periodic task suppresses all its future runs,
                    // which would silently disable reconnection for good.
                    ActiveMQSenderImpl.logger.warn("Unexpected error while reconnecting to ActiveMQ", e);
                }
            }
        }, 0L, RECONNECT_CHECK_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Tries to open a new connection and session, unless reconnection has been disabled via
     * {@link #giveUpTryingToConnect()} or {@link #shutdown()}.
     *
     * <p>The connection and session are published only once both have been created; on
     * failure the partially created connection is closed and the reconnection request is
     * left pending so that the background task tries again. Each attempt increments the
     * retry counters exposed by {@link #getConnectionRetries()} and
     * {@link #getSessionRetries()}.
     *
     * <p>Kept {@code public} so existing callers of this class keep compiling.
     */
    public void createConnectionAndSession() {
        if (this.stopReconnecting.get()) {
            return;
        }
        Connection newConnection = null;
        try {
            this.connectionRetries.incrementAndGet();
            newConnection = this.connectionFactory.createConnection();
            newConnection.start();
            this.sessionRetries.incrementAndGet();
            final Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.connection = newConnection;
            this.session = newSession;
            this.shouldReconnect.set(false);
            logger.info("Connection to ActiveMQ established");
        } catch (JMSException e) {
            logger.warn("Unable to create a connection and/or a session with the ActiveMQ endpoint", e);
            // Do not leak a connection that was opened but could not be started or given a session.
            closeQuietly(newConnection);
        }
    }

    /** Flags the connection as broken so that the background task re-creates it. */
    private void requestReconnection() {
        this.shouldReconnect.set(true);
    }

    /**
     * Forgets the current connection and session, then closes the old connection.
     * The fields are cleared first so that senders stop picking up the stale session.
     */
    private void discardCurrentConnection() {
        final Connection staleConnection = this.connection;
        this.session = null;
        this.connection = null;
        closeQuietly(staleConnection);
    }

    /** Closes the given connection if it is not {@code null}, logging (not propagating) any JMS failure. */
    private static void closeQuietly(Connection toClose) {
        if (toClose == null) {
            return;
        }
        try {
            toClose.close();
        } catch (JMSException e) {
            logger.debug("Error while closing an ActiveMQ connection", e);
        }
    }

    /**
     * Stops reconnection attempts, stops the background reconnection task, and closes the
     * current connection (if any). After this call, {@link #sendMessage(String, String)}
     * drops messages and returns {@code false}.
     *
     * <p>Kept {@code public} so existing callers of this class keep compiling.
     *
     * @throws JMSException if the connection cannot be closed cleanly
     */
    public void shutdown() throws JMSException {
        this.shouldReconnect.set(false);
        this.stopReconnecting.set(true);
        // Without this the scheduler's worker thread (non-daemon by default) outlives the sender.
        this.reconnectExecutor.shutdownNow();

        final Connection currentConnection = this.connection;
        this.session = null;
        this.connection = null;
        if (currentConnection != null) {
            // close() releases the connection's resources; stop() would only pause message delivery.
            currentConnection.close();
        }
    }

    /**
     * Wraps the payload in a text message created by the given session and sends it.
     *
     * @param activeSession   session used to create the message
     * @param messageProducer producer bound to the destination topic
     * @param str             message payload
     * @throws JMSException if the message cannot be created or sent
     */
    private void send(Session activeSession, MessageProducer messageProducer, String str) throws JMSException {
        final TextMessage textMessage = activeSession.createTextMessage(str);
        messageProducer.send(textMessage);
        if (logger.isDebugEnabled()) {
            logger.debug("Sent message to ActiveMQ: " + textMessage.hashCode());
        }
    }

    /**
     * Publishes a non-persistent text message, with the configured time-to-live, to a topic.
     *
     * <p>If no session is currently available, or if sending fails, the message is dropped,
     * a reconnection is requested, and {@code false} is returned.
     *
     * @param str  name of the destination topic
     * @param str2 message payload
     * @return {@code true} if the message was handed to the broker client without error,
     *         {@code false} otherwise
     */
    @Override // com.ning.metrics.collector.events.processing.ActiveMQSender
    public boolean sendMessage(String str, String str2) {
        // Read the volatile field once: the reconnection task may null it out at any time.
        final Session activeSession = this.session;
        if (activeSession == null) {
            logger.info(String.format("Connection to ActiveMQ endpoint not established, dropping message: [%s] [%s] and retrying to connect", str, str2));
            requestReconnection();
            return false;
        }

        MessageProducer producer = null;
        try {
            producer = activeSession.createProducer(new ActiveMQTopic(str));
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            producer.setTimeToLive(this.messagesTTLMilliseconds);
            send(activeSession, producer, str2);
            return true;
        } catch (JMSException e) {
            logger.debug("Something went wrong while sending message to ActiveMQ", e);
            requestReconnection();
            return false;
        } finally {
            // A producer is created per message; release it so producers do not pile up on the session.
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    logger.debug("Error while closing the ActiveMQ message producer", e);
                }
            }
        }
    }

    @Managed(description = "get the number of times we retried to connect to ActiveMQ")
    public int getConnectionRetries() {
        return this.connectionRetries.get();
    }

    @Managed(description = "get the number of times we retried to establish a session to ActiveMQ")
    public int getSessionRetries() {
        return this.sessionRetries.get();
    }

    @Managed(description = "don't keep trying to (re)connect to ActiveMQ")
    public void giveUpTryingToConnect() {
        this.stopReconnecting.set(true);
    }

    @Managed(description = "keep trying to (re)connect to ActiveMQ")
    public void keepTryingToConnect() {
        this.stopReconnecting.set(false);
    }
}
