package com.ning.metrics.eventtracker;

import com.ning.metrics.serialization.event.Event;
import com.ning.metrics.serialization.util.Managed;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import scribe.thrift.LogEntry;
import scribe.thrift.ResultCode;

/* loaded from: input_file:com/ning/metrics/eventtracker/ScribeSender.class */
/**
 * {@link EventSender} that forwards events through a {@link ScribeClient}.
 *
 * <p>Each event is wrapped in a single {@link LogEntry} whose message is
 * {@code "<event millis>:<payload>"}. The payload is the event's own serialized form when it
 * provides one, otherwise the Base64 encoding of what {@code event.writeExternal} produces.
 *
 * <p>The connection is re-opened in three situations: after more than
 * {@code messagesToSendBeforeReconnecting} messages went through it, after an error while
 * sending, and periodically when no {@link #send(Event)} call happened during a whole idle
 * check period.
 *
 * <p>NOTE(review): {@link #createConnection()} is synchronized but {@link #send(Event)} and
 * {@link #shutdown()} use the client without holding that lock; confirm that the client
 * tolerates being re-opened while a send is in flight.
 */
public class ScribeSender implements EventSender {
    private static final Log log = LogFactory.getLog(ScribeSender.class);

    /** Charset used to turn the payload bytes into the message string; looked up once. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final ScribeClient scribeClient;
    private final int messagesToSendBeforeReconnecting;
    /** Runs the periodic idle check; kept in a field so that {@link #shutdown()} can stop it. */
    private final ScheduledThreadPoolExecutor idleConnectionChecker;
    private final AtomicInteger connectionRetries = new AtomicInteger(0);
    private final AtomicInteger messagesSuccessfullySent = new AtomicInteger(0);
    private final AtomicInteger messagesSuccessfullySentSinceLastReconnection = new AtomicInteger(0);
    /** Set to false by every send and back to true by every idle check. */
    private final AtomicBoolean sleeping = new AtomicBoolean(true);

    /**
     * Creates the sender and schedules the periodic idle check. The connection itself is not
     * opened here; see {@link #createConnection()}.
     *
     * @param scribeClient client used to open/close the connection and to log entries; when
     *        {@code null}, {@link #send(Event)} returns {@code false} and
     *        {@link #createConnection()} does nothing
     * @param messagesToSendBeforeReconnecting the connection is recycled once more than this
     *        many messages have been counted since the last recycling
     * @param maxIdleTimeInMinutes initial delay and period, in minutes, of the idle check
     * @throws IllegalArgumentException if {@code maxIdleTimeInMinutes} is not positive (thrown
     *         by {@code scheduleAtFixedRate})
     */
    public ScribeSender(ScribeClient scribeClient, int messagesToSendBeforeReconnecting, int maxIdleTimeInMinutes) {
        this.scribeClient = scribeClient;
        this.messagesToSendBeforeReconnecting = messagesToSendBeforeReconnecting;
        this.idleConnectionChecker = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
        this.idleConnectionChecker.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (sleeping.get()) {
                        log.info("Idle connection to Scribe, re-opening it");
                        createConnection();
                    }
                } catch (RuntimeException e) {
                    // An exception escaping a periodic task suppresses all its subsequent
                    // executions, which would silently disable the idle check for good.
                    log.warn("Unexpected error while re-opening idle connection to Scribe", e);
                }
                sleeping.set(true);
            }
        }, maxIdleTimeInMinutes, maxIdleTimeInMinutes, TimeUnit.MINUTES);
    }

    /**
     * Closes the current connection (if any) and opens a new one, counting the attempt in
     * {@link #getConnectionRetries()}. A failure to connect is logged, not thrown, and leaves
     * the logger closed. Does nothing but log a warning when no client was provided.
     */
    public synchronized void createConnection() {
        if (scribeClient == null) {
            log.warn("Scribe client has not been set up correctly.");
            return;
        }
        try {
            connectionRetries.incrementAndGet();
            scribeClient.closeLogger();
            scribeClient.openLogger();
            log.info("Connection to Scribe established");
        } catch (TTransportException e) {
            log.warn(String.format("Unable to connect to Scribe: %s", e.getLocalizedMessage()));
            scribeClient.closeLogger();
        }
    }

    /**
     * Stops the periodic idle check and closes the connection.
     */
    public void shutdown() {
        // Stop the timer first: otherwise it would re-open the connection closed below at the
        // next idle check, and its worker thread would never be released.
        idleConnectionChecker.shutdown();
        if (scribeClient != null) {
            scribeClient.closeLogger();
        }
    }

    /**
     * Sends one event as a single log entry.
     *
     * @param event event to send
     * @return {@code true} if the client answered {@link ResultCode#OK}; {@code false} for any
     *         other result code or when no client was provided
     * @throws IOException if the event cannot be serialized, or wrapping the {@link TException}
     *         raised while sending (a reconnection is attempted before rethrowing)
     */
    @Override
    public boolean send(Event event) throws IOException {
        if (scribeClient == null) {
            log.warn("Scribe client has not been set up correctly.");
            return false;
        }
        // Tell the idle check that the connection has been used during this period.
        sleeping.set(false);

        byte[] payload = event.getSerializedEvent();
        if (payload == null) {
            // The event did not serialize itself: fall back to writeExternal + Base64.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ObjectOutputStream objectStream = new ObjectOutputStream(buffer);
            event.writeExternal(objectStream);
            // ObjectOutputStream buffers its header and block data internally; without this
            // flush the byte array below can be empty or truncated.
            objectStream.flush();
            payload = new Base64().encode(buffer.toByteArray());
        }

        ArrayList<LogEntry> entries = new ArrayList<LogEntry>(1);
        entries.add(new LogEntry(
            event.getName(),
            String.format("%s:%s", Long.valueOf(event.getEventDateTime().getMillis()), new String(payload, UTF_8))));

        try {
            ResultCode result = scribeClient.log(entries);
            // NOTE(review): both counters are incremented whatever the result code is, so they
            // also count entries for which this method returns false - confirm this is intended.
            messagesSuccessfullySent.addAndGet(entries.size());
            messagesSuccessfullySentSinceLastReconnection.addAndGet(entries.size());
            if (messagesSuccessfullySentSinceLastReconnection.get() > messagesToSendBeforeReconnecting) {
                log.info("Recycling connection with Scribe");
                messagesSuccessfullySentSinceLastReconnection.set(0);
                createConnection();
            }
            return result == ResultCode.OK;
        } catch (TException e) {
            log.warn(String.format("Error while sending message to Scribe: %s", e.getLocalizedMessage()));
            // Try to get a working connection back for the next call before reporting the failure.
            createConnection();
            throw new IOException(e);
        }
    }

    @Managed(description = "Get the number of messages successfully sent since startup to Scribe")
    public long getMessagesSuccessfullySent() {
        return messagesSuccessfullySent.get();
    }

    @Managed(description = "Get the number of messages successfully sent since last reconnection to Scribe")
    public long getMessagesSuccessfullySentSinceLastReconnection() {
        return messagesSuccessfullySentSinceLastReconnection.get();
    }

    @Managed(description = "Get the number of times we retried to connect to Scribe")
    public long getConnectionRetries() {
        return connectionRetries.get();
    }
}
