package org.apache.james.queue.rabbitmq.view.cassandra;

import com.google.common.base.Preconditions;
import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import javax.inject.Inject;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mime4j.dom.Disposable;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.class */
public class CassandraMailQueueBrowser {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMailQueueBrowser.class);
    private final BrowseStartDAO browseStartDao;
    private final DeletedMailsDAO deletedMailsDao;
    private final EnqueuedMailsDAO enqueuedMailsDao;
    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
    private final CassandraMailQueueViewConfiguration configuration;
    private final Clock clock;

    /* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser$CassandraMailQueueItemView.class */
    public static class CassandraMailQueueItemView implements ManageableMailQueue.MailQueueItemView, Disposable {
        private final EnqueuedItem enqueuedItem;
        private final Mail mail;

        public CassandraMailQueueItemView(Pair<EnqueuedItem, Mail> pair) {
            this((EnqueuedItem) pair.getLeft(), (Mail) pair.getRight());
        }

        public CassandraMailQueueItemView(EnqueuedItem enqueuedItem, Mail mail) {
            this.enqueuedItem = enqueuedItem;
            this.mail = mail;
        }

        public EnqueueId getEnqueuedId() {
            return this.enqueuedItem.getEnqueueId();
        }

        public MimeMessagePartsId getEnqueuedPartsId() {
            return this.enqueuedItem.getPartsId();
        }

        public Mail getMail() {
            return this.mail;
        }

        public Optional<ZonedDateTime> getNextDelivery() {
            return Optional.empty();
        }

        public void dispose() {
            LifecycleUtil.dispose(this.mail);
        }
    }

    /* loaded from: input_file:org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser$CassandraMailQueueIterator.class */
    static class CassandraMailQueueIterator implements ManageableMailQueue.MailQueueIterator {
        private final Iterator<CassandraMailQueueItemView> iterator;

        /* JADX INFO: Access modifiers changed from: package-private */
        public CassandraMailQueueIterator(Iterator<CassandraMailQueueItemView> it) {
            Preconditions.checkNotNull(it);
            this.iterator = it;
        }

        public void close() {
        }

        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public CassandraMailQueueItemView m6next() {
            return this.iterator.next();
        }
    }

    @Inject
    CassandraMailQueueBrowser(BrowseStartDAO browseStartDAO, DeletedMailsDAO deletedMailsDAO, EnqueuedMailsDAO enqueuedMailsDAO, MimeMessageStore.Factory factory, CassandraMailQueueViewConfiguration cassandraMailQueueViewConfiguration, Clock clock) {
        this.browseStartDao = browseStartDAO;
        this.deletedMailsDao = deletedMailsDAO;
        this.enqueuedMailsDao = enqueuedMailsDAO;
        this.mimeMessageStore = factory.mimeMessageStore();
        this.configuration = cassandraMailQueueViewConfiguration;
        this.clock = clock;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<CassandraMailQueueItemView> browse(MailQueueName mailQueueName) {
        return browseReferences(mailQueueName).flatMapSequential(this::toMailFuture).map(CassandraMailQueueItemView::new);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<CassandraMailQueueItemView> browseOlderThan(MailQueueName mailQueueName, Instant instant) {
        return browseReferencesOlderThan(mailQueueName, instant).flatMapSequential(this::toMailFuture).map(CassandraMailQueueItemView::new);
    }

    Flux<EnqueuedItemWithSlicingContext> browseReferencesOlderThan(MailQueueName mailQueueName, Instant instant) {
        return this.browseStartDao.findBrowseStart(mailQueueName).flatMapMany(this::allSlicesStartingAt).filter(slice -> {
            return slice.getStartSliceInstant().isBefore(instant);
        }).flatMapSequential(slice2 -> {
            return browseSlice(mailQueueName, slice2);
        }).filter(enqueuedItemWithSlicingContext -> {
            return enqueuedItemWithSlicingContext.getEnqueuedItem().getEnqueuedTime().isBefore(instant);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName mailQueueName) {
        return this.browseStartDao.findBrowseStart(mailQueueName).flatMapMany(instant -> {
            return browseReferences(mailQueueName, instant);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName mailQueueName, Instant instant) {
        return allSlicesStartingAt(instant).flatMapSequential(slice -> {
            return browseSlice(mailQueueName, slice);
        });
    }

    private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
        EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
        return this.mimeMessageStore.read(enqueuedItem.getPartsId()).map(mimeMessage -> {
            return Pair.of(enqueuedItem, toMail(enqueuedItem, mimeMessage));
        });
    }

    private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
        Mail mail = enqueuedItem.getMail();
        try {
            mail.setMessage(mimeMessage);
        } catch (MessagingException e) {
            LOGGER.error("error while setting mime message to mail {}", mail.getName(), e);
        }
        return mail;
    }

    private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName mailQueueName, BucketedSlices.Slice slice) {
        return allBucketIds().concatMap(bucketId -> {
            return browseBucket(mailQueueName, slice, bucketId);
        }, 16).sort(Comparator.comparing(enqueuedItemWithSlicingContext -> {
            return enqueuedItemWithSlicingContext.getEnqueuedItem().getEnqueuedTime();
        }));
    }

    private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName mailQueueName, BucketedSlices.Slice slice, BucketedSlices.BucketId bucketId) {
        return this.enqueuedMailsDao.selectEnqueuedMails(mailQueueName, slice, bucketId).filterWhen(enqueuedItemWithSlicingContext -> {
            return this.deletedMailsDao.isStillEnqueued(mailQueueName, enqueuedItemWithSlicingContext.getEnqueuedItem().getEnqueueId());
        }, 16);
    }

    private Flux<BucketedSlices.Slice> allSlicesStartingAt(Instant instant) {
        return Flux.fromStream(BucketedSlices.Slice.of(instant).allSlicesTill(this.clock.instant(), this.configuration.getSliceWindow()));
    }

    private Flux<BucketedSlices.BucketId> allBucketIds() {
        return Flux.range(0, this.configuration.getBucketCount()).map((v0) -> {
            return BucketedSlices.BucketId.of(v0);
        });
    }
}
