package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.hibernate.orm.PersistenceUnit;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.vertx.mutiny.core.Vertx;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.apache.kafka.common.TopicPartition;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;

/* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.class */
public class HibernateOrmStateStore implements CheckpointStateStore {
    public static final String QUARKUS_HIBERNATE_ORM = "quarkus-hibernate-orm";
    private final String consumerGroupId;
    private final SessionFactory sf;
    private final Class<? extends CheckpointEntity> stateType;

    @Identifier(HibernateOrmStateStore.QUARKUS_HIBERNATE_ORM)
    @ApplicationScoped
    /* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore$Factory.class */
    public static class Factory implements CheckpointStateStore.Factory {

        @Inject
        @Any
        Instance<SessionFactory> sessionFactories;

        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, Class<?> cls) {
            String str = (String) kafkaConsumer.configuration().get("group.id");
            if (!CheckpointEntity.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException("State type needs to extend `CheckpointEntity`");
            }
            String str2 = (String) kafkaConnectorIncomingConfiguration.config().getOptionalValue("checkpoint.quarkus-hibernate-orm.persistence-unit", String.class).orElse(null);
            return new HibernateOrmStateStore(str, str2 != null ? (SessionFactory) this.sessionFactories.select(new Annotation[]{new PersistenceUnit.PersistenceUnitLiteral(str2)}).get() : (SessionFactory) this.sessionFactories.get(), cls);
        }
    }

    public HibernateOrmStateStore(String str, SessionFactory sessionFactory, Class<? extends CheckpointEntity> cls) {
        this.consumerGroupId = str;
        this.sf = sessionFactory;
        this.stateType = cls;
    }

    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> collection) {
        return Uni.createFrom().deferred(() -> {
            Object[] array = collection.stream().map(topicPartition -> {
                return new CheckpointEntityId(this.consumerGroupId, topicPartition);
            }).toArray(i -> {
                return new Object[i];
            });
            return Vertx.currentContext().executeBlocking(Uni.createFrom().item(() -> {
                ArrayList arrayList = new ArrayList();
                Session openSession = this.sf.openSession();
                try {
                    for (Object obj : array) {
                        CheckpointEntity checkpointEntity = (CheckpointEntity) openSession.find(this.stateType, obj);
                        if (checkpointEntity != null) {
                            arrayList.add(checkpointEntity);
                        }
                    }
                    if (openSession != null) {
                        openSession.close();
                    }
                    return (Map) arrayList.stream().filter(checkpointEntity2 -> {
                        return (checkpointEntity2 == null || CheckpointEntity.topicPartition(checkpointEntity2) == null) ? false : true;
                    }).collect(Collectors.toMap(CheckpointEntity::topicPartition, checkpointEntity3 -> {
                        return new ProcessingState(checkpointEntity3, checkpointEntity3.offset.longValue());
                    }));
                } catch (Throwable th) {
                    if (openSession != null) {
                        try {
                            openSession.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }));
        });
    }

    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> map) {
        return Uni.createFrom().deferred(() -> {
            Object[] array = map.entrySet().stream().filter(entry -> {
                return !ProcessingState.isEmptyOrNull((ProcessingState) entry.getValue());
            }).map(entry2 -> {
                return CheckpointEntity.from((ProcessingState) entry2.getValue(), new CheckpointEntityId(this.consumerGroupId, (TopicPartition) entry2.getKey()));
            }).toArray();
            return Vertx.currentContext().executeBlocking(Uni.createFrom().emitter(uniEmitter -> {
                Transaction transaction = null;
                try {
                    Session openSession = this.sf.openSession();
                    try {
                        transaction = openSession.beginTransaction();
                        for (Object obj : array) {
                            openSession.merge(obj);
                        }
                        openSession.flush();
                        transaction.commit();
                        uniEmitter.complete((Object) null);
                        if (openSession != null) {
                            openSession.close();
                        }
                    } finally {
                    }
                } catch (Throwable th) {
                    if (transaction != null) {
                        transaction.rollback();
                    }
                    uniEmitter.fail(th);
                }
            }));
        });
    }
}
