package org.apache.streams.monitoring.tasks;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.Lists;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.ArrayList;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.jackson.DatumStatusCounterDeserializer;
import org.apache.streams.jackson.MemoryUsageDeserializer;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.jackson.StreamsTaskCounterDeserializer;
import org.apache.streams.jackson.ThroughputQueueDeserializer;
import org.apache.streams.local.monitoring.MonitoringConfiguration;
import org.apache.streams.monitoring.persist.MessagePersister;
import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister;
import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister;
import org.apache.streams.monitoring.persist.impl.SLF4JMessagePersister;
import org.apache.streams.pojo.json.Broadcast;
import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
import org.apache.streams.pojo.json.MemoryUsageBroadcast;
import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/monitoring/tasks/BroadcastMonitorThread.class */
public class BroadcastMonitorThread extends NotificationBroadcasterSupport implements Runnable {
    private static MBeanServer server;
    private MonitoringConfiguration configuration;
    private URI broadcastURI;
    private MessagePersister messagePersister;
    private volatile boolean keepRunning;
    private static final Logger LOGGER = LoggerFactory.getLogger(BroadcastMonitorThread.class);
    private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();

    @Deprecated
    public BroadcastMonitorThread(Map<String, Object> map) {
        this((MonitoringConfiguration) objectMapper.convertValue(map, MonitoringConfiguration.class));
    }

    public BroadcastMonitorThread(StreamsConfiguration streamsConfiguration) {
        this((MonitoringConfiguration) objectMapper.convertValue(streamsConfiguration.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class));
    }

    public BroadcastMonitorThread(MonitoringConfiguration monitoringConfiguration) {
        this.broadcastURI = null;
        this.configuration = monitoringConfiguration;
        if (this.configuration == null) {
            this.configuration = (MonitoringConfiguration) new ComponentConfigurator(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
        }
        LOGGER.info("BroadcastMonitorThread created");
        initializeObjectMapper();
        prepare();
        LOGGER.info("BroadcastMonitorThread initialized");
    }

    private void initializeObjectMapper() {
        SimpleModule simpleModule = new SimpleModule();
        simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
        simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer());
        simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer());
        simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer());
        objectMapper.registerModule(simpleModule);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override // java.lang.Runnable
    public void run() {
        LOGGER.info("BroadcastMonitorThread running");
        while (this.keepRunning) {
            try {
                ArrayList newArrayList = Lists.newArrayList();
                for (ObjectName objectName : server.queryNames((ObjectName) null, (QueryExp) null)) {
                    String writeValueAsString = objectMapper.writeValueAsString(objectName);
                    Broadcast broadcast = null;
                    if (objectName.getKeyPropertyList().get("type") != null) {
                        if (((String) objectName.getKeyPropertyList().get("type")).equals("ThroughputQueue")) {
                            broadcast = (Broadcast) objectMapper.readValue(writeValueAsString, ThroughputQueueBroadcast.class);
                        } else if (((String) objectName.getKeyPropertyList().get("type")).equals("StreamsTaskCounter")) {
                            broadcast = (Broadcast) objectMapper.readValue(writeValueAsString, StreamsTaskCounterBroadcast.class);
                        } else if (((String) objectName.getKeyPropertyList().get("type")).equals("DatumStatusCounter")) {
                            broadcast = (Broadcast) objectMapper.readValue(writeValueAsString, DatumStatusCounterBroadcast.class);
                        } else if (((String) objectName.getKeyPropertyList().get("type")).equals("Memory")) {
                            broadcast = (Broadcast) objectMapper.readValue(writeValueAsString, MemoryUsageBroadcast.class);
                        }
                        if (broadcast != null) {
                            newArrayList.add(objectMapper.writeValueAsString(broadcast));
                        }
                    }
                }
                this.messagePersister.persistMessages(newArrayList);
                Thread.sleep(this.configuration.getMonitoringBroadcastIntervalMs().longValue());
            } catch (InterruptedException e) {
                LOGGER.debug("Broadcast Monitor Interrupted!");
                Thread.currentThread().interrupt();
                this.keepRunning = false;
            } catch (Exception e2) {
                LOGGER.error("Exception: {}", e2);
                this.keepRunning = false;
            }
        }
    }

    public void prepare() {
        this.keepRunning = true;
        LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
        server = ManagementFactory.getPlatformMBeanServer();
        if (this.configuration == null || this.configuration.getBroadcastURI() == null) {
            this.messagePersister = new SLF4JMessagePersister();
            return;
        }
        try {
            this.broadcastURI = new URI(this.configuration.getBroadcastURI());
        } catch (Exception e) {
            LOGGER.error("invalid URI: ", e);
        }
        if (this.broadcastURI == null) {
            this.messagePersister = new SLF4JMessagePersister();
            return;
        }
        if (this.broadcastURI.getScheme().equals("http")) {
            this.messagePersister = new BroadcastMessagePersister(this.broadcastURI.toString());
        } else if (this.broadcastURI.getScheme().equals("udp")) {
            this.messagePersister = new LogstashUdpMessagePersister(this.broadcastURI.toString());
        } else {
            LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
            throw new RuntimeException();
        }
    }

    public void shutdown() {
        this.keepRunning = false;
        LOGGER.debug("Shutting down BroadcastMonitor Thread");
    }

    public String getBroadcastURI() {
        return this.configuration.getBroadcastURI();
    }

    public long getWaitTime() {
        return this.configuration.getMonitoringBroadcastIntervalMs().longValue();
    }
}
