package org.apache.unomi.kafka;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.services.EventService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(name = "org.apache.unomi.kafka", immediate = true)
/* loaded from: input_file:org/apache/unomi/kafka/KafkaInjector.class */
public class KafkaInjector implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInjector.class);
    private Dictionary<String, Object> properties;
    private KafkaConsumer<String, String> consumer;
    private String topic;
    private String messageType;
    private boolean consuming = false;
    private ObjectMapper objectMapper;

    @Reference
    private EventService eventService;

    @Activate
    public void activate(ComponentContext componentContext) {
        this.objectMapper = new ObjectMapper();
        this.properties = componentContext.getProperties();
        this.topic = getValue(this.properties, "topic", "unomi");
        this.messageType = getValue(this.properties, "message.type", "text");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getValue(this.properties, "bootstrap.servers", "localhost:9092"));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, getValue(this.properties, ConsumerConfig.GROUP_ID_CONFIG, "unomi"));
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, getValue(this.properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getValue(this.properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"));
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getValue(this.properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"));
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getValue(this.properties, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"));
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValue(this.properties, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ValueDeserializer"));
        String value = getValue(this.properties, "security.protocol", null);
        if (value != null) {
            properties.put("security.protocol", value);
        }
        String value2 = getValue(this.properties, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, null);
        if (value2 != null) {
            properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, value2);
        }
        String value3 = getValue(this.properties, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, null);
        if (value3 != null) {
            properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, value3);
        }
        String value4 = getValue(this.properties, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, null);
        if (value4 != null) {
            properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, value4);
        }
        String value5 = getValue(this.properties, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, null);
        if (value5 != null) {
            properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, value5);
        }
        String value6 = getValue(this.properties, SslConfigs.SSL_KEY_PASSWORD_CONFIG, null);
        if (value6 != null) {
            properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, value6);
        }
        String value7 = getValue(this.properties, SslConfigs.SSL_PROVIDER_CONFIG, null);
        if (value7 != null) {
            properties.put(SslConfigs.SSL_PROVIDER_CONFIG, value7);
        }
        String value8 = getValue(this.properties, SslConfigs.SSL_CIPHER_SUITES_CONFIG, null);
        if (value8 != null) {
            properties.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, value8);
        }
        String value9 = getValue(this.properties, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, null);
        if (value9 != null) {
            properties.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, value9);
        }
        String value10 = getValue(this.properties, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, null);
        if (value10 != null) {
            properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, value10);
        }
        String value11 = getValue(this.properties, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, null);
        if (value11 != null) {
            properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, value11);
        }
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(null);
            this.consumer = new KafkaConsumer<>(properties);
            this.consumer.subscribe(Arrays.asList(this.topic));
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            this.consuming = true;
            Executors.newSingleThreadExecutor().execute(this);
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.consuming) {
            try {
                consume();
            } catch (Exception e) {
                LOGGER.warn(e.getMessage(), e);
            }
        }
    }

    private void consume() throws UnsupportedEncodingException, IOException, JsonMappingException {
        ConsumerRecords<String, String> poll = this.consumer.poll(10000L);
        if (poll.isEmpty()) {
            return;
        }
        Event event = null;
        Iterator<ConsumerRecord<String, String>> it = poll.iterator();
        while (it.hasNext()) {
            String value = it.next().value();
            if (this.messageType.equalsIgnoreCase("text")) {
                event = (Event) this.objectMapper.readValue(value, Event.class);
            }
        }
        if (event != null) {
            this.eventService.send(event);
        }
    }

    private String getValue(Dictionary<String, Object> dictionary, String str, String str2) {
        String str3 = (String) dictionary.get(str);
        return str3 != null ? str3 : str2;
    }
}
