package net.osomahe.esk.eventstore.boundary;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PreDestroy;
import javax.ejb.Stateless;
import javax.inject.Inject;
import net.osomahe.esk.eventstore.control.TopicService;
import net.osomahe.esk.eventstore.entity.AbstractEvent;
import net.osomahe.esk.eventstore.entity.EventNotPublishedException;
import net.osomahe.esk.eventstore.entity.EventStoreException;
import net.osomahe.esk.eventstore.entity.LoggableEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Boundary bean that publishes {@link AbstractEvent}s to Kafka.
 *
 * <p>Before an event is sent, missing metadata is filled in: an aggregate id of the form
 * {@code <uuid>-<epochMillis>-<partition>} and a UTC timestamp. The target partition is always
 * read back from the numeric suffix after the last {@code '-'} of the aggregate id.
 */
@Stateless
public class EventStorePublisher {
    private static final Logger logger = Logger.getLogger(EventStorePublisher.class.getName());

    @Inject
    private KafkaProducer<String, AbstractEvent> kafkaProducer;

    @Inject
    private TopicService topicService;

    /**
     * Publishes the event and blocks until the send has finished.
     *
     * @param t   event to publish; its aggregate id and date-time are filled in when {@code null}
     * @param <T> concrete event type
     * @return metadata of the written record, never {@code null}
     * @throws EventNotPublishedException when the asynchronous send failed (the failure itself is
     *                                    logged by {@link #publishAsync(AbstractEvent)})
     * @throws EventStoreException        when waiting for the result was interrupted or the future
     *                                    completed exceptionally
     * @throws IllegalArgumentException   when the aggregate id carries no partition suffix
     */
    public <T extends AbstractEvent> RecordMetadata publish(T t) {
        try {
            RecordMetadata recordMetadata = publishAsync(t).get();
            if (recordMetadata == null) {
                // publishAsync maps every send failure to a null result.
                throw new EventNotPublishedException(t);
            }
            return recordMetadata;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new EventStoreException("Cannot publish the event: " + t, e);
        } catch (ExecutionException e) {
            throw new EventStoreException("Cannot publish the event: " + t, e);
        }
    }

    /**
     * Publishes the event without waiting for the broker acknowledgement.
     *
     * <p>Metadata filling and partition resolution happen on the calling thread, so an
     * {@link IllegalArgumentException} for a malformed aggregate id is thrown directly from this
     * method. The actual send runs via {@link CompletableFuture#supplyAsync}, i.e. on the default
     * asynchronous executor.
     *
     * @param t   event to publish; its aggregate id and date-time are filled in when {@code null}
     * @param <T> concrete event type
     * @return future holding the record metadata, or {@code null} when the send failed (the
     *         failure is logged at {@code SEVERE} and not propagated through the future)
     */
    public <T extends AbstractEvent> CompletableFuture<RecordMetadata> publishAsync(T t) {
        fillMetadata(t);
        String topicName = this.topicService.getTopicName(t.getClass());
        Integer partition = getPartition(t);
        ProducerRecord<String, AbstractEvent> producerRecord =
                new ProducerRecord<>(topicName, partition, t.getAggregateId(), t);
        if (t.getClass().isAnnotationPresent(LoggableEvent.class)) {
            // Supplier overload: the message is only formatted when FINE is actually enabled.
            logger.fine(() -> String.format("Event id %s (%s) publishing %s",
                    t.getAggregateId(), t.getClass().getSimpleName(), producerRecord));
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                return this.kafkaProducer.send(producerRecord).get();
            } catch (InterruptedException e) {
                // Keep the worker thread's interrupt status intact for its owner.
                Thread.currentThread().interrupt();
                throw new EventStoreException("Cannot publish the event: " + t, e);
            } catch (ExecutionException e) {
                throw new EventStoreException("Cannot publish the event: " + t, e);
            }
        }).exceptionally(th -> {
            logger.log(Level.SEVERE, "Event was NOT published", th);
            return null;
        });
    }

    /**
     * Extracts the partition number encoded after the last {@code '-'} of the aggregate id.
     *
     * @param t   event whose aggregate id is already set
     * @param <T> concrete event type
     * @return partition number parsed from the aggregate id suffix
     * @throws IllegalArgumentException when the aggregate id contains no {@code '-'}
     * @throws NumberFormatException    when the text after the last {@code '-'} is not an integer
     */
    private <T extends AbstractEvent> int getPartition(T t) {
        String aggregateId = t.getAggregateId();
        int separatorIndex = aggregateId.lastIndexOf('-');
        if (separatorIndex < 0) {
            throw new IllegalArgumentException("Event aggregateId does NOT contain info about partition number. event: " + t);
        }
        return Integer.parseInt(aggregateId.substring(separatorIndex + 1));
    }

    /**
     * Fills in the aggregate id and the date-time of the event when they are {@code null}.
     *
     * <p>A generated aggregate id has the form {@code <uuid>-<epochMillis>-<partition>} where the
     * partition is derived from the UUID's hash code and the topic's partition count.
     *
     * @param t   event to complete
     * @param <T> concrete event type
     */
    private <T extends AbstractEvent> void fillMetadata(T t) {
        if (t.getAggregateId() == null) {
            int partitionCount = this.topicService.getPartitionCount(t.getClass());
            String uuid = UUID.randomUUID().toString();
            // floorMod never yields a negative value for a positive divisor, unlike
            // Math.abs(hash) % n which goes negative when hash == Integer.MIN_VALUE.
            int partition = Math.floorMod(uuid.hashCode(), partitionCount);
            t.setAggregateId(String.format("%s-%s-%s", uuid, System.currentTimeMillis(), partition));
        }
        if (t.getDateTime() == null) {
            t.setDateTime(ZonedDateTime.now(ZoneOffset.UTC));
        }
    }

    /**
     * Closes the injected Kafka producer, waiting up to five seconds for it to finish.
     *
     * <p>NOTE(review): if the injected producer is shared between bean instances, closing it here
     * affects all of them - confirm the producer's scope. Newer Kafka clients offer
     * {@code close(Duration)} in place of this overload; verify against the client version in use.
     */
    @PreDestroy
    public void destroy() {
        this.kafkaProducer.close(5L, TimeUnit.SECONDS);
    }
}
