package org.apache.james.mailbox.cassandra.mail;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import javax.mail.Flags;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.mailbox.ApplicableFlagBuilder;
import org.apache.james.mailbox.FlagsBuilder;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxCounters;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.BatchSizes;
import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.MailboxReactorUtils;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.UidProvider;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.task.Task;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Limit;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.class */
/**
 * {@link MessageMapper} implementation that stores message metadata through the Cassandra DAOs
 * injected at construction time and keeps the derived index tables (counters, recents, first
 * unseen, deleted, applicable flags) updated through {@link CassandraIndexTableHandler}.
 *
 * <p>Every blocking method is a thin wrapper around its reactive counterpart.
 */
public class CassandraMessageMapper implements MessageMapper {
    /** Maximum number of retries applied when inserting into the message-id projection. */
    private static final int MAX_RETRY = 5;
    private final ModSeqProvider modSeqProvider;
    private final UidProvider uidProvider;
    private final CassandraMessageDAO messageDAO;
    private final CassandraMessageDAOV3 messageDAOV3;
    private final CassandraMessageIdDAO messageIdDAO;
    private final CassandraMessageIdToImapUidDAO imapUidDAO;
    private final CassandraMailboxCounterDAO mailboxCounterDAO;
    private final CassandraMailboxRecentsDAO mailboxRecentDAO;
    private final CassandraApplicableFlagDAO applicableFlagDAO;
    private final CassandraIndexTableHandler indexTableHandler;
    private final CassandraFirstUnseenDAO firstUnseenDAO;
    private final AttachmentLoader attachmentLoader;
    private final CassandraDeletedMessageDAO deletedMessageDAO;
    private final BlobStore blobStore;
    private final CassandraConfiguration cassandraConfiguration;
    private final BatchSizes batchSizes;
    private final RecomputeMailboxCountersService recomputeMailboxCountersService;
    private final SecureRandom secureRandom = new SecureRandom();
    // Assigned in the constructor: its value is derived from cassandraConfiguration, which is
    // still null while field initializers run.
    private final int reactorConcurrency;
    private final Clock clock;
    public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(10);
    private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(1000);

    /**
     * Wires the mapper with its collaborators. The attachment mapper is wrapped in an
     * {@link AttachmentLoader}; all other arguments are stored as-is.
     */
    public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider modSeqProvider, CassandraAttachmentMapper cassandraAttachmentMapper, CassandraMessageDAO cassandraMessageDAO, CassandraMessageDAOV3 cassandraMessageDAOV3, CassandraMessageIdDAO cassandraMessageIdDAO, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, CassandraMailboxCounterDAO cassandraMailboxCounterDAO, CassandraMailboxRecentsDAO cassandraMailboxRecentsDAO, CassandraApplicableFlagDAO cassandraApplicableFlagDAO, CassandraIndexTableHandler cassandraIndexTableHandler, CassandraFirstUnseenDAO cassandraFirstUnseenDAO, CassandraDeletedMessageDAO cassandraDeletedMessageDAO, BlobStore blobStore, CassandraConfiguration cassandraConfiguration, BatchSizes batchSizes, RecomputeMailboxCountersService recomputeMailboxCountersService, Clock clock) {
        this.uidProvider = uidProvider;
        this.modSeqProvider = modSeqProvider;
        this.messageDAO = cassandraMessageDAO;
        this.messageDAOV3 = cassandraMessageDAOV3;
        this.messageIdDAO = cassandraMessageIdDAO;
        this.imapUidDAO = cassandraMessageIdToImapUidDAO;
        this.mailboxCounterDAO = cassandraMailboxCounterDAO;
        this.mailboxRecentDAO = cassandraMailboxRecentsDAO;
        this.indexTableHandler = cassandraIndexTableHandler;
        this.firstUnseenDAO = cassandraFirstUnseenDAO;
        this.attachmentLoader = new AttachmentLoader(cassandraAttachmentMapper);
        this.applicableFlagDAO = cassandraApplicableFlagDAO;
        this.deletedMessageDAO = cassandraDeletedMessageDAO;
        this.blobStore = blobStore;
        this.cassandraConfiguration = cassandraConfiguration;
        this.batchSizes = batchSizes;
        this.recomputeMailboxCountersService = recomputeMailboxCountersService;
        this.clock = clock;
        // Must come after cassandraConfiguration is assigned (see evaluateReactorConcurrency).
        this.reactorConcurrency = evaluateReactorConcurrency();
    }

    /** Lists the UIDs recorded in the message-id projection for the given mailbox. */
    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
        return this.messageIdDAO.listUids((CassandraId) mailbox.getMailboxId());
    }

    /** Blocking shortcut returning the {@code count} part of {@link #getMailboxCounters(Mailbox)}. */
    public long countMessagesInMailbox(Mailbox mailbox) {
        return getMailboxCounters(mailbox).getCount();
    }

    /** Blocking variant of {@link #getMailboxCountersReactive(Mailbox)}. */
    public MailboxCounters getMailboxCounters(Mailbox mailbox) {
        return getMailboxCountersReactive(mailbox).block();
    }

    /**
     * Reads the mailbox counters. Counters reported as invalid are recomputed and read again
     * before being returned; every emitted value additionally goes through the probabilistic
     * read repair.
     */
    public Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return readMailboxCounters(mailboxId)
            .flatMap(counters -> {
                if (!counters.isValid()) {
                    return fixCounters(mailbox).then(readMailboxCounters(mailboxId));
                }
                return Mono.just(counters);
            })
            .doOnNext(counters -> readRepair(mailbox, counters));
    }

    /** Reads the stored counters, falling back to empty counters when none are stored. */
    public Mono<MailboxCounters> readMailboxCounters(CassandraId cassandraId) {
        return this.mailboxCounterDAO.retrieveMailboxCounters(cassandraId)
            .defaultIfEmpty(MailboxCounters.empty(cassandraId));
    }

    /** Triggers a fire-and-forget counter recomputation when the random draw selects it. */
    private void readRepair(Mailbox mailbox, MailboxCounters mailboxCounters) {
        if (shouldReadRepair(mailboxCounters)) {
            fixCounters(mailbox)
                .subscribeOn(Schedulers.parallel())
                .subscribe();
        }
    }

    /** Recomputes the counters of the mailbox, trusting the message projection. */
    private Mono<Task.Result> fixCounters(Mailbox mailbox) {
        return this.recomputeMailboxCountersService.recomputeMailboxCounter(
            new RecomputeMailboxCountersService.Context(),
            mailbox,
            RecomputeMailboxCountersService.Options.trustMessageProjection());
    }

    /**
     * Decides whether a read repair should be attempted.
     *
     * <p>Read repair is active when at least one of the two configured chances is non-zero.
     * The effective chance is {@code min(chanceMax, chanceOneHundred * 100 / unseen)}; a repair
     * is attempted when a random float drawn from {@code secureRandom} is below it.
     */
    private boolean shouldReadRepair(MailboxCounters mailboxCounters) {
        float chanceMax = this.cassandraConfiguration.getMailboxCountersReadRepairChanceMax();
        float chanceOneHundred = this.cassandraConfiguration.getMailboxCountersReadRepairChanceOneHundred();
        boolean activated = chanceMax != 0.0f || chanceOneHundred != 0.0f;
        if (!activated) {
            return false;
        }
        double ponderedChance = ((double) chanceOneHundred) * (100.0d / ((double) mailboxCounters.getUnseen()));
        return ((double) this.secureRandom.nextFloat()) < Math.min((double) chanceMax, ponderedChance);
    }

    /** Blocking deletion of a single message entry, including index updates. */
    public void delete(Mailbox mailbox, MailboxMessage mailboxMessage) {
        deleteAndHandleIndexUpdates(mailboxMessage.getComposedMessageIdWithMetaData()).block();
    }

    /** Deletes one entry, then updates the index tables for that deletion. */
    private Mono<Void> deleteAndHandleIndexUpdates(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        CassandraId mailboxId = (CassandraId) composedMessageIdWithMetaData.getComposedMessageId().getMailboxId();
        return delete(composedMessageIdWithMetaData)
            .then(this.indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
    }

    /**
     * Deletes several entries, then updates the index tables once. The mailbox id passed to the
     * index handler is taken from the first element of the collection.
     */
    private Mono<Void> deleteAndHandleIndexUpdates(Collection<ComposedMessageIdWithMetaData> collection) {
        if (collection.isEmpty()) {
            return Mono.empty();
        }
        CassandraId mailboxId = (CassandraId) collection.iterator().next().getComposedMessageId().getMailboxId();
        return Flux.fromIterable(collection)
            .flatMap(this::delete, this.reactorConcurrency)
            .then(this.indexTableHandler.updateIndexOnDeleteComposedId(mailboxId, collection));
    }

    /** Removes the entry from both the imap-uid and the message-id tables. */
    private Mono<Void> delete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
        CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
        CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
        return Flux.merge(
                this.imapUidDAO.delete(messageId, mailboxId),
                this.messageIdDAO.delete(mailboxId, composedMessageId.getUid()))
            .then();
    }

    /** Blocking iterator over {@link #findInMailboxReactive}. */
    public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, MessageMapper.FetchType fetchType, int i) {
        return findInMailboxReactive(mailbox, messageRange, fetchType, i).toIterable().iterator();
    }

    /** Lists the composed ids with metadata stored for the range in the given mailbox. */
    public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(Mailbox mailbox, MessageRange messageRange) {
        return this.messageIdDAO.listMessagesMetadata((CassandraId) mailbox.getMailboxId(), messageRange);
    }

    /**
     * Retrieves the messages of a range, in order, converting each metadata row into a
     * {@link MailboxMessage}. {@code i} is turned into a {@link Limit}; concurrency and prefetch
     * are both the batch size configured for the fetch type.
     */
    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, MessageMapper.FetchType fetchType, int i) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Limit limit = Limit.from(i);
        int batchSize = this.batchSizes.forFetchType(fetchType);
        return limit.applyOnFlux(this.messageIdDAO.retrieveMessages(mailboxId, messageRange, limit))
            .flatMapSequential(metadata -> toMailboxMessage(metadata, fetchType), batchSize, batchSize);
    }

    /**
     * Builds a {@link MailboxMessage} from a metadata row.
     *
     * <ul>
     *   <li>METADATA on a complete row: built from the row alone, with empty content;</li>
     *   <li>HEADERS on a complete row: header bytes are read from the blob store;</li>
     *   <li>otherwise: the message is read from the V3 DAO, falling back to the legacy DAO,
     *       then attachments are loaded.</li>
     * </ul>
     */
    private Mono<MailboxMessage> toMailboxMessage(CassandraMessageMetadata cassandraMessageMetadata, MessageMapper.FetchType fetchType) {
        boolean complete = cassandraMessageMetadata.isComplete();
        if (fetchType == MessageMapper.FetchType.METADATA && complete) {
            return Mono.just(cassandraMessageMetadata.asMailboxMessage(EMPTY_BYTE_ARRAY));
        }
        if (fetchType == MessageMapper.FetchType.HEADERS && complete) {
            Mono<byte[]> headerBytes = Mono.from(this.blobStore.readBytes(
                this.blobStore.getDefaultBucketName(),
                cassandraMessageMetadata.getHeaderContent().get(),
                BlobStore.StoragePolicy.SIZE_BASED));
            return headerBytes.map(cassandraMessageMetadata::asMailboxMessage);
        }
        return this.messageDAOV3.retrieveMessage(cassandraMessageMetadata.getComposedMessageId(), fetchType)
            .switchIfEmpty(Mono.defer(() -> this.messageDAO.retrieveMessage(cassandraMessageMetadata.getComposedMessageId(), fetchType)))
            .map(representation -> Pair.of(cassandraMessageMetadata.getComposedMessageId(), representation))
            .flatMap(idAndRepresentation -> this.attachmentLoader.addAttachmentToMessage(idAndRepresentation, cassandraMessageMetadata.getSaveDate(), fetchType));
    }

    /** Blocking variant of {@link #findRecentMessageUidsInMailboxReactive(Mailbox)}. */
    public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) {
        return findRecentMessageUidsInMailboxReactive(mailbox).block();
    }

    /** Collects the UIDs recorded as recent for the mailbox. */
    public Mono<List<MessageUid>> findRecentMessageUidsInMailboxReactive(Mailbox mailbox) {
        return this.mailboxRecentDAO.getRecentMessageUidsInMailbox((CassandraId) mailbox.getMailboxId()).collectList();
    }

    /** Returns the first unread UID of the mailbox, or {@code null} when there is none. */
    public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) {
        return this.firstUnseenDAO.retrieveFirstUnread((CassandraId) mailbox.getMailboxId())
            .blockOptional()
            .orElse(null);
    }

    /** Reactive variant of {@link #findFirstUnseenMessageUid(Mailbox)} wrapping the result in an Optional. */
    public Mono<Optional<MessageUid>> findFirstUnseenMessageUidReactive(Mailbox mailbox) {
        return this.firstUnseenDAO.retrieveFirstUnread((CassandraId) mailbox.getMailboxId())
            .map(Optional::of)
            .switchIfEmpty(Mono.just(Optional.empty()));
    }

    /** Blocking list of the UIDs recorded as deleted within the range. */
    public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) {
        return this.deletedMessageDAO.retrieveDeletedMessage((CassandraId) mailbox.getMailboxId(), messageRange)
            .collect(ImmutableList.toImmutableList())
            .block();
    }

    /** Blocking variant of {@link #deleteMessagesReactive(Mailbox, List)}. */
    public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> list) {
        return deleteMessagesReactive(mailbox, list).block();
    }

    /** Streams the UIDs recorded as deleted within the range. */
    public Flux<MessageUid> retrieveMessagesMarkedForDeletionReactive(Mailbox mailbox, MessageRange messageRange) {
        return this.deletedMessageDAO.retrieveDeletedMessage((CassandraId) mailbox.getMailboxId(), messageRange);
    }

    /**
     * Expunges the given UIDs: each matching entry is deleted (with a concurrency of the
     * configured expunge chunk size), the results are gathered by UID, and the index tables are
     * updated once for the whole batch.
     */
    public Mono<Map<MessageUid, MessageMetaData>> deleteMessagesReactive(Mailbox mailbox, List<MessageUid> list) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return Flux.fromIterable(MessageRange.toRanges(list))
            .concatMap(range -> this.messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
            .flatMap(metadata -> expungeOne(metadata.getComposedMessageId(), metadata.getSaveDate()),
                this.cassandraConfiguration.getExpungeChunkSize())
            .collect(ImmutableMap.toImmutableMap(SimpleMailboxMessage::getUid, SimpleMailboxMessage::metaData))
            .flatMap(expunged -> this.indexTableHandler.updateIndexOnDelete(mailboxId, expunged.values())
                .thenReturn(expunged));
    }

    /**
     * Deletes one entry, then loads its METADATA representation (V3 DAO first, legacy DAO as
     * fallback) to build the message returned to the caller.
     */
    private Mono<SimpleMailboxMessage> expungeOne(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Optional<Date> optional) {
        return delete(composedMessageIdWithMetaData)
            .then(this.messageDAOV3.retrieveMessage(composedMessageIdWithMetaData, MessageMapper.FetchType.METADATA)
                .switchIfEmpty(Mono.defer(() -> this.messageDAO.retrieveMessage(composedMessageIdWithMetaData, MessageMapper.FetchType.METADATA))))
            .map(representation -> representation.toMailboxMessage(composedMessageIdWithMetaData, ImmutableList.of(), optional));
    }

    /** Blocking variant of {@link #moveReactive(Mailbox, MailboxMessage)}. */
    public MessageMetaData move(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        return MailboxReactorUtils.block(moveReactive(mailbox, mailboxMessage));
    }

    /** Blocking variant of {@link #moveReactive(Mailbox, List)}. */
    public List<MessageMetaData> move(Mailbox mailbox, List<MailboxMessage> list) throws MailboxException {
        return MailboxReactorUtils.block(moveReactive(mailbox, list));
    }

    /**
     * Moves a message: copies it into {@code mailbox}, then deletes the entry it had before the
     * copy. The original ids are captured up front because the copy mutates the message.
     */
    public Mono<MessageMetaData> moveReactive(Mailbox mailbox, MailboxMessage mailboxMessage) {
        ComposedMessageIdWithMetaData originalIds = mailboxMessage.getComposedMessageIdWithMetaData();
        return copyReactive(mailbox, mailboxMessage)
            .flatMap(copied -> deleteAndHandleIndexUpdates(originalIds).thenReturn(copied));
    }

    /** Batch variant of {@link #moveReactive(Mailbox, MailboxMessage)}. */
    public Mono<List<MessageMetaData>> moveReactive(Mailbox mailbox, List<MailboxMessage> list) {
        // Snapshot the ids before copyReactive mutates the messages.
        List<ComposedMessageIdWithMetaData> originalIds = list.stream()
            .map(MailboxMessage::getComposedMessageIdWithMetaData)
            .collect(ImmutableList.toImmutableList());
        return copyReactive(mailbox, list)
            .flatMap(copied -> deleteAndHandleIndexUpdates(originalIds).thenReturn(copied));
    }

    /** Delegates to the ModSeq provider. */
    public ModSeq getHighestModSeq(Mailbox mailbox) throws MailboxException {
        return this.modSeqProvider.highestModSeq(mailbox);
    }

    /** Delegates to the UID provider. */
    public Mono<Optional<MessageUid>> getLastUidReactive(Mailbox mailbox) {
        return this.uidProvider.lastUidReactive(mailbox);
    }

    /** Delegates to the ModSeq provider. */
    public Mono<ModSeq> getHighestModSeqReactive(Mailbox mailbox) {
        return this.modSeqProvider.highestModSeqReactive(mailbox);
    }

    /** Blocking variant of {@link #m35addReactive(Mailbox, MailboxMessage)}. */
    public MessageMetaData add(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        return block(m35addReactive(mailbox, mailboxMessage));
    }

    /**
     * Assigns UID, ModSeq and save date to the message, saves it, and emits its metadata.
     */
    /* renamed from: addReactive, reason: merged with bridge method [inline-methods] */
    public Mono<MessageMetaData> m35addReactive(Mailbox mailbox, MailboxMessage mailboxMessage) {
        return addUidAndModseqAndSaveDate(mailboxMessage, (CassandraId) mailbox.getMailboxId())
            .flatMap(stamped -> save(mailbox, stamped).thenReturn(stamped.metaData()));
    }

    /**
     * Allocates the next UID and ModSeq for the mailbox and sets them, together with the current
     * clock instant as save date, on the (mutated) message. Fails with a
     * {@link MailboxException} when either provider completes empty.
     */
    private Mono<MailboxMessage> addUidAndModseqAndSaveDate(MailboxMessage mailboxMessage, CassandraId cassandraId) {
        Mono<MessageUid> nextUid = this.uidProvider.nextUidReactive(cassandraId)
            .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + mailboxMessage.getMessageId() + " in " + cassandraId)));
        Mono<ModSeq> nextModSeq = this.modSeqProvider.nextModSeqReactive(cassandraId)
            .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + mailboxMessage.getMessageId() + " in " + cassandraId)));
        return Mono.zip(nextUid, nextModSeq)
            .doOnNext(uidAndModSeq -> {
                mailboxMessage.setUid(uidAndModSeq.getT1());
                mailboxMessage.setModSeq(uidAndModSeq.getT2());
                mailboxMessage.setSaveDate(Date.from(this.clock.instant()));
            })
            .thenReturn(mailboxMessage);
    }

    /**
     * Blocks on the mono, unwrapping a {@link MailboxException} found as the cause of a
     * {@link RuntimeException}; any other runtime exception is rethrown untouched.
     */
    private <T> T block(Mono<T> mono) throws MailboxException {
        try {
            return mono.block();
        } catch (RuntimeException e) {
            if (e.getCause() instanceof MailboxException) {
                throw (MailboxException) e.getCause();
            }
            throw e;
        }
    }

    /** Blocking variant of {@link #updateFlagsReactive}. */
    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange messageRange) {
        return updateFlagsReactive(mailbox, flagsUpdateCalculator, messageRange).block().iterator();
    }

    /** Applies the flag calculator to every message of the range. */
    public Mono<List<UpdatedFlags>> updateFlagsReactive(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange messageRange) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Flux<ComposedMessageIdWithMetaData> toUpdate = this.messageIdDAO.retrieveMessages(mailboxId, messageRange, Limit.unlimited())
            .map(CassandraMessageMetadata::getComposedMessageId);
        return updateFlags(flagsUpdateCalculator, mailboxId, toUpdate);
    }

    /**
     * Runs a first update stage, retries the failed entries, logs the ones still failing and
     * emits the successful updates.
     */
    private Mono<List<UpdatedFlags>> updateFlags(FlagsUpdateCalculator flagsUpdateCalculator, CassandraId cassandraId, Flux<ComposedMessageIdWithMetaData> flux) {
        return runUpdateStage(cassandraId, flux, flagsUpdateCalculator)
            .flatMap(firstResult -> handleUpdatesStagedRetry(cassandraId, flagsUpdateCalculator, firstResult)
                .doOnNext(finalResult -> {
                    if (finalResult.containsFailedResults()) {
                        LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), cassandraId.asUuid());
                    }
                })
                .map(FlagsUpdateStageResult::getSucceeded));
    }

    /** Blocking variant of {@link #resetRecentReactive(Mailbox)}. */
    public List<UpdatedFlags> resetRecent(Mailbox mailbox) {
        return resetRecentReactive(mailbox).block();
    }

    /**
     * Removes the RECENT flag from the messages listed as recent for the mailbox. Entries whose
     * stored flags no longer contain RECENT are filtered out before the update.
     */
    public Mono<List<UpdatedFlags>> resetRecentReactive(Mailbox mailbox) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Flux<ComposedMessageIdWithMetaData> recentMessages = this.mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
            .collectList()
            .flatMapIterable(MessageRange::toRanges)
            .concatMap(range -> this.messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
            .map(CassandraMessageMetadata::getComposedMessageId)
            .filter(ids -> ids.getFlags().contains(Flags.Flag.RECENT));
        FlagsUpdateCalculator removeRecent = new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), MessageManager.FlagsUpdateMode.REMOVE);
        return updateFlags(removeRecent, mailboxId, recentMessages);
    }

    /**
     * Retries the failed part of a stage result, at most the configured number of times, and
     * only while failures remain. After each retry the accumulated result keeps its successes
     * and is merged with the outcome of the retry.
     */
    private Mono<FlagsUpdateStageResult> handleUpdatesStagedRetry(CassandraId cassandraId, FlagsUpdateCalculator flagsUpdateCalculator, FlagsUpdateStageResult flagsUpdateStageResult) {
        AtomicReference<FlagsUpdateStageResult> accumulated = new AtomicReference<>(flagsUpdateStageResult);
        return Flux.range(0, this.cassandraConfiguration.getFlagsUpdateMessageMaxRetry())
            // takeWhile (not takeUntil): keep retrying as long as something is still failing,
            // and stop as soon as nothing is.
            .takeWhile(attempt -> accumulated.get().containsFailedResults())
            .concatMap(attempt -> retryUpdatesStage(cassandraId, flagsUpdateCalculator, accumulated.get().getFailed())
                .doOnNext(retryResult -> accumulated.set(accumulated.get().keepSucceded().merge(retryResult))))
            .then(Mono.fromCallable(accumulated::get));
    }

    /**
     * Re-reads the current metadata of the failed entries and runs a new update stage on them.
     * Completes empty when there is nothing to retry.
     */
    private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId cassandraId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> list) {
        if (list.isEmpty()) {
            return Mono.empty();
        }
        Flux<ComposedMessageIdWithMetaData> refreshed = Flux.fromIterable(list)
            .flatMap(failedId -> this.imapUidDAO.retrieve(
                        (CassandraMessageId) failedId.getMessageId(),
                        Optional.of((CassandraId) failedId.getMailboxId()),
                        chooseReadConsistencyUponWrites())
                    .map(CassandraMessageMetadata::getComposedMessageId),
                16);
        return runUpdateStage(cassandraId, refreshed, flagsUpdateCalculator);
    }

    /** STRONG when message writes are configured as strongly consistent, WEAK otherwise. */
    private JamesExecutionProfiles.ConsistencyChoice chooseReadConsistencyUponWrites() {
        if (this.cassandraConfiguration.isMessageWriteStrongConsistency()) {
            return JamesExecutionProfiles.ConsistencyChoice.STRONG;
        }
        return JamesExecutionProfiles.ConsistencyChoice.WEAK;
    }

    /**
     * Allocates one new ModSeq, attempts the flag update of every entry with it, merges the
     * individual outcomes and updates the flag indexes for the successful ones.
     */
    private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId cassandraId, Flux<ComposedMessageIdWithMetaData> flux, FlagsUpdateCalculator flagsUpdateCalculator) {
        return computeNewModSeq(cassandraId)
            .flatMapMany(modSeq -> flux.flatMap(
                ids -> tryFlagsUpdate(flagsUpdateCalculator, modSeq, ids),
                this.reactorConcurrency))
            .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
            .flatMap(stageResult -> updateIndexesForUpdatesResult(cassandraId, stageResult));
    }

    /**
     * Allocates the next ModSeq of the mailbox, failing with a {@link RuntimeException} when the
     * provider completes without a value.
     */
    private Mono<ModSeq> computeNewModSeq(CassandraId cassandraId) {
        return this.modSeqProvider.nextModSeqReactive(cassandraId)
            .switchIfEmpty(Mono.error(() -> new RuntimeException("ModSeq generation failed for mailbox " + cassandraId.asUuid())));
    }

    /**
     * Updates the flag indexes for the successful updates. Index failures are logged and
     * swallowed so that the stage result is still returned.
     */
    private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId cassandraId, FlagsUpdateStageResult flagsUpdateStageResult) {
        return this.indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, flagsUpdateStageResult.getSucceeded())
            .onErrorResume(th -> {
                LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", cassandraId, th);
                return Mono.empty();
            })
            .thenReturn(flagsUpdateStageResult);
    }

    /** Blocking variant of {@link #copyReactive(Mailbox, MailboxMessage)}. */
    public MessageMetaData copy(Mailbox mailbox, MailboxMessage mailboxMessage) throws MailboxException {
        return MailboxReactorUtils.block(copyReactive(mailbox, mailboxMessage));
    }

    /** Blocking variant of {@link #copyReactive(Mailbox, List)}. */
    public List<MessageMetaData> copy(Mailbox mailbox, List<MailboxMessage> list) throws MailboxException {
        return MailboxReactorUtils.block(copyReactive(mailbox, list));
    }

    /**
     * Copies a message into {@code mailbox}. The passed message is mutated: RECENT is added to
     * its flags and its save date is set to the current clock instant.
     */
    public Mono<MessageMetaData> copyReactive(Mailbox mailbox, MailboxMessage mailboxMessage) {
        markAsFreshCopy(mailboxMessage);
        return setInMailboxReactive(mailbox, mailboxMessage);
    }

    /**
     * Batch variant of {@link #copyReactive(Mailbox, MailboxMessage)}. An empty input yields an
     * empty list, so blocking callers never observe {@code null}.
     */
    public Mono<List<MessageMetaData>> copyReactive(Mailbox mailbox, List<MailboxMessage> list) {
        if (list.isEmpty()) {
            return Mono.just(ImmutableList.of());
        }
        List<MailboxMessage> prepared = list.stream()
            .map(message -> {
                markAsFreshCopy(message);
                return message;
            })
            .collect(ImmutableList.toImmutableList());
        return setMessagesInMailboxReactive(mailbox, prepared).collectList();
    }

    /** Adds RECENT to the message flags and stamps the save date with the current clock instant. */
    private void markAsFreshCopy(MailboxMessage mailboxMessage) {
        mailboxMessage.setFlags(new FlagsBuilder()
            .add(mailboxMessage.createFlags())
            .add(Flags.Flag.RECENT)
            .build());
        mailboxMessage.setSaveDate(Date.from(this.clock.instant()));
    }

    /** Delegates to the UID provider. */
    public Optional<MessageUid> getLastUid(Mailbox mailbox) throws MailboxException {
        return this.uidProvider.lastUid(mailbox);
    }

    /** Blocking read of the applicable flags; stored flags default to empty when absent. */
    public Flags getApplicableFlag(Mailbox mailbox) {
        Flags stored = this.applicableFlagDAO.retrieveApplicableFlag((CassandraId) mailbox.getMailboxId())
            .defaultIfEmpty(new Flags())
            .block();
        return ApplicableFlagBuilder.builder().add(stored).build();
    }

    /** Reactive variant of {@link #getApplicableFlag(Mailbox)}. */
    public Mono<Flags> getApplicableFlagReactive(Mailbox mailbox) {
        return this.applicableFlagDAO.retrieveApplicableFlag((CassandraId) mailbox.getMailboxId())
            .defaultIfEmpty(new Flags())
            .map(stored -> ApplicableFlagBuilder.builder().add(stored).build());
    }

    /** Stamps the message with a new UID/ModSeq/save date and inserts its metadata for the mailbox. */
    private Mono<MessageMetaData> setInMailboxReactive(Mailbox mailbox, MailboxMessage mailboxMessage) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return addUidAndModseqAndSaveDate(mailboxMessage, mailboxId)
            .flatMap(stamped -> insertMetadata(stamped, mailboxId, CassandraMessageMetadata.from(stamped).withMailboxId(mailboxId))
                .thenReturn(stamped))
            .map(MailboxMessage::metaData);
    }

    /**
     * Allocates one ModSeq and {@code list.size()} UIDs, assigns them to the messages in list
     * order (all messages share the ModSeq), inserts their ids and emits their metadata.
     */
    private Flux<MessageMetaData> setMessagesInMailboxReactive(Mailbox mailbox, List<MailboxMessage> list) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        Mono<List<MessageUid>> nextUids = this.uidProvider.nextUids(mailboxId, list.size());
        return this.modSeqProvider.nextModSeqReactive(mailboxId)
            .flatMap(modSeq -> nextUids.map(uids -> Pair.of(uids, modSeq)))
            .map(uidsAndModSeq -> Streams.zip(uidsAndModSeq.getLeft().stream(), list.stream(),
                    (uid, message) -> {
                        message.setUid(uid);
                        message.setModSeq(uidsAndModSeq.getRight());
                        return message;
                    })
                .collect(ImmutableList.toImmutableList()))
            .flatMapMany(stamped -> insertIds(stamped, mailboxId).thenMany(Flux.fromIterable(stamped)))
            .map(MailboxMessage::metaData);
    }

    /** Saves the message through the V3 DAO, then inserts its ids using the first returned blob id. */
    private Mono<Void> save(Mailbox mailbox, MailboxMessage mailboxMessage) {
        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
        return this.messageDAOV3.save(mailboxMessage)
            .flatMap(blobIds -> insertIds(mailboxMessage, mailboxId, (BlobId) blobIds.getT1()));
    }

    /** Inserts the metadata built from the message and the given blob id. */
    private Mono<Void> insertIds(MailboxMessage mailboxMessage, CassandraId cassandraId, BlobId blobId) {
        return insertMetadata(mailboxMessage, cassandraId, CassandraMessageMetadata.from(mailboxMessage, blobId));
    }

    /**
     * Inserts into the imap-uid table first; once done, inserts into the message-id table (with
     * backoff retries) and updates the index tables, both merged together.
     */
    private Mono<Void> insertMetadata(MailboxMessage mailboxMessage, CassandraId cassandraId, CassandraMessageMetadata cassandraMessageMetadata) {
        Mono<Void> messageIdInsert = this.messageIdDAO.insert(cassandraMessageMetadata)
            .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF));
        return this.imapUidDAO.insert(cassandraMessageMetadata)
            .then(Flux.merge(
                    messageIdInsert,
                    this.indexTableHandler.updateIndexOnAdd(mailboxMessage, cassandraId))
                .then());
    }

    /** Builds the metadata of the message, bound to the given mailbox id. */
    private CassandraMessageMetadata computeId(MailboxMessage mailboxMessage, CassandraId cassandraId) {
        return CassandraMessageMetadata.from(mailboxMessage).withMailboxId(cassandraId);
    }

    /**
     * Batch insertion: imap-uid rows are written one after another, message-id rows with the
     * configured concurrency and backoff retries, then the index tables are updated once.
     */
    private Mono<Void> insertIds(Collection<MailboxMessage> collection, CassandraId cassandraId) {
        return Flux.fromIterable(collection)
            .map(message -> computeId(message, cassandraId))
            .concatMap(metadata -> this.imapUidDAO.insert(metadata).thenReturn(metadata))
            .flatMap(metadata -> this.messageIdDAO.insert(metadata)
                    .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)),
                this.reactorConcurrency)
            .then(this.indexTableHandler.updateIndexOnAdd(collection, cassandraId));
    }

    /**
     * Computes the new flags of one entry. When they equal the current flags, a success carrying
     * the entry's existing ModSeq is returned without any write; otherwise the update is
     * attempted with the new ModSeq and reported as success or failure.
     */
    private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagsUpdateCalculator, ModSeq modSeq, ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        Flags oldFlags = composedMessageIdWithMetaData.getFlags();
        Flags newFlags = flagsUpdateCalculator.buildNewFlags(oldFlags);
        ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
        if (identicalFlags(oldFlags, newFlags)) {
            return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder()
                .uid(composedMessageId.getUid())
                .messageId(composedMessageId.getMessageId())
                .modSeq(composedMessageIdWithMetaData.getModSeq())
                .oldFlags(oldFlags)
                .newFlags(newFlags)
                .build()));
        }
        return updateFlags(composedMessageIdWithMetaData, newFlags, modSeq)
            .map(applied -> {
                if (!applied.booleanValue()) {
                    return FlagsUpdateStageResult.fail(composedMessageId);
                }
                return FlagsUpdateStageResult.success(UpdatedFlags.builder()
                    .uid(composedMessageId.getUid())
                    .messageId(composedMessageId.getMessageId())
                    .modSeq(modSeq)
                    .oldFlags(oldFlags)
                    .newFlags(newFlags)
                    .build());
            });
    }

    /** Flags equality, as defined by {@link Flags#equals(Object)}. */
    private boolean identicalFlags(Flags flags, Flags flags2) {
        return flags.equals(flags2);
    }

    /**
     * Writes the new flags and ModSeq to the imap-uid table, passing the previous ModSeq along;
     * only when that write reports success is the message-id table updated too.
     *
     * @return {@code true} when both tables were updated, {@code false} when the first write
     *     reported failure
     */
    private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags flags, ModSeq modSeq) {
        ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
        ModSeq previousModSeq = composedMessageIdWithMetaData.getModSeq();
        ComposedMessageIdWithMetaData updatedMetadata = ComposedMessageIdWithMetaData.builder()
            .composedMessageId(composedMessageId)
            .modSeq(modSeq)
            .flags(flags)
            .threadId(composedMessageIdWithMetaData.getThreadId())
            .build();
        UpdatedFlags updatedFlags = UpdatedFlags.builder()
            .messageId(composedMessageId.getMessageId())
            .modSeq(updatedMetadata.getModSeq())
            .oldFlags(composedMessageIdWithMetaData.getFlags())
            .newFlags(updatedMetadata.getFlags())
            .uid(composedMessageId.getUid())
            .build();
        return this.imapUidDAO.updateMetadata(composedMessageId, updatedFlags, previousModSeq)
            .flatMap(applied -> {
                if (applied.booleanValue()) {
                    return this.messageIdDAO.updateMetadata(composedMessageId, updatedFlags).thenReturn(true);
                }
                return Mono.just(false);
            });
    }

    /**
     * Concurrency used for per-message writes: 1 when message writes are configured as strongly
     * consistent, 4 otherwise. Reads {@code cassandraConfiguration}, so it must only be called
     * once that field has been assigned.
     */
    private int evaluateReactorConcurrency() {
        return this.cassandraConfiguration.isMessageWriteStrongConsistency() ? 1 : 4;
    }
}
