package org.apache.james.mailbox.cassandra.mail;

import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import javax.mail.Flags;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.mailbox.ApplicableFlagBuilder;
import org.apache.james.mailbox.FlagsBuilder;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxCounters;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.mailbox.store.transaction.Mapper;
import org.apache.james.util.OptionalUtils;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Limit;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.class */
public class CassandraMessageMapper implements MessageMapper {
    // Logger used by this mapper for warnings and errors raised while reading or updating messages.
    public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class);
    // Providers of the next/highest ModSeq and of the next/last UID of a mailbox.
    private final CassandraModSeqProvider modSeqProvider;
    private final CassandraUidProvider uidProvider;
    // DAOs and handlers assigned by the constructor; this class only delegates to them.
    private final CassandraMessageDAO messageDAO;
    private final CassandraMessageIdDAO messageIdDAO;
    private final CassandraMessageIdToImapUidDAO imapUidDAO;
    private final CassandraMailboxCounterDAO mailboxCounterDAO;
    private final CassandraMailboxRecentsDAO mailboxRecentDAO;
    private final CassandraApplicableFlagDAO applicableFlagDAO;
    private final CassandraIndexTableHandler indexTableHandler;
    private final CassandraFirstUnseenDAO firstUnseenDAO;
    // Built in the constructor around the CassandraAttachmentMapper argument.
    private final AttachmentLoader attachmentLoader;
    private final CassandraDeletedMessageDAO deletedMessageDAO;
    // Source of the read chunk size, the expunge chunk size and the flags-update retry limit.
    private final CassandraConfiguration cassandraConfiguration;

    /**
     * Wires the mapper with its providers, DAOs and configuration.
     *
     * <p>The attachment mapper is not kept as-is: it is wrapped in a new {@link AttachmentLoader}.
     */
    public CassandraMessageMapper(
            CassandraUidProvider cassandraUidProvider,
            CassandraModSeqProvider cassandraModSeqProvider,
            CassandraAttachmentMapper cassandraAttachmentMapper,
            CassandraMessageDAO cassandraMessageDAO,
            CassandraMessageIdDAO cassandraMessageIdDAO,
            CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO,
            CassandraMailboxCounterDAO cassandraMailboxCounterDAO,
            CassandraMailboxRecentsDAO cassandraMailboxRecentsDAO,
            CassandraApplicableFlagDAO cassandraApplicableFlagDAO,
            CassandraIndexTableHandler cassandraIndexTableHandler,
            CassandraFirstUnseenDAO cassandraFirstUnseenDAO,
            CassandraDeletedMessageDAO cassandraDeletedMessageDAO,
            CassandraConfiguration cassandraConfiguration) {
        // Sequence providers.
        this.modSeqProvider = cassandraModSeqProvider;
        this.uidProvider = cassandraUidProvider;

        // Table accessors.
        this.messageDAO = cassandraMessageDAO;
        this.messageIdDAO = cassandraMessageIdDAO;
        this.imapUidDAO = cassandraMessageIdToImapUidDAO;
        this.mailboxCounterDAO = cassandraMailboxCounterDAO;
        this.mailboxRecentDAO = cassandraMailboxRecentsDAO;
        this.applicableFlagDAO = cassandraApplicableFlagDAO;
        this.firstUnseenDAO = cassandraFirstUnseenDAO;
        this.deletedMessageDAO = cassandraDeletedMessageDAO;
        this.indexTableHandler = cassandraIndexTableHandler;

        // Helpers and settings.
        this.attachmentLoader = new AttachmentLoader(cassandraAttachmentMapper);
        this.cassandraConfiguration = cassandraConfiguration;
    }

    /**
     * Lists the UID of every message id entry stored for the mailbox.
     *
     * @param mailbox mailbox to scan; its id is cast to {@link CassandraId}
     * @return blocking iterator over the UIDs found for {@code MessageRange.all()}
     */
    public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return this.messageIdDAO.retrieveMessages(mailboxId, MessageRange.all())
            .map(idWithMetaData -> idWithMetaData.getComposedMessageId().getUid())
            .toIterable()
            .iterator();
    }

    /**
     * Returns the message count taken from the mailbox counters.
     *
     * @param mailbox mailbox whose counters are read
     * @return the {@code count} component of {@link #getMailboxCounters(Mailbox)}
     */
    public long countMessagesInMailbox(Mailbox mailbox) {
        MailboxCounters counters = getMailboxCounters(mailbox);
        return counters.getCount();
    }

    /**
     * Blocking variant of {@code getMailboxCountersAsMono}.
     *
     * @param mailbox mailbox whose counters are read
     * @return the stored counters, or zeroed counters when none are stored
     */
    public MailboxCounters getMailboxCounters(Mailbox mailbox) {
        Mono<MailboxCounters> counters = getMailboxCountersAsMono(mailbox);
        return counters.block();
    }

    /**
     * Reads the counters of a mailbox, falling back to count=0 / unseen=0 when the DAO emits nothing.
     *
     * @param mailbox mailbox whose id (cast to {@link CassandraId}) is looked up
     * @return a Mono emitting the stored counters or the zeroed default
     */
    private Mono<MailboxCounters> getMailboxCountersAsMono(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Mono<MailboxCounters> stored = this.mailboxCounterDAO.retrieveMailboxCounters(mailboxId);
        MailboxCounters zeroed = MailboxCounters.builder()
            .mailboxId(mailboxId)
            .count(0L)
            .unseen(0L)
            .build();
        return stored.defaultIfEmpty(zeroed);
    }

    /**
     * Reads the counters of several mailboxes, one after the other, preserving the input order.
     *
     * @param collection mailboxes whose counters are read
     * @return immutable list holding one counters entry per mailbox
     */
    public List<MailboxCounters> getMailboxCounters(Collection<Mailbox> collection) {
        return Flux.fromIterable(collection)
            .publishOn(Schedulers.elastic())
            // concatMap keeps the lookups sequential and ordered.
            .concatMap(mailbox -> getMailboxCountersAsMono(mailbox))
            .toStream()
            .collect(Guavate.toImmutableList());
    }

    /**
     * Deletes a message and blocks until the deletion pipeline completes.
     *
     * @param mailbox not read by this implementation; the mailbox id comes from the message itself
     * @param mailboxMessage message to delete
     */
    public void delete(Mailbox mailbox, MailboxMessage mailboxMessage) {
        Mono<Void> deletion = deleteAsFuture(mailboxMessage);
        deletion.block();
    }

    /**
     * Builds the deletion pipeline for a message from its composed id and metadata.
     *
     * @param mailboxMessage message to delete
     * @return Mono completing once {@code deleteUsingMailboxId} has completed
     */
    private Mono<Void> deleteAsFuture(MailboxMessage mailboxMessage) {
        ComposedMessageIdWithMetaData idWithMetaData = mailboxMessage.getComposedMessageIdWithMetaData();
        return deleteUsingMailboxId(idWithMetaData);
    }

    /**
     * Removes the entries of a message from both id tables, then updates the index tables.
     *
     * @param composedMessageIdWithMetaData id and metadata of the message to remove
     * @return Mono completing when the index update triggered after both deletions has completed
     */
    private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        ComposedMessageId ids = composedMessageIdWithMetaData.getComposedMessageId();
        CassandraMessageId messageId = (CassandraMessageId) ids.getMessageId();
        CassandraId mailboxId = (CassandraId) ids.getMailboxId();

        // Both id-table deletions are merged; the index update is subscribed only after they finish.
        return Flux.merge(
                this.imapUidDAO.delete(messageId, mailboxId),
                this.messageIdDAO.delete(mailboxId, ids.getUid()))
            .then(this.indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
    }

    /**
     * Loads the messages of a mailbox that fall in the given range, sorted by UID.
     *
     * <p>Message ids are read in chunks, each chunk is resolved to messages, and the merged
     * result is sorted with the natural ordering of the UID before being exposed as a blocking iterator.
     *
     * @param mailbox mailbox to read; its id is cast to {@link CassandraId}
     * @param messageRange range handed to the message id lookup
     * @param fetchType how much of each message is loaded
     * @param i converted with {@code Limit.from} and passed to every chunk lookup
     * @return iterator over the messages found, ordered by UID
     */
    public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, MessageMapper.FetchType fetchType, int i) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return retrieveMessageIds(mailboxId, messageRange)
            .flatMap(ids -> retrieveMessages(ids, fetchType, Limit.from(i)))
            .map(MailboxMessage.class::cast)
            .sort(Comparator.comparing(MailboxMessage::getUid))
            .toIterable()
            .iterator();
    }

    /**
     * Streams the message ids of a range grouped into lists of at most the configured read chunk size.
     *
     * @param cassandraId mailbox to read
     * @param messageRange range handed to the message id DAO
     * @return Flux of immutable id lists, one per window
     */
    private Flux<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId cassandraId, MessageRange messageRange) {
        return this.messageIdDAO.retrieveMessages(cassandraId, messageRange)
            .window(this.cassandraConfiguration.getMessageReadChunkSize())
            .flatMap(chunk -> chunk.collect(Guavate.toImmutableList()));
    }

    /**
     * Resolves a list of message ids to messages, skipping ids whose lookup result is not found,
     * and lets the attachment loader complete each message.
     *
     * @param list ids to resolve
     * @param fetchType passed both to the message DAO and to the attachment loader
     * @param limit passed to the message DAO
     * @return Flux of the messages produced by the attachment loader
     */
    private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> list, MessageMapper.FetchType fetchType, Limit limit) {
        return this.messageDAO.retrieveMessages(list, fetchType, limit)
            .filter(result -> result.isFound())
            .map(result -> result.message())
            .flatMap(messageAndAttachments -> this.attachmentLoader.addAttachmentToMessage(messageAndAttachments, fetchType));
    }

    /**
     * Collects the UIDs returned by the recents DAO for the mailbox.
     *
     * @param mailbox mailbox to read; its id is cast to {@link CassandraId}
     * @return list of the UIDs emitted by the DAO
     */
    public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return this.mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
            .collectList()
            .block();
    }

    /**
     * Returns the UID emitted by the first-unseen DAO for the mailbox.
     *
     * @param mailbox mailbox to read; its id is cast to {@link CassandraId}
     * @return the UID, or {@code null} when the DAO emits nothing
     */
    public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        // block() yields null on an empty Mono, which is the "no unseen message" answer.
        return this.firstUnseenDAO.retrieveFirstUnread(mailboxId).block();
    }

    /**
     * Collects the UIDs returned by the deleted-message DAO for the mailbox and range.
     *
     * @param mailbox mailbox to read; its id is cast to {@link CassandraId}
     * @param messageRange range handed to the DAO
     * @return immutable list of the UIDs emitted by the DAO
     */
    public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return this.deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
            .collect(Guavate.toImmutableList())
            .block();
    }

    /**
     * Expunges the given UIDs from the mailbox and reports the metadata of the expunged messages.
     *
     * @param mailbox mailbox to expunge from; its id is cast to {@link CassandraId}
     * @param list UIDs to expunge
     * @return immutable map from UID to metadata for each message emitted by {@code expungeOne}
     */
    public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> list) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        // The second flatMap argument is the configured expunge chunk size.
        int expungeChunkSize = this.cassandraConfiguration.getExpungeChunkSize();
        return Flux.fromStream(list.stream())
            .flatMap(uid -> expungeOne(mailboxId, uid), expungeChunkSize)
            .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
            .block();
    }

    /**
     * Expunges one message: resolves its composed id, deletes its id entries and index data, then
     * loads its metadata-only representation to report what was removed.
     *
     * @param cassandraId mailbox holding the message
     * @param messageUid UID of the message to expunge
     * @return Flux emitting the expunged message (without attachments) when it could be found
     */
    private Flux<SimpleMailboxMessage> expungeOne(CassandraId cassandraId, MessageUid messageUid) {
        return retrieveComposedId(cassandraId, messageUid)
            .flatMap(idWithMetaData -> deleteUsingMailboxId(idWithMetaData).thenReturn(idWithMetaData))
            .flatMapMany(idWithMetaData -> this.messageDAO.retrieveMessages(
                ImmutableList.of(idWithMetaData), MessageMapper.FetchType.Metadata, Limit.unlimited()))
            .filter(result -> result.isFound())
            .map(result -> result.message())
            .map(messageAndAttachments ->
                ((MessageWithoutAttachment) messageAndAttachments.getKey()).toMailboxMessage(ImmutableList.of()));
    }

    /**
     * Looks up the composed id stored for a UID in a mailbox.
     *
     * <p>When the DAO answers with an empty {@link Optional}, a warning is logged and the returned
     * Mono completes without emitting anything.
     *
     * @param cassandraId mailbox to look in
     * @param messageUid UID to look up
     * @return Mono emitting the composed id and metadata when present, empty otherwise
     */
    private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId cassandraId, MessageUid messageUid) {
        return this.messageIdDAO.retrieve(cassandraId, messageUid)
            .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
                () -> LOGGER.warn("Could not retrieve message {} {}", cassandraId, messageUid)))
            // Unwrap the Optional: emit its value when present, emit nothing otherwise.
            .handle((optional, sink) -> optional.ifPresent(sink::next));
    }

    /**
     * Moves a message into {@code mailbox}: copies it there, then deletes its original entries.
     *
     * @param mailbox destination mailbox
     * @param mailboxMessage message to move
     * @return metadata of the copy created in the destination mailbox
     * @throws MailboxException when the copy fails
     */
    public MessageMetaData move(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        // Capture the source ids first: copy() assigns a new UID, ModSeq and flags to the same message object.
        ComposedMessageIdWithMetaData sourceIds = mailboxMessage.getComposedMessageIdWithMetaData();

        MessageMetaData copiedMetaData = copy(mailbox, mailboxMessage);

        deleteUsingMailboxId(sourceIds).block();
        return copiedMetaData;
    }

    /** Does nothing in this implementation. */
    public void endRequest() {
        // Intentionally empty.
    }

    /**
     * Delegates to the ModSeq provider.
     *
     * @param mailbox mailbox to query
     * @return the value returned by {@code modSeqProvider.highestModSeq(mailbox)}
     * @throws MailboxException when the provider fails
     */
    public ModSeq getHighestModSeq(Mailbox mailbox) throws MailboxException {
        ModSeq highest = this.modSeqProvider.highestModSeq(mailbox);
        return highest;
    }

    /**
     * Adds a new message to the mailbox: assigns a UID and ModSeq, saves the message and its ids,
     * updates the index tables, then returns the message metadata.
     *
     * @param mailbox destination mailbox; its id is cast to {@link CassandraId}
     * @param mailboxMessage message to add; its UID and ModSeq are set as part of the call
     * @return metadata of the stored message
     * @throws MailboxException when the pipeline fails with a {@link MailboxException} cause
     */
    public MessageMetaData add(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();

        Mono<MessageMetaData> pipeline = addUidAndModseq(mailboxMessage, mailboxId)
            .flatMap(Throwing.function((MailboxMessage stamped) -> save(mailbox, stamped).thenReturn(stamped)))
            .flatMap(saved -> this.indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).thenReturn(saved))
            .map(MailboxMessage::metaData);

        return block(pipeline);
    }

    /**
     * Obtains the next UID and next ModSeq of the mailbox and sets both on the message.
     *
     * <p>If either provider emits nothing, the Mono fails with a {@link MailboxException}.
     *
     * @param mailboxMessage message to stamp (mutated in place)
     * @param cassandraId mailbox the identifiers are generated for
     * @return Mono emitting the same message instance once it carries its UID and ModSeq
     */
    private Mono<MailboxMessage> addUidAndModseq(MailboxMessage mailboxMessage, CassandraId cassandraId) {
        Mono<MessageUid> nextUid = this.uidProvider.nextUid(cassandraId)
            .switchIfEmpty(Mono.error(() -> new MailboxException(
                "Can not find a UID to save " + mailboxMessage.getMessageId() + " in " + cassandraId)));
        Mono<ModSeq> nextModSeq = this.modSeqProvider.nextModSeq(cassandraId)
            .switchIfEmpty(Mono.error(() -> new MailboxException(
                "Can not find a MODSEQ to save " + mailboxMessage.getMessageId() + " in " + cassandraId)));

        return Mono.zip(nextUid, nextModSeq)
            .doOnNext(uidAndModSeq -> {
                mailboxMessage.setUid(uidAndModSeq.getT1());
                mailboxMessage.setModSeq(uidAndModSeq.getT2());
            })
            .thenReturn(mailboxMessage);
    }

    /**
     * Blocks on a Mono and rethrows a {@link MailboxException} hidden as the cause of a runtime exception.
     *
     * @param mono pipeline to wait for
     * @param <T> type of the emitted value
     * @return the emitted value, or {@code null} when the Mono completes empty
     * @throws MailboxException when the runtime exception raised by {@code block()} has a
     *         {@link MailboxException} as its direct cause; any other runtime exception is rethrown as-is
     */
    private <T> T block(Mono<T> mono) throws MailboxException {
        try {
            return mono.block();
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            if (cause instanceof MailboxException) {
                // getCause() is typed Throwable; the cast is required to match the throws clause.
                throw (MailboxException) cause;
            }
            throw e;
        }
    }

    /**
     * Applies a flags update to every message of a range, retrying failed updates.
     *
     * <p>UIDs still failing after the retries are logged at error level and left out of the result.
     *
     * @param mailbox mailbox to update; its id is cast to {@link CassandraId}
     * @param flagsUpdateCalculator computes the new flags of each message
     * @param messageRange messages to update
     * @return iterator over the successful updates
     */
    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange messageRange) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();

        Flux<ComposedMessageIdWithMetaData> targets = this.messageIdDAO.retrieveMessages(mailboxId, messageRange);
        FlagsUpdateStageResult firstPass = runUpdateStage(mailboxId, targets, flagsUpdateCalculator).block();
        FlagsUpdateStageResult outcome = handleUpdatesStagedRetry(mailboxId, flagsUpdateCalculator, firstPass);

        if (outcome.containsFailedResults()) {
            LOGGER.error("Can not update following UIDs {} for mailbox {}", outcome.getFailed(), mailboxId.asUuid());
        }
        return outcome.getSucceeded().iterator();
    }

    /**
     * Re-runs the update stage for failed UIDs until none remain or the configured retry limit is hit.
     *
     * @param cassandraId mailbox being updated
     * @param flagsUpdateCalculator computes the new flags of each message
     * @param flagsUpdateStageResult outcome of the initial update stage
     * @return the successes accumulated so far merged with the outcome of the last retry
     */
    private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId cassandraId, FlagsUpdateCalculator flagsUpdateCalculator, FlagsUpdateStageResult flagsUpdateStageResult) {
        FlagsUpdateStageResult current = flagsUpdateStageResult;
        for (int attempt = 0;
                attempt < this.cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && current.containsFailedResults();
                attempt++) {
            // Keep what already succeeded and replace the failures with the retry outcome.
            FlagsUpdateStageResult succeededSoFar = current.keepSucceded();
            FlagsUpdateStageResult retried = retryUpdatesStage(cassandraId, flagsUpdateCalculator, current.getFailed()).block();
            current = succeededSoFar.merge(retried);
        }
        return current;
    }

    /**
     * Runs the update stage again for the given UIDs.
     *
     * <p>Each UID is resolved to its current composed id first; UIDs the DAO no longer knows are skipped.
     *
     * @param cassandraId mailbox being updated
     * @param flagsUpdateCalculator computes the new flags of each message
     * @param list UIDs whose previous update failed
     * @return Mono emitting the outcome of the retry, or an empty Mono when {@code list} is empty
     */
    private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId cassandraId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> list) {
        if (list.isEmpty()) {
            return Mono.empty();
        }
        Flux<ComposedMessageIdWithMetaData> toRetry = Flux.fromIterable(list)
            .flatMap(messageUid -> this.messageIdDAO.retrieve(cassandraId, messageUid))
            // Unwrap the Optional: emit its value when present, emit nothing otherwise.
            .handle((optional, sink) -> optional.ifPresent(sink::next));
        return runUpdateStage(cassandraId, toRetry, flagsUpdateCalculator);
    }

    /**
     * Attempts the flags update on each message in turn, merges the per-message outcomes and
     * updates the index tables for the successful ones.
     *
     * @param cassandraId mailbox being updated
     * @param flux messages to update
     * @param flagsUpdateCalculator computes the new flags of each message
     * @return Mono emitting the merged outcome once the index update has completed
     */
    private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId cassandraId, Flux<ComposedMessageIdWithMetaData> flux, FlagsUpdateCalculator flagsUpdateCalculator) {
        // Subscribed once per message inside concatMap, so each message gets its own ModSeq request.
        Mono<ModSeq> newModSeq = computeNewModSeq(cassandraId);

        return flux
            .concatMap(idWithMetaData ->
                newModSeq.flatMap(modSeq -> tryFlagsUpdate(flagsUpdateCalculator, modSeq, idWithMetaData)))
            .reduce(FlagsUpdateStageResult.none(), (merged, next) -> merged.merge(next))
            .flatMap(merged -> updateIndexesForUpdatesResult(cassandraId, merged));
    }

    /**
     * Requests the next ModSeq of a mailbox.
     *
     * <p>When the provider emits nothing, the returned Mono fails with a {@link RuntimeException}
     * instead of completing empty, so a failed generation is reported rather than silently skipped.
     *
     * @param cassandraId mailbox the ModSeq is generated for
     * @return Mono emitting the new ModSeq, or failing when none could be generated
     */
    private Mono<ModSeq> computeNewModSeq(CassandraId cassandraId) {
        return this.modSeqProvider.nextModSeq(cassandraId)
            .switchIfEmpty(Mono.error(() ->
                new RuntimeException("ModSeq generation failed for mailbox " + cassandraId.asUuid())));
    }

    /**
     * Updates the index tables for every successful flags update of a stage.
     *
     * <p>If the index update call throws for an entry, the failure is logged at error level and
     * that entry is skipped; the stage result itself is returned unchanged.
     *
     * @param cassandraId mailbox being updated
     * @param flagsUpdateStageResult outcome of the update stage
     * @return Mono emitting {@code flagsUpdateStageResult} after the index updates have completed
     */
    private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId cassandraId, FlagsUpdateStageResult flagsUpdateStageResult) {
        return Flux.fromIterable(flagsUpdateStageResult.getSucceeded())
            // The explicit parameter type lets the compiler infer Throwing.function's type arguments,
            // which have no target type because of the chained fallbackTo call.
            .flatMap(Throwing.function((UpdatedFlags updatedFlags) ->
                    this.indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, updatedFlags))
                .fallbackTo(failedUpdate -> {
                    LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", cassandraId, failedUpdate.getUid());
                    return Mono.empty();
                }))
            .then(Mono.just(flagsUpdateStageResult));
    }

    /**
     * Runs the transaction directly and returns its result.
     *
     * @param transaction work to run
     * @param <T> result type of the transaction
     * @return whatever {@code transaction.run()} returns
     * @throws MailboxException when the transaction throws it
     */
    public <T> T execute(Mapper.Transaction<T> transaction) throws MailboxException {
        T outcome = transaction.run();
        return outcome;
    }

    /**
     * Copies a message into {@code mailbox}, adding the RECENT flag to its current flags first.
     *
     * @param mailbox destination mailbox
     * @param mailboxMessage message to copy; its flags are replaced by the flags computed here
     * @return metadata returned by {@code setInMailbox}
     * @throws MailboxException when {@code setInMailbox} fails
     */
    public MessageMetaData copy(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        Flags flagsWithRecent = new FlagsBuilder()
            .add(mailboxMessage.createFlags())
            .add(Flags.Flag.RECENT)
            .build();
        mailboxMessage.setFlags(flagsWithRecent);
        return setInMailbox(mailbox, mailboxMessage);
    }

    /**
     * Delegates to the UID provider.
     *
     * @param mailbox mailbox to query
     * @return the value returned by {@code uidProvider.lastUid(mailbox)}
     * @throws MailboxException when the provider fails
     */
    public Optional<MessageUid> getLastUid(Mailbox mailbox) throws MailboxException {
        Optional<MessageUid> lastUid = this.uidProvider.lastUid(mailbox);
        return lastUid;
    }

    /**
     * Builds the applicable flags of a mailbox from the flags stored by the applicable-flag DAO.
     *
     * @param mailbox mailbox to read; its id is cast to {@link CassandraId}
     * @return flags built by {@link ApplicableFlagBuilder} from the stored flags, or from empty
     *         {@link Flags} when the DAO emits nothing
     */
    public Flags getApplicableFlag(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Flags stored = this.applicableFlagDAO.retrieveApplicableFlag(mailboxId)
            .defaultIfEmpty(new Flags())
            .block();
        return ApplicableFlagBuilder.builder()
            .add(stored)
            .build();
    }

    /**
     * Registers an existing message in a mailbox: assigns a UID and ModSeq, inserts its ids and
     * updates the index tables. Unlike {@code add}, the message itself is not saved again.
     *
     * @param mailbox destination mailbox; its id is cast to {@link CassandraId}
     * @param mailboxMessage message to register; its UID and ModSeq are set as part of the call
     * @return metadata of the registered message
     * @throws MailboxException when the pipeline fails with a {@link MailboxException} cause
     */
    private MessageMetaData setInMailbox(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();

        Mono<MessageMetaData> pipeline = addUidAndModseq(mailboxMessage, mailboxId)
            .flatMap(stamped -> insertIds(stamped, mailboxId).thenReturn(stamped))
            .flatMap(inserted -> this.indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId).thenReturn(inserted))
            .map(MailboxMessage::metaData);

        return block(pipeline);
    }

    /**
     * Saves the message through the message DAO, then inserts its ids for the mailbox.
     *
     * @param mailbox mailbox the ids are inserted for; its id is cast to {@link CassandraId}
     * @param mailboxMessage message to save
     * @return Mono completing once the id insertion that follows the save has completed
     * @throws MailboxException declared by the signature
     */
    private Mono<Void> save(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return this.messageDAO.save(mailboxMessage)
            .thenEmpty(insertIds(mailboxMessage, mailboxId));
    }

    /**
     * Inserts the composed id of a message (with its flags and ModSeq) into both id tables.
     *
     * @param mailboxMessage message providing the message id, UID, flags and ModSeq
     * @param cassandraId mailbox the entry belongs to
     * @return Mono completing when both insertions have completed
     */
    private Mono<Void> insertIds(MailboxMessage mailboxMessage, CassandraId cassandraId) {
        ComposedMessageId ids = new ComposedMessageId(cassandraId, mailboxMessage.getMessageId(), mailboxMessage.getUid());
        ComposedMessageIdWithMetaData entry = ComposedMessageIdWithMetaData.builder()
            .composedMessageId(ids)
            .flags(mailboxMessage.createFlags())
            .modSeq(mailboxMessage.getModSeq())
            .build();

        return Flux.merge(
                this.messageIdDAO.insert(entry),
                this.imapUidDAO.insert(entry))
            .then();
    }

    /**
     * Attempts the flags update of a single message.
     *
     * <p>When the computed flags equal the current ones, nothing is written and a success carrying
     * the message's existing ModSeq is reported. Otherwise the update is attempted with the new
     * ModSeq and reported as a success or a failure depending on its boolean outcome.
     *
     * @param flagsUpdateCalculator computes the new flags from the current ones
     * @param modSeq ModSeq to store if the flags change
     * @param composedMessageIdWithMetaData current id, flags and ModSeq of the message
     * @return Mono emitting the outcome for this message
     */
    private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagsUpdateCalculator, ModSeq modSeq, ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        Flags oldFlags = composedMessageIdWithMetaData.getFlags();
        Flags newFlags = flagsUpdateCalculator.buildNewFlags(oldFlags);

        if (identicalFlags(oldFlags, newFlags)) {
            UpdatedFlags unchanged = UpdatedFlags.builder()
                .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
                .modSeq(composedMessageIdWithMetaData.getModSeq())
                .oldFlags(oldFlags)
                .newFlags(newFlags)
                .build();
            return Mono.just(FlagsUpdateStageResult.success(unchanged));
        }

        return updateFlags(composedMessageIdWithMetaData, newFlags, modSeq).map(applied -> {
            if (applied.booleanValue()) {
                return FlagsUpdateStageResult.success(UpdatedFlags.builder()
                    .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
                    .modSeq(modSeq)
                    .oldFlags(oldFlags)
                    .newFlags(newFlags)
                    .build());
            }
            return FlagsUpdateStageResult.fail(composedMessageIdWithMetaData.getComposedMessageId().getUid());
        });
    }

    /**
     * Tells whether two flag sets are equal according to {@code Flags.equals}.
     *
     * @param flags first flag set (must not be null)
     * @param flags2 second flag set
     * @return {@code true} when {@code flags.equals(flags2)}
     */
    private boolean identicalFlags(Flags flags, Flags flags2) {
        boolean unchanged = flags.equals(flags2);
        return unchanged;
    }

    /**
     * Writes new flags and a new ModSeq for a message.
     *
     * <p>The IMAP UID table is updated first, passing the message's previous ModSeq to the DAO;
     * the message id table is only updated when that first update reports {@code true}.
     *
     * @param composedMessageIdWithMetaData current id and ModSeq of the message
     * @param flags flags to store
     * @param modSeq ModSeq to store
     * @return Mono emitting {@code true} when both tables were updated, {@code false} when the
     *         first update reported {@code false}
     */
    private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags flags, ModSeq modSeq) {
        ComposedMessageIdWithMetaData updated = ComposedMessageIdWithMetaData.builder()
            .composedMessageId(composedMessageIdWithMetaData.getComposedMessageId())
            .modSeq(modSeq)
            .flags(flags)
            .build();

        return this.imapUidDAO.updateMetadata(updated, composedMessageIdWithMetaData.getModSeq())
            .flatMap(applied -> {
                if (!applied.booleanValue()) {
                    return Mono.just(false);
                }
                return this.messageIdDAO.updateMetadata(updated).thenReturn(true);
            });
    }
}
