package org.apache.james.queue.rabbitmq.view.cassandra;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import java.util.Date;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.mailet.Mail;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.class */
/**
 * Data access object for the enqueued-mails table of the Cassandra mail queue view.
 *
 * <p>Two statements are prepared once, at construction time: an insert covering every
 * column written by {@link #insert(EnqueuedItemWithSlicingContext)}, and a select that
 * filters on queue name, time range start and bucket id. Both are executed through a
 * {@link CassandraAsyncExecutor} and exposed as Reactor publishers.
 */
public class EnqueuedMailsDAO {
    /** Column name used both as the CQL column and as the bind marker name. */
    private static final String QUEUE_NAME = "queueName";
    /** Column name used both as the CQL column and as the bind marker name. */
    private static final String MAIL_KEY = "mailKey";

    private final CassandraAsyncExecutor executor;
    private final PreparedStatement selectFrom;
    private final PreparedStatement insert;
    private final CassandraUtils cassandraUtils;
    private final CassandraTypesProvider cassandraTypesProvider;
    private final BlobId.Factory blobFactory;

    /**
     * Builds the DAO and prepares its statements against the given session.
     *
     * @param session session used to prepare the statements and to back the async executor
     * @param cassandraUtils helper used to turn a query result into a stream of rows
     * @param cassandraTypesProvider provider handed to the per-recipient header conversion
     * @param factory blob id factory handed to the row-to-item conversion
     */
    @Inject
    EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider, BlobId.Factory factory) {
        this.executor = new CassandraAsyncExecutor(session);
        this.cassandraUtils = cassandraUtils;
        this.cassandraTypesProvider = cassandraTypesProvider;
        this.selectFrom = prepareSelectFrom(session);
        this.insert = prepareInsert(session);
        this.blobFactory = factory;
    }

    /**
     * Prepares the select statement: all columns, restricted by equality on queue name,
     * time range start and bucket id. Each bind marker is named after its column.
     */
    private PreparedStatement prepareSelectFrom(Session session) {
        return session.prepare(
            QueryBuilder.select()
                .from(CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME)
                .where(QueryBuilder.eq(
                    QUEUE_NAME,
                    QueryBuilder.bindMarker(QUEUE_NAME)))
                .and(QueryBuilder.eq(
                    CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START)))
                .and(QueryBuilder.eq(
                    CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID))));
    }

    /**
     * Prepares the insert statement. Every column gets a bind marker named after the
     * column itself, so bindings in {@link #insert(EnqueuedItemWithSlicingContext)} can be
     * set by name in any order.
     */
    private PreparedStatement prepareInsert(Session session) {
        return session.prepare(
            QueryBuilder.insertInto(CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME)
                .value(QUEUE_NAME,
                    QueryBuilder.bindMarker(QUEUE_NAME))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID))
                .value(MAIL_KEY,
                    QueryBuilder.bindMarker(MAIL_KEY))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.STATE,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.STATE))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED))
                .value(CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS,
                    QueryBuilder.bindMarker(CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS)));
    }

    /**
     * Writes one enqueued item, together with the slice and bucket it belongs to.
     *
     * <p>Note: {@code executor.executeVoid(...)} is invoked when this method is called,
     * not when the returned {@link Mono} is subscribed to; the {@code Mono} only wraps
     * the resulting completion stage.
     *
     * <p>Visibility was widened from package-private by the decompiler.
     *
     * @param enqueuedItemWithSlicingContext the item to store and its slicing context
     * @return a {@link Mono} wrapping the completion stage returned by the executor
     */
    public Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
        EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
        EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicingContext.getSlicingContext();
        Mail mail = enqueuedItem.getMail();
        MimeMessagePartsId partsId = enqueuedItem.getPartsId();

        return Mono.fromCompletionStage(this.executor.executeVoid(
            this.insert.bind()
                .setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString())
                .setTimestamp(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START,
                    Date.from(slicingContext.getTimeRangeStart()))
                .setInt(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID,
                    slicingContext.getBucketId().getValue())
                .setTimestamp(CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME,
                    Date.from(enqueuedItem.getEnqueuedTime()))
                .setString(MAIL_KEY, mail.getName())
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID,
                    partsId.getHeaderBlobId().asString())
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID,
                    partsId.getBodyBlobId().asString())
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.STATE,
                    mail.getState())
                // null is handed to asString(String) - presumably the value to store when the
                // sender is absent; confirm against MaybeSender.
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER,
                    mail.getMaybeSender().asString((String) null))
                .setList(CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS,
                    EnqueuedMailsDaoUtil.asStringList(mail.getRecipients()))
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE,
                    mail.getErrorMessage())
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR,
                    mail.getRemoteAddr())
                .setString(CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST,
                    mail.getRemoteHost())
                .setTimestamp(CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED,
                    mail.getLastUpdated())
                .setMap(CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES,
                    EnqueuedMailsDaoUtil.toRawAttributeMap(mail))
                .setMap(CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS,
                    EnqueuedMailsDaoUtil.toHeaderMap(this.cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))));
    }

    /**
     * Reads the enqueued items stored for one queue, slice and bucket.
     *
     * <p>The query result is converted to a stream of rows by {@code CassandraUtils} and
     * each row is mapped through {@code EnqueuedMailsDaoUtil.toEnqueuedMail}.
     *
     * <p>Note: {@code executor.execute(...)} is invoked when this method is called, not
     * when the returned {@link Flux} is subscribed to.
     *
     * <p>Visibility was widened from package-private by the decompiler.
     *
     * @param mailQueueName queue whose items are read
     * @param slice slice whose start instant is bound as the time range start
     * @param bucketId bucket to read within that slice
     * @return the items converted from the matching rows
     */
    public Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(MailQueueName mailQueueName, BucketedSlices.Slice slice, BucketedSlices.BucketId bucketId) {
        // Chain directly off fromCompletionStage: keeping the intermediate Mono in a raw-typed
        // local would erase the element types for every downstream operator.
        return Mono.fromCompletionStage(this.executor.execute(
                this.selectFrom.bind()
                    .setString(QUEUE_NAME, mailQueueName.asString())
                    .setTimestamp(CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START,
                        Date.from(slice.getStartSliceInstant()))
                    .setInt(CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID,
                        bucketId.getValue())))
            .map(this.cassandraUtils::convertToStream)
            .flatMapMany(Flux::fromStream)
            .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, this.blobFactory));
    }
}
