package org.apache.james.queue.rabbitmq.view.cassandra;

import com.google.common.base.Preconditions;
import java.time.Clock;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.class */
class CassandraMailQueueBrowser {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMailQueueBrowser.class);
    private final BrowseStartDAO browseStartDao;
    private final DeletedMailsDAO deletedMailsDao;
    private final EnqueuedMailsDAO enqueuedMailsDao;
    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
    private final CassandraMailQueueViewConfiguration configuration;
    private final Clock clock;

    /* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser$CassandraMailQueueIterator.class */
    static class CassandraMailQueueIterator implements ManageableMailQueue.MailQueueIterator {
        private final Iterator<ManageableMailQueue.MailQueueItemView> iterator;

        /* JADX INFO: Access modifiers changed from: package-private */
        public CassandraMailQueueIterator(Iterator<ManageableMailQueue.MailQueueItemView> it) {
            Preconditions.checkNotNull(it);
            this.iterator = it;
        }

        public void close() {
        }

        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public ManageableMailQueue.MailQueueItemView m6next() {
            return this.iterator.next();
        }
    }

    @Inject
    CassandraMailQueueBrowser(BrowseStartDAO browseStartDAO, DeletedMailsDAO deletedMailsDAO, EnqueuedMailsDAO enqueuedMailsDAO, Store<MimeMessage, MimeMessagePartsId> store, CassandraMailQueueViewConfiguration cassandraMailQueueViewConfiguration, Clock clock) {
        this.browseStartDao = browseStartDAO;
        this.deletedMailsDao = deletedMailsDAO;
        this.enqueuedMailsDao = enqueuedMailsDAO;
        this.mimeMessageStore = store;
        this.configuration = cassandraMailQueueViewConfiguration;
        this.clock = clock;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName mailQueueName) {
        return browseReferences(mailQueueName).map(this::toMailFuture, FluentFutureStream::unboxFuture).map(ManageableMailQueue.MailQueueItemView::new).completableFuture();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName mailQueueName) {
        return FluentFutureStream.of(this.browseStartDao.findBrowseStart(mailQueueName).thenApply(this::allSlicesStartingAt)).map(slice -> {
            return browseSlice(mailQueueName, slice);
        }, FluentFutureStream::unboxFluentFuture);
    }

    private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
        EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
        return this.mimeMessageStore.read(enqueuedItem.getPartsId()).thenApply(mimeMessage -> {
            return toMail(enqueuedItem, mimeMessage);
        });
    }

    private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
        Mail mail = enqueuedItem.getMail();
        try {
            mail.setMessage(mimeMessage);
        } catch (MessagingException e) {
            LOGGER.error("error while setting mime message to mail {}", mail.getName(), e);
        }
        return mail;
    }

    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName mailQueueName, BucketedSlices.Slice slice) {
        return FluentFutureStream.of(allBucketIds().map(bucketId -> {
            return browseBucket(mailQueueName, slice, bucketId).completableFuture();
        }), FluentFutureStream::unboxStream).sorted(Comparator.comparing(enqueuedItemWithSlicingContext -> {
            return enqueuedItemWithSlicingContext.getEnqueuedItem().getEnqueuedTime();
        }));
    }

    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName mailQueueName, BucketedSlices.Slice slice, BucketedSlices.BucketId bucketId) {
        return FluentFutureStream.of(this.enqueuedMailsDao.selectEnqueuedMails(mailQueueName, slice, bucketId)).thenFilter(enqueuedItemWithSlicingContext -> {
            return this.deletedMailsDao.isStillEnqueued(mailQueueName, enqueuedItemWithSlicingContext.getEnqueuedItem().getMailKey());
        });
    }

    private Stream<BucketedSlices.Slice> allSlicesStartingAt(Optional<Instant> optional) {
        return (Stream) optional.map(BucketedSlices.Slice::of).map(slice -> {
            return BucketedSlices.Slice.allSlicesTill(slice, this.clock.instant(), this.configuration.getSliceWindow());
        }).orElse(Stream.empty());
    }

    private Stream<BucketedSlices.BucketId> allBucketIds() {
        return IntStream.range(0, this.configuration.getBucketCount()).mapToObj(BucketedSlices.BucketId::of);
    }
}
