package net.osomahe.esk.eventstore.control;

import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import net.osomahe.esk.config.boundary.ConfigurationBoundary;
import net.osomahe.esk.eventstore.entity.AsyncEvent;
import net.osomahe.esk.eventstore.entity.EventExpirationSecs;
import net.osomahe.esk.eventstore.entity.EventName;
import net.osomahe.esk.eventstore.entity.EventStoreEvent;
import net.osomahe.esk.eventstore.entity.LoggableEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

@Singleton
@Startup
/* loaded from: input_file:net/osomahe/esk/eventstore/control/EventStoreSubscriber.class */
public class EventStoreSubscriber {
    private static final Logger logger = Logger.getLogger(EventStoreSubscriber.class.getName());

    @Inject
    private ConfigurationBoundary config;

    @Inject
    private EventSubscriptionDataStore eventDataStore;

    @Inject
    private TopicService topicService;

    @Inject
    private Event<EventStoreEvent> events;

    @Resource
    private ManagedScheduledExecutorService mses;
    private Map<String, Map<String, Class<? extends EventStoreEvent>>> mapTopics = new ConcurrentHashMap();
    private Map<Class<? extends EventStoreEvent>, Long> mapExpiration = new ConcurrentHashMap();
    private Jsonb jsonb;
    private KafkaConsumer<String, JsonObject> consumer;
    private ScheduledFuture<?> sfConsumerPoll;
    private String applicationName;

    @PostConstruct
    public void init() {
        this.jsonb = JsonbBuilder.create();
        this.eventDataStore.getEventClasses().forEach(this::subscribeForTopic);
        Properties kafkaConsumerConfig = this.config.getKafkaConsumerConfig();
        this.applicationName = kafkaConsumerConfig.getProperty("group.id");
        logger.info(String.format("Subscribing as %s for topics %s", this.applicationName, this.mapTopics));
        if (this.mapTopics.size() > 0) {
            this.consumer = new KafkaConsumer<>(kafkaConsumerConfig, new StringDeserializer(), new EventDeserializer());
            this.consumer.subscribe(this.mapTopics.keySet());
            this.sfConsumerPoll = this.mses.scheduleAtFixedRate(this::pollMessages, 1000L, 200L, TimeUnit.MILLISECONDS);
        }
    }

    @Schedule(hour = "*", minute = "*", persistent = false)
    public void checkLiveness() {
        if (this.consumer != null) {
            if (this.sfConsumerPoll.isCancelled() || this.sfConsumerPoll.isDone()) {
                logger.warning(String.format("KafkaConsumer polling has to be restarted for %s ", this.applicationName));
                this.sfConsumerPoll = this.mses.scheduleAtFixedRate(this::pollMessages, 1000L, 200L, TimeUnit.MILLISECONDS);
            }
        }
    }

    private void subscribeForTopic(Class<? extends EventStoreEvent> cls) {
        Long eventExpiration;
        String topicName = this.topicService.getTopicName(cls);
        if (!this.mapTopics.containsKey(topicName)) {
            this.mapTopics.put(topicName, new ConcurrentHashMap());
        }
        this.mapTopics.get(topicName).put(getEventName(cls), cls);
        if (this.mapExpiration.containsKey(cls) || (eventExpiration = getEventExpiration(cls)) == null) {
            return;
        }
        this.mapExpiration.put(cls, eventExpiration);
    }

    private Long getEventExpiration(Class<? extends EventStoreEvent> cls) {
        EventExpirationSecs eventExpirationSecs = (EventExpirationSecs) cls.getAnnotation(EventExpirationSecs.class);
        if (eventExpirationSecs != null) {
            return Long.valueOf(eventExpirationSecs.value());
        }
        return null;
    }

    private String getEventName(Class<? extends EventStoreEvent> cls) {
        EventName eventName = (EventName) cls.getAnnotation(EventName.class);
        return eventName != null ? eventName.value() : cls.getSimpleName();
    }

    private void pollMessages() {
        synchronized (this.consumer) {
            try {
                Iterator it = this.consumer.poll(100L).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    JsonObject jsonObject = (JsonObject) consumerRecord.value();
                    String string = jsonObject.getString("name");
                    Map<String, Class<? extends EventStoreEvent>> map = this.mapTopics.get(consumerRecord.topic());
                    if (map.containsKey(string)) {
                        Class<? extends EventStoreEvent> cls = map.get(string);
                        if (isEventExpired(jsonObject, cls)) {
                            logger.fine(String.format("Skipping expired event: %s", jsonObject));
                        } else {
                            EventStoreEvent eventStoreEvent = (EventStoreEvent) this.jsonb.fromJson(jsonObject.getJsonObject("data").toString(), cls);
                            if (eventStoreEvent.getClass().isAnnotationPresent(LoggableEvent.class)) {
                                logger.fine(String.format("EventStoreEvent (%s) firing for %s %s", eventStoreEvent.getClass().getSimpleName(), this.applicationName, eventStoreEvent));
                            }
                            try {
                                if (cls.isAnnotationPresent(AsyncEvent.class)) {
                                    this.events.fireAsync(eventStoreEvent);
                                } else {
                                    this.events.fire(eventStoreEvent);
                                }
                            } catch (Exception e) {
                                logger.log(Level.SEVERE, "Error in firing polled kafka messages", (Throwable) e);
                            }
                        }
                    }
                }
            } catch (Exception e2) {
                logger.log(Level.SEVERE, "Error in polling kafka messages", (Throwable) e2);
            }
        }
    }

    private boolean isEventExpired(JsonObject jsonObject, Class<? extends EventStoreEvent> cls) {
        if (this.mapExpiration.containsKey(cls) && jsonObject.containsKey("dateTime")) {
            return ZonedDateTime.now().toEpochSecond() > jsonObject.getJsonNumber("dateTime").longValue() + this.mapExpiration.get(cls).longValue();
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        if (this.sfConsumerPoll != null) {
            synchronized (this.consumer) {
                this.sfConsumerPoll.cancel(false);
                this.consumer.close(5L, TimeUnit.SECONDS);
            }
        }
    }
}
