package org.apache.james.eventsourcing.eventstore.cassandra;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.steveash.guavate.Guavate;
import java.io.IOException;
import java.util.List;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.History;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.class */
/**
 * Data access object for the Cassandra events table: writes JSON-serialized events and reads
 * back the events stored for a given aggregate.
 */
public class EventStoreDao {
    private final CassandraUtils cassandraUtils;
    private final CassandraAsyncExecutor cassandraAsyncExecutor;
    private final PreparedStatement insert;
    private final PreparedStatement select;
    private final JsonEventSerializer jsonEventSerializer;

    /**
     * Creates the DAO and prepares its insert and select statements on the given session.
     *
     * @param session session used to prepare the statements and to build the async executor
     * @param cassandraUtils helper used to turn a result set into a stream of rows
     * @param jsonEventSerializer converts events to and from their string representation
     */
    @Inject
    public EventStoreDao(Session session, CassandraUtils cassandraUtils, JsonEventSerializer jsonEventSerializer) {
        this.cassandraUtils = cassandraUtils;
        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
        this.jsonEventSerializer = jsonEventSerializer;
        this.insert = prepareInsert(session);
        this.select = prepareSelect(session);
    }

    /**
     * Prepares the conditional insert (IF NOT EXISTS) that writes the aggregate id, event id
     * and serialized event columns.
     */
    private PreparedStatement prepareInsert(Session session) {
        return session.prepare(
            QueryBuilder.insertInto(CassandraEventStoreTable.EVENTS_TABLE)
                .value(CassandraEventStoreTable.AGGREGATE_ID, QueryBuilder.bindMarker(CassandraEventStoreTable.AGGREGATE_ID))
                .value(CassandraEventStoreTable.EVENT_ID, QueryBuilder.bindMarker(CassandraEventStoreTable.EVENT_ID))
                .value(CassandraEventStoreTable.EVENT, QueryBuilder.bindMarker(CassandraEventStoreTable.EVENT))
                .ifNotExists());
    }

    /** Prepares the query that selects every column of the rows matching one aggregate id. */
    private PreparedStatement prepareSelect(Session session) {
        return session.prepare(
            QueryBuilder.select()
                .from(CassandraEventStoreTable.EVENTS_TABLE)
                .where(QueryBuilder.eq(
                    CassandraEventStoreTable.AGGREGATE_ID,
                    QueryBuilder.bindMarker(CassandraEventStoreTable.AGGREGATE_ID))));
    }

    /**
     * Adds one conditional insert per event to a single batch and submits that batch.
     *
     * @param list events to store
     * @return the result of {@code executeReturnApplied} for the batch
     * @throws RuntimeException if an event cannot be serialized
     */
    public Mono<Boolean> appendAll(List<Event> list) {
        BatchStatement batchStatement = new BatchStatement();
        for (Event event : list) {
            batchStatement.add(insertEvent(event));
        }
        return this.cassandraAsyncExecutor.executeReturnApplied(batchStatement);
    }

    /**
     * Binds the insert statement to the aggregate key, event id and serialized form of the event.
     *
     * @throws RuntimeException wrapping the {@link JsonProcessingException} when serialization fails
     */
    private BoundStatement insertEvent(Event event) {
        String aggregateKey = event.getAggregateId().asAggregateKey();
        int eventId = event.eventId().serialize();
        try {
            return this.insert.bind()
                .setString(CassandraEventStoreTable.AGGREGATE_ID, aggregateKey)
                .setInt(CassandraEventStoreTable.EVENT_ID, eventId)
                .setString(CassandraEventStoreTable.EVENT, this.jsonEventSerializer.serialize(event));
        } catch (JsonProcessingException e) {
            // Keep the unchecked exception type callers already see, but say which event failed.
            throw new RuntimeException(
                "Failed to serialize event " + eventId + " of aggregate " + aggregateKey, e);
        }
    }

    /**
     * Reads the events stored for an aggregate. This call waits for the query to complete
     * ({@code join()}) before converting the rows.
     *
     * @param aggregateId aggregate whose events are requested
     * @return history built from the deserialized rows
     * @throws RuntimeException if a stored event cannot be deserialized
     */
    public History getEventsOfAggregate(AggregateId aggregateId) {
        BoundStatement query = this.select.bind()
            .setString(CassandraEventStoreTable.AGGREGATE_ID, aggregateId.asAggregateKey());
        ResultSet resultSet = this.cassandraAsyncExecutor.execute(query).join();
        return toHistory(resultSet);
    }

    /** Deserializes every row of the result set and wraps the resulting events in a history. */
    private History toHistory(ResultSet resultSet) {
        List<Event> events = this.cassandraUtils.convertToStream(resultSet)
            .map(this::toEvent)
            .collect(Guavate.toImmutableList());
        return History.of(events);
    }

    /**
     * Deserializes the event column of a row.
     *
     * @throws RuntimeException wrapping the {@link IOException} when deserialization fails
     */
    private Event toEvent(Row row) {
        try {
            return this.jsonEventSerializer.deserialize(row.getString(CassandraEventStoreTable.EVENT));
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize stored event", e);
        }
    }
}
