package com.ning.metrics.collector.realtime;

import com.google.inject.Inject;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.util.NamedThreadFactory;
import com.ning.metrics.goodwill.access.CachingGoodwillAccessor;
import com.ning.metrics.goodwill.access.GoodwillSchema;
import com.ning.metrics.goodwill.access.GoodwillSchemaField;
import com.ning.metrics.serialization.event.Event;
import com.ning.metrics.serialization.schema.SchemaFieldType;
import com.ning.metrics.serialization.thrift.item.DataItem;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.weakref.jmx.Managed;

/* loaded from: input_file:com/ning/metrics/collector/realtime/EventQueueProcessorImpl.class */
public class EventQueueProcessorImpl implements EventQueueProcessor {
    /** Collector configuration; handed to every per-category queue created by send(). */
    private final CollectorConfig config;

    /** Connection obtained from the factory in the constructor; may be null, in which case start() does nothing. */
    private final EventQueueConnection connection;

    /** Shared statistics sink; send() reports ignored events to it. */
    private final GlobalEventQueueStats stats;

    /** Schema lookup used to render events as JSON; stays null when Goodwill is disabled in the configuration. */
    private CachingGoodwillAccessor goodwillAccessor;

    private final ObjectMapper jsonMapper = new ObjectMapper();
    private final Logger log = Logger.getLogger(EventQueueProcessorImpl.class);

    /** Whether forwarding is switched on; seeded from the configuration and toggled by enable()/disable(). */
    private final AtomicBoolean enabled = new AtomicBoolean(false);

    /** Set by start() once the connection has been (re)established, cleared by stop(). */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * One queue (with its workers) per event name, created lazily by send().
     * send() performs its first lookup without holding queueMapMonitor, so the map itself must
     * tolerate a read that races with an insertion; a plain HashMap does not.
     * Note that ConcurrentHashMap rejects null keys and values.
     */
    private final Map<String, LocalQueueAndWorkers> queuesPerCategory = new ConcurrentHashMap<>();

    /** Lock taken by send() while it creates and registers a missing queue. */
    private final Object queueMapMonitor = new Object();

    /** Names of the event types that send() forwards; everything else is counted as ignored. */
    private final AtomicReference<Set<String>> typesToCollect = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.ning.metrics.collector.realtime.EventQueueProcessorImpl$2, reason: invalid class name */
    /* loaded from: input_file:com/ning/metrics/collector/realtime/EventQueueProcessorImpl$2.class */
    /*
     * Compiler-generated lookup table backing the switch over SchemaFieldType in addToRoot().
     * The array is indexed by SchemaFieldType ordinal and holds the case label (1..9) for each
     * constant; slots that are never assigned keep the int default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType = new int[SchemaFieldType.values().length];

        // Each constant is registered in its own try/catch so that a NoSuchFieldError raised for
        // one constant does not prevent the remaining constants from being registered.
        static {
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.BOOLEAN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.BYTE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.SHORT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.INTEGER.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.LONG.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.DOUBLE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.DATE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.IP.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$ning$metrics$serialization$schema$SchemaFieldType[SchemaFieldType.STRING.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /**
     * Wires the processor from the collector configuration and triggers the initial
     * {@link #start()} on a background thread.
     *
     * @param collectorConfig             supplies the enabled flag, the comma-separated list of
     *                                    event types to forward and the Goodwill settings
     * @param eventQueueConnectionFactory creates the connection used to reach the queue
     * @param globalEventQueueStats       statistics sink shared with the per-category queues
     */
    @Inject
    public EventQueueProcessorImpl(CollectorConfig collectorConfig, EventQueueConnectionFactory eventQueueConnectionFactory, GlobalEventQueueStats globalEventQueueStats) {
        this.config = collectorConfig;
        this.stats = globalEventQueueStats;
        this.enabled.set(collectorConfig.isActiveMQEnabled());
        this.typesToCollect.set(parseEventTypes(collectorConfig.getActiveMQEventsToCollect()));
        if (collectorConfig.isGoodwillEnabled()) {
            this.goodwillAccessor = new CachingGoodwillAccessor(collectorConfig.getGoodwillHost(), collectorConfig.getGoodwillPort(), collectorConfig.getGoodwillCacheTimeout());
        }
        this.connection = eventQueueConnectionFactory.createConnection();

        final ScheduledThreadPoolExecutor starter = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("EventQueueProcessorImpl"));
        starter.execute(new Runnable() { // from class: com.ning.metrics.collector.realtime.EventQueueProcessorImpl.1
            @Override // java.lang.Runnable
            public void run() {
                EventQueueProcessorImpl.this.start();
            }
        });
        // The executor only exists to run start() once. With the default shutdown policy the
        // already-submitted task still runs, after which the worker thread terminates instead
        // of lingering for the lifetime of the JVM.
        starter.shutdown();
    }

    /**
     * Splits the configured comma-separated list of event types into a mutable set.
     * Surrounding whitespace is trimmed and empty entries are dropped, so values such as
     * {@code " a, b ,"} or {@code ""} do not register blank or padded type names.
     *
     * @param commaSeparatedTypes raw configuration value, may be null
     * @return a new set with the configured type names; empty when nothing is configured
     */
    private static Set<String> parseEventTypes(String commaSeparatedTypes) {
        final Set<String> types = new HashSet<String>();
        if (commaSeparatedTypes == null) {
            return types;
        }
        for (String candidate : commaSeparatedTypes.split(",")) {
            final String type = candidate.trim();
            if (!type.isEmpty()) {
                types.add(type);
            }
        }
        return types;
    }

    /**
     * Reconnects the queue connection and marks the processor as running.
     * Does nothing when forwarding is disabled or no connection is available.
     */
    void start() {
        final boolean canStart = this.enabled.get() && this.connection != null;
        if (canStart) {
            this.connection.reconnect();
            this.isRunning.set(true);
        }
    }

    /**
     * @return the current value of the running flag (set by start(), cleared by stop())
     */
    @Override // com.ning.metrics.collector.realtime.EventQueueProcessor
    public boolean isRunning() {
        final boolean running = this.isRunning.get();
        return running;
    }

    /**
     * Stops forwarding: clears the running flag, closes every per-category queue and then
     * closes the connection. Calling it while not running is a no-op.
     */
    @Override // com.ning.metrics.collector.realtime.EventQueueProcessor
    public void stop() {
        if (!this.isRunning.getAndSet(false)) {
            return;
        }
        // Snapshot and clear under the same monitor send() uses when registering a queue, so
        // the map is never walked while another thread is inserting into it. The queues are
        // closed after the lock is released to keep the critical section short.
        final LocalQueueAndWorkers[] queuesToClose;
        synchronized (this.queueMapMonitor) {
            queuesToClose = this.queuesPerCategory.values().toArray(new LocalQueueAndWorkers[0]);
            this.queuesPerCategory.clear();
        }
        try {
            for (LocalQueueAndWorkers queue : queuesToClose) {
                queue.close();
            }
        } finally {
            // Release the connection even if closing one of the queues failed.
            this.connection.close();
        }
    }

    /**
     * Hands an event to the queue dedicated to its event name.
     * The event is counted as ignored (and dropped) when it is null, when the processor is not
     * running, or when its name is not in the set of types to collect. The per-name queue is
     * looked up once without the lock and, if missing, looked up again and created while
     * holding queueMapMonitor.
     *
     * @param event the event to forward; may be null
     */
    @Override // com.ning.metrics.collector.realtime.EventQueueProcessor
    public void send(Event event) {
        final boolean accepted = event != null
                && this.isRunning.get()
                && this.typesToCollect.get().contains(event.getName());
        if (!accepted) {
            this.stats.registerEventIgnored();
            return;
        }

        final String category = event.getName();
        LocalQueueAndWorkers queue = this.queuesPerCategory.get(category);
        if (queue == null) {
            synchronized (this.queueMapMonitor) {
                queue = this.queuesPerCategory.get(category);
                if (queue == null) {
                    queue = new LocalQueueAndWorkers(this.config, category, this.connection.getSessionFor(category), this.stats);
                    this.queuesPerCategory.put(category, queue);
                }
            }
        }

        final Object message = getMessageForActiveMQ(event);
        queue.offer(message);
    }

    /**
     * Builds the payload that is offered to the queue for an event.
     * When a Goodwill accessor is configured and it returns a schema for the event name, the
     * JSON produced by eventToJson() is used; if that is unavailable (no accessor, no schema,
     * or a null JSON result) the payload is the string form of the event data.
     *
     * @param event the event to convert
     * @return the payload as a String
     */
    private Object getMessageForActiveMQ(Event event) {
        String payload = null;
        if (this.goodwillAccessor != null) {
            final GoodwillSchema eventSchema = this.goodwillAccessor.getSchema(event.getName());
            if (eventSchema != null) {
                payload = eventToJson(event, eventSchema);
            }
        }
        return payload != null ? payload : event.getData().toString();
    }

    /* JADX WARN: Code restructure failed: missing block: B:27:0x0041, code lost:
    
        return r8.jsonMapper.writeValueAsString(r9.getData());
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x0042, code lost:
    
        r11 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x0043, code lost:
    
        r8.log.debug("Got IOException trying to serialize stream", r11);
     */
    /* JADX WARN: Code restructure failed: missing block: B:33:0x0019, code lost:
    
        return r8.jsonMapper.writeValueAsString(r9.getData());
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x001a, code lost:
    
        r11 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x001b, code lost:
    
        r8.log.debug("Got IOException trying to serialize JsonNode", r11);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /*
     * NOTE(review): this is a decompiler stub, not the real implementation. As written it
     * unconditionally throws UnsupportedOperationException, and getMessageForActiveMQ() calls it
     * whenever a Goodwill schema is found for the event name.
     * The decompiler notes preceding this method suggest the original serialized the event data
     * with jsonMapper.writeValueAsString(...) and logged IOExceptions at debug level; the real
     * body must be restored from the original sources before this class is used - TODO confirm.
     */
    private java.lang.String eventToJson(com.ning.metrics.serialization.event.Event r9, com.ning.metrics.goodwill.access.GoodwillSchema r10) {
        /*
            Method dump skipped, instructions count: 235
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.ning.metrics.collector.realtime.EventQueueProcessorImpl.eventToJson(com.ning.metrics.serialization.event.Event, com.ning.metrics.goodwill.access.GoodwillSchema):java.lang.String");
    }

    /**
     * Writes one event field into the JSON object under the schema field's name, using the
     * typed accessor of the data item that matches the schema field type.
     * SHORT and INTEGER are both written through {@code getInteger()}; DATE, IP, STRING and
     * any other type are written as strings.
     *
     * Switches on the enum itself rather than on the ordinal-indexed synthetic lookup table,
     * so the mapping is readable and does not depend on ordinal bookkeeping.
     *
     * @param objectNode          JSON object receiving the field
     * @param dataItem            value of the field within the event
     * @param goodwillSchemaField schema entry providing the field name and type
     */
    private void addToRoot(ObjectNode objectNode, DataItem dataItem, GoodwillSchemaField goodwillSchemaField) {
        switch (goodwillSchemaField.getType()) {
            case BOOLEAN:
                objectNode.put(goodwillSchemaField.getName(), dataItem.getBoolean().booleanValue());
                break;
            case BYTE:
                objectNode.put(goodwillSchemaField.getName(), dataItem.getByte().byteValue());
                break;
            case SHORT:
            case INTEGER:
                objectNode.put(goodwillSchemaField.getName(), dataItem.getInteger().intValue());
                break;
            case LONG:
                objectNode.put(goodwillSchemaField.getName(), dataItem.getLong().longValue());
                break;
            case DOUBLE:
                objectNode.put(goodwillSchemaField.getName(), dataItem.getDouble().doubleValue());
                break;
            case DATE:
            case IP:
            case STRING:
            default:
                objectNode.put(goodwillSchemaField.getName(), dataItem.getString());
                break;
        }
    }

    /**
     * @return the current value of the enabled flag
     */
    @Managed(description = "whether forwarding events to the queue is enabled")
    public boolean isEnabled() {
        final boolean forwardingEnabled = this.enabled.get();
        return forwardingEnabled;
    }

    /**
     * Switches forwarding on. start() is invoked only when the flag was previously off.
     */
    @Managed(description = "forwards events to the queue")
    public void enable() {
        final boolean wasAlreadyEnabled = this.enabled.getAndSet(true);
        if (!wasAlreadyEnabled) {
            start();
        }
    }

    /**
     * Switches forwarding off. stop() is invoked only when the flag was previously on.
     */
    @Managed(description = "disable forwarding of events to the queue")
    public void disable() {
        final boolean wasEnabled = this.enabled.getAndSet(false);
        if (!wasEnabled) {
            return;
        }
        stop();
    }

    /**
     * Adds an event type to the set of types forwarded to the queue and logs the change.
     * Nothing happens when the type is already present.
     *
     * The published set is never modified in place: send() may be calling contains() on it
     * from another thread, and HashSet does not support concurrent modification. Instead a
     * modified copy is swapped in atomically, retrying if another update won the race.
     *
     * @param str name of the event type to start collecting
     */
    @Managed(description = "add event type to collect")
    public void addTypeToCollect(String str) {
        Set<String> current;
        Set<String> updated;
        do {
            current = this.typesToCollect.get();
            if (current.contains(str)) {
                return;
            }
            updated = new HashSet<String>(current);
            updated.add(str);
        } while (!this.typesToCollect.compareAndSet(current, updated));
        this.log.info(String.format("Added event type '%s' to list of events to send to the queue", str));
    }

    /**
     * Removes an event type from the set of types forwarded to the queue and logs the change.
     * Nothing happens when the type is not present.
     *
     * The published set is never modified in place: send() may be calling contains() on it
     * from another thread, and HashSet does not support concurrent modification. Instead a
     * modified copy is swapped in atomically, retrying if another update won the race.
     *
     * @param str name of the event type to stop collecting
     */
    @Managed(description = "remove event type to collect")
    public void removeTypeToCollect(String str) {
        Set<String> current;
        Set<String> updated;
        do {
            current = this.typesToCollect.get();
            if (!current.contains(str)) {
                return;
            }
            updated = new HashSet<String>(current);
            updated.remove(str);
        } while (!this.typesToCollect.compareAndSet(current, updated));
        this.log.info(String.format("Removed event type '%s' from list of events to send to the queue", str));
    }

    /**
     * @return the string form of the set of event types currently being collected
     */
    @Managed(description = "list event types allowed to collect")
    public String getTypesToCollect() {
        final Set<String> currentTypes = this.typesToCollect.get();
        return currentTypes.toString();
    }
}
