package net.osomahe.esk.eventstore.boundary;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PreDestroy;
import javax.ejb.Stateless;
import javax.inject.Inject;
import net.osomahe.esk.eventstore.control.TopicService;
import net.osomahe.esk.eventstore.entity.EventGroupKey;
import net.osomahe.esk.eventstore.entity.EventKey;
import net.osomahe.esk.eventstore.entity.EventNotPublishedException;
import net.osomahe.esk.eventstore.entity.EventStoreEvent;
import net.osomahe.esk.eventstore.entity.EventStoreException;
import net.osomahe.esk.eventstore.entity.LoggableEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

@Stateless
/* loaded from: input_file:net/osomahe/esk/eventstore/boundary/EventStorePublisher.class */
public class EventStorePublisher {
    private static final Logger logger = Logger.getLogger(EventStorePublisher.class.getName());

    @Inject
    private KafkaProducer<String, EventStoreEvent> kafkaProducer;

    @Inject
    private TopicService topicService;

    public <T extends EventStoreEvent> RecordMetadata publish(T t) {
        try {
            RecordMetadata recordMetadata = publishAsync(t).get();
            if (recordMetadata == null) {
                throw new EventNotPublishedException(t);
            }
            return recordMetadata;
        } catch (InterruptedException | ExecutionException e) {
            throw new EventStoreException("Cannot publish the event: " + t, e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T extends EventStoreEvent> CompletableFuture<RecordMetadata> publishAsync(T t) {
        ProducerRecord producerRecord = new ProducerRecord(this.topicService.getTopicName(t.getClass()), getPartition(t), getEventKey(t), t);
        if (t.getClass().isAnnotationPresent(LoggableEvent.class)) {
            logger.fine(String.format("EventStoreEvent (%s) publishing %s", t.getClass().getSimpleName(), producerRecord));
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                return (RecordMetadata) this.kafkaProducer.send(producerRecord).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new EventStoreException("Cannot publish the event: " + t, e);
            }
        }).exceptionally(th -> {
            logger.log(Level.SEVERE, "EventStoreEvent was NOT published", th);
            return null;
        });
    }

    private <T extends EventStoreEvent> String getEventKey(T t) {
        Object valueForAnnotation = getValueForAnnotation(t, t.getClass(), EventKey.class);
        return valueForAnnotation != null ? valueForAnnotation.toString() : System.currentTimeMillis() + "-" + UUID.randomUUID().toString();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T extends EventStoreEvent> Integer getPartition(T t) {
        Object groupKeyValue = getGroupKeyValue(t);
        if (groupKeyValue == null) {
            return null;
        }
        return Integer.valueOf(Math.abs(groupKeyValue.hashCode()) % this.topicService.getPartitionCount((Class<? extends EventStoreEvent>) t.getClass()));
    }

    private <T extends EventStoreEvent> Object getGroupKeyValue(T t) {
        return getValueForAnnotation(t, t.getClass(), EventGroupKey.class);
    }

    private <T extends EventStoreEvent> Object getValueForAnnotation(T t, Class<?> cls, Class<? extends Annotation> cls2) {
        Object valueFromField = getValueFromField(t, cls, cls2);
        if (valueFromField != null) {
            return valueFromField;
        }
        Object valueFromMethod = getValueFromMethod(t, cls, cls2);
        if (valueFromMethod != null) {
            return valueFromMethod;
        }
        if (cls.getSuperclass() != null) {
            return getValueForAnnotation(t, cls.getSuperclass(), cls2);
        }
        return null;
    }

    private <T extends EventStoreEvent> Object getValueFromField(T t, Class<?> cls, Class<? extends Annotation> cls2) {
        for (Field field : cls.getDeclaredFields()) {
            if (field.isAnnotationPresent(cls2)) {
                try {
                    field.setAccessible(true);
                    return field.get(t);
                } catch (IllegalAccessException e) {
                    logger.log(Level.SEVERE, "Cannot get field value of " + cls2 + " for event: " + t, (Throwable) e);
                }
            }
        }
        return null;
    }

    private <T extends EventStoreEvent> Object getValueFromMethod(T t, Class<?> cls, Class<? extends Annotation> cls2) {
        for (Method method : cls.getDeclaredMethods()) {
            if (method.isAnnotationPresent(cls2)) {
                try {
                    if (!method.isAccessible()) {
                        method.setAccessible(true);
                    }
                    return method.invoke(t, new Object[0]);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    logger.log(Level.SEVERE, "Cannot get method value of " + cls2 + " for event: " + t, e);
                }
            }
        }
        return null;
    }

    @PreDestroy
    public void destroy() {
        this.kafkaProducer.close(5L, TimeUnit.SECONDS);
    }
}
