package org.apache.james.mailbox.cassandra.mail;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.relation.Relation;
import com.datastax.oss.driver.api.querybuilder.update.Assignment;
import com.datastax.oss.driver.api.querybuilder.update.Update;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import javax.inject.Inject;
import javax.mail.Flags;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageMetadata;
import org.apache.james.mailbox.cassandra.table.CassandraMessageIdTable;
import org.apache.james.mailbox.cassandra.table.CassandraMessageIds;
import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table;
import org.apache.james.mailbox.cassandra.table.Flag;
import org.apache.james.mailbox.cassandra.table.MessageIdToImapUid;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.ThreadId;
import org.apache.james.mailbox.model.UpdatedFlags;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.class */
public class CassandraMessageIdToImapUidDAO {
    private static final String MOD_SEQ_CONDITION = "modSeqCondition";
    private static final String ADDED_USERS_FLAGS = "added_user_flags";
    private static final String REMOVED_USERS_FLAGS = "removed_user_flags";
    private final CassandraAsyncExecutor cassandraAsyncExecutor;
    private final BlobId.Factory blobIdFactory;
    private final PreparedStatement delete = prepareDelete();
    private final PreparedStatement insert = prepareInsert();
    private final PreparedStatement insertForced = prepareInsertForced();
    private final PreparedStatement update = prepareUpdate();
    private final PreparedStatement selectAll = prepareSelectAll();
    private final PreparedStatement select = prepareSelect();
    private final PreparedStatement listStatement = prepareList();
    private final CassandraConfiguration cassandraConfiguration;
    private final CqlSession session;
    private final DriverExecutionProfile lwtProfile;

    @Inject
    public CassandraMessageIdToImapUidDAO(CqlSession cqlSession, BlobId.Factory factory, CassandraConfiguration cassandraConfiguration) {
        this.session = cqlSession;
        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(cqlSession);
        this.blobIdFactory = factory;
        this.cassandraConfiguration = cassandraConfiguration;
        this.lwtProfile = JamesExecutionProfiles.getLWTProfile(cqlSession);
    }

    private PreparedStatement prepareDelete() {
        return this.session.prepare(QueryBuilder.deleteFrom(MessageIdToImapUid.TABLE_NAME).where(List.of((Relation) Relation.column(CassandraMessageIds.MESSAGE_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID)), (Relation) Relation.column(CassandraMessageIds.MAILBOX_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MAILBOX_ID)))).build());
    }

    private PreparedStatement prepareInsert() {
        return this.cassandraConfiguration.isMessageWriteStrongConsistency() ? this.session.prepare(QueryBuilder.insertInto(MessageIdToImapUid.TABLE_NAME).value(CassandraMessageIds.MESSAGE_ID, QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID)).value(CassandraMessageIds.MAILBOX_ID, QueryBuilder.bindMarker(CassandraMessageIds.MAILBOX_ID)).value(CassandraMessageIds.IMAP_UID, QueryBuilder.bindMarker(CassandraMessageIds.IMAP_UID)).value(MessageIdToImapUid.THREAD_ID, QueryBuilder.bindMarker(MessageIdToImapUid.THREAD_ID)).value(MessageIdToImapUid.MOD_SEQ, QueryBuilder.bindMarker(MessageIdToImapUid.MOD_SEQ)).value(Flag.ANSWERED, QueryBuilder.bindMarker(Flag.ANSWERED)).value(Flag.DELETED, QueryBuilder.bindMarker(Flag.DELETED)).value(Flag.DRAFT, QueryBuilder.bindMarker(Flag.DRAFT)).value(Flag.FLAGGED, QueryBuilder.bindMarker(Flag.FLAGGED)).value(Flag.RECENT, QueryBuilder.bindMarker(Flag.RECENT)).value(Flag.SEEN, QueryBuilder.bindMarker(Flag.SEEN)).value(Flag.USER, QueryBuilder.bindMarker(Flag.USER)).value(Flag.USER_FLAGS, QueryBuilder.bindMarker(Flag.USER_FLAGS)).value(CassandraMessageV3Table.INTERNAL_DATE, QueryBuilder.bindMarker(CassandraMessageV3Table.INTERNAL_DATE)).value(CassandraMessageIdTable.SAVE_DATE, QueryBuilder.bindMarker(CassandraMessageIdTable.SAVE_DATE)).value(CassandraMessageV3Table.BODY_START_OCTET, QueryBuilder.bindMarker(CassandraMessageV3Table.BODY_START_OCTET)).value(CassandraMessageV3Table.FULL_CONTENT_OCTETS, QueryBuilder.bindMarker(CassandraMessageV3Table.FULL_CONTENT_OCTETS)).value(CassandraMessageV3Table.HEADER_CONTENT, QueryBuilder.bindMarker(CassandraMessageV3Table.HEADER_CONTENT)).ifNotExists().build()) : this.session.prepare(QueryBuilder.update(MessageIdToImapUid.TABLE_NAME).set(new Assignment[]{Assignment.setColumn(MessageIdToImapUid.THREAD_ID, QueryBuilder.bindMarker(MessageIdToImapUid.THREAD_ID)), Assignment.setColumn(MessageIdToImapUid.MOD_SEQ, QueryBuilder.bindMarker(MessageIdToImapUid.MOD_SEQ)), Assignment.setColumn(Flag.ANSWERED, QueryBuilder.bindMarker(Flag.ANSWERED)), Assignment.setColumn(Flag.DELETED, QueryBuilder.bindMarker(Flag.DELETED)), Assignment.setColumn(Flag.DRAFT, QueryBuilder.bindMarker(Flag.DRAFT)), Assignment.setColumn(Flag.FLAGGED, QueryBuilder.bindMarker(Flag.FLAGGED)), Assignment.setColumn(Flag.RECENT, QueryBuilder.bindMarker(Flag.RECENT)), Assignment.setColumn(Flag.SEEN, QueryBuilder.bindMarker(Flag.SEEN)), Assignment.setColumn(Flag.USER, QueryBuilder.bindMarker(Flag.USER)), Assignment.setColumn(CassandraMessageV3Table.INTERNAL_DATE, QueryBuilder.bindMarker(CassandraMessageV3Table.INTERNAL_DATE)), Assignment.setColumn(CassandraMessageIdTable.SAVE_DATE, QueryBuilder.bindMarker(CassandraMessageIdTable.SAVE_DATE)), Assignment.setColumn(CassandraMessageV3Table.BODY_START_OCTET, QueryBuilder.bindMarker(CassandraMessageV3Table.BODY_START_OCTET)), Assignment.setColumn(CassandraMessageV3Table.FULL_CONTENT_OCTETS, QueryBuilder.bindMarker(CassandraMessageV3Table.FULL_CONTENT_OCTETS)), Assignment.setColumn(CassandraMessageV3Table.HEADER_CONTENT, QueryBuilder.bindMarker(CassandraMessageV3Table.HEADER_CONTENT))}).append(Flag.USER_FLAGS, QueryBuilder.bindMarker(Flag.USER_FLAGS)).where(new Relation[]{(Relation) Relation.column(CassandraMessageIds.MESSAGE_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID)), (Relation) Relation.column(CassandraMessageIds.MAILBOX_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MAILBOX_ID)), (Relation) Relation.column(CassandraMessageIds.IMAP_UID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.IMAP_UID))}).build());
    }

    private PreparedStatement prepareInsertForced() {
        return this.session.prepare(QueryBuilder.insertInto(MessageIdToImapUid.TABLE_NAME).value(CassandraMessageIds.MESSAGE_ID, QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID)).value(CassandraMessageIds.MAILBOX_ID, QueryBuilder.bindMarker(CassandraMessageIds.MAILBOX_ID)).value(CassandraMessageIds.IMAP_UID, QueryBuilder.bindMarker(CassandraMessageIds.IMAP_UID)).value(MessageIdToImapUid.MOD_SEQ, QueryBuilder.bindMarker(MessageIdToImapUid.MOD_SEQ)).value(Flag.ANSWERED, QueryBuilder.bindMarker(Flag.ANSWERED)).value(Flag.DELETED, QueryBuilder.bindMarker(Flag.DELETED)).value(Flag.DRAFT, QueryBuilder.bindMarker(Flag.DRAFT)).value(Flag.FLAGGED, QueryBuilder.bindMarker(Flag.FLAGGED)).value(Flag.RECENT, QueryBuilder.bindMarker(Flag.RECENT)).value(Flag.SEEN, QueryBuilder.bindMarker(Flag.SEEN)).value(Flag.USER, QueryBuilder.bindMarker(Flag.USER)).value(Flag.USER_FLAGS, QueryBuilder.bindMarker(Flag.USER_FLAGS)).value(CassandraMessageV3Table.INTERNAL_DATE, QueryBuilder.bindMarker(CassandraMessageV3Table.INTERNAL_DATE)).value(CassandraMessageIdTable.SAVE_DATE, QueryBuilder.bindMarker(CassandraMessageIdTable.SAVE_DATE)).value(CassandraMessageV3Table.BODY_START_OCTET, QueryBuilder.bindMarker(CassandraMessageV3Table.BODY_START_OCTET)).value(CassandraMessageV3Table.FULL_CONTENT_OCTETS, QueryBuilder.bindMarker(CassandraMessageV3Table.FULL_CONTENT_OCTETS)).value(CassandraMessageV3Table.HEADER_CONTENT, QueryBuilder.bindMarker(CassandraMessageV3Table.HEADER_CONTENT)).build());
    }

    private PreparedStatement prepareUpdate() {
        Update where = QueryBuilder.update(MessageIdToImapUid.TABLE_NAME).set(new Assignment[]{Assignment.setColumn(MessageIdToImapUid.MOD_SEQ, QueryBuilder.bindMarker(MessageIdToImapUid.MOD_SEQ)), Assignment.setColumn(Flag.ANSWERED, QueryBuilder.bindMarker(Flag.ANSWERED)), Assignment.setColumn(Flag.DELETED, QueryBuilder.bindMarker(Flag.DELETED)), Assignment.setColumn(Flag.DRAFT, QueryBuilder.bindMarker(Flag.DRAFT)), Assignment.setColumn(Flag.FLAGGED, QueryBuilder.bindMarker(Flag.FLAGGED)), Assignment.setColumn(Flag.RECENT, QueryBuilder.bindMarker(Flag.RECENT)), Assignment.setColumn(Flag.SEEN, QueryBuilder.bindMarker(Flag.SEEN)), Assignment.setColumn(Flag.USER, QueryBuilder.bindMarker(Flag.USER))}).append(Flag.USER_FLAGS, QueryBuilder.bindMarker(ADDED_USERS_FLAGS)).remove(Flag.USER_FLAGS, QueryBuilder.bindMarker(REMOVED_USERS_FLAGS)).where(new Relation[]{(Relation) Relation.column(CassandraMessageIds.MESSAGE_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID)), (Relation) Relation.column(CassandraMessageIds.MAILBOX_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MAILBOX_ID)), (Relation) Relation.column(CassandraMessageIds.IMAP_UID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.IMAP_UID))});
        return this.cassandraConfiguration.isMessageWriteStrongConsistency() ? this.session.prepare(((Update) where.ifColumn(MessageIdToImapUid.MOD_SEQ).isEqualTo(QueryBuilder.bindMarker("modSeqCondition"))).build()) : this.session.prepare(where.build());
    }

    private PreparedStatement prepareSelectAll() {
        return this.session.prepare(QueryBuilder.selectFrom(MessageIdToImapUid.TABLE_NAME).all().where((Relation) Relation.column(CassandraMessageIds.MESSAGE_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID))).build());
    }

    private PreparedStatement prepareList() {
        return this.session.prepare(QueryBuilder.selectFrom(MessageIdToImapUid.TABLE_NAME).all().build());
    }

    private PreparedStatement prepareSelect() {
        return this.session.prepare(QueryBuilder.selectFrom(MessageIdToImapUid.TABLE_NAME).all().where(new Relation[]{(Relation) Relation.column(CassandraMessageIds.MESSAGE_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MESSAGE_ID)), (Relation) Relation.column(CassandraMessageIds.MAILBOX_ID).isEqualTo(QueryBuilder.bindMarker(CassandraMessageIds.MAILBOX_ID))}).build());
    }

    public Mono<Void> delete(CassandraMessageId cassandraMessageId, CassandraId cassandraId) {
        return this.cassandraAsyncExecutor.executeVoid(this.delete.bind(new Object[0]).setUuid(CassandraMessageIds.MESSAGE_ID, cassandraMessageId.get()).setUuid(CassandraMessageIds.MAILBOX_ID, cassandraId.asUuid()));
    }

    public Mono<Void> insert(CassandraMessageMetadata cassandraMessageMetadata) {
        ComposedMessageId composedMessageId = cassandraMessageMetadata.getComposedMessageId().getComposedMessageId();
        Flags flags = cassandraMessageMetadata.getComposedMessageId().getFlags();
        ThreadId threadId = cassandraMessageMetadata.getComposedMessageId().getThreadId();
        BoundStatementBuilder boundStatementBuilder = this.insert.boundStatementBuilder(new Object[0]);
        if (cassandraMessageMetadata.getComposedMessageId().getFlags().getUserFlags().length == 0) {
            boundStatementBuilder.unset(Flag.USER_FLAGS);
        } else {
            boundStatementBuilder.setSet(Flag.USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()), String.class);
        }
        return this.cassandraAsyncExecutor.executeVoid(boundStatementBuilder.set(CassandraMessageIds.MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID).set(CassandraMessageIds.MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID).setLong(CassandraMessageIds.IMAP_UID, composedMessageId.getUid().asLong()).setLong(MessageIdToImapUid.MOD_SEQ, cassandraMessageMetadata.getComposedMessageId().getModSeq().asLong()).setUuid(MessageIdToImapUid.THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get()).setBoolean(Flag.ANSWERED, flags.contains(Flags.Flag.ANSWERED)).setBoolean(Flag.DELETED, flags.contains(Flags.Flag.DELETED)).setBoolean(Flag.DRAFT, flags.contains(Flags.Flag.DRAFT)).setBoolean(Flag.FLAGGED, flags.contains(Flags.Flag.FLAGGED)).setBoolean(Flag.RECENT, flags.contains(Flags.Flag.RECENT)).setBoolean(Flag.SEEN, flags.contains(Flags.Flag.SEEN)).setBoolean(Flag.USER, flags.contains(Flags.Flag.USER)).setInstant(CassandraMessageV3Table.INTERNAL_DATE, cassandraMessageMetadata.getInternalDate().get().toInstant()).setInstant(CassandraMessageIdTable.SAVE_DATE, (Instant) cassandraMessageMetadata.getSaveDate().map((v0) -> {
            return v0.toInstant();
        }).orElse(null)).setInt(CassandraMessageV3Table.BODY_START_OCTET, Math.toIntExact(cassandraMessageMetadata.getBodyStartOctet().get().longValue())).setLong(CassandraMessageV3Table.FULL_CONTENT_OCTETS, cassandraMessageMetadata.getSize().get().longValue()).setString(CassandraMessageV3Table.HEADER_CONTENT, cassandraMessageMetadata.getHeaderContent().get().asString()).build());
    }

    public Mono<Void> insertForce(CassandraMessageMetadata cassandraMessageMetadata) {
        ComposedMessageId composedMessageId = cassandraMessageMetadata.getComposedMessageId().getComposedMessageId();
        Flags flags = cassandraMessageMetadata.getComposedMessageId().getFlags();
        return this.cassandraAsyncExecutor.executeVoid(this.insertForced.bind(new Object[0]).set(CassandraMessageIds.MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID).set(CassandraMessageIds.MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID).setLong(CassandraMessageIds.IMAP_UID, composedMessageId.getUid().asLong()).setLong(MessageIdToImapUid.MOD_SEQ, cassandraMessageMetadata.getComposedMessageId().getModSeq().asLong()).setBoolean(Flag.ANSWERED, flags.contains(Flags.Flag.ANSWERED)).setBoolean(Flag.DELETED, flags.contains(Flags.Flag.DELETED)).setBoolean(Flag.DRAFT, flags.contains(Flags.Flag.DRAFT)).setBoolean(Flag.FLAGGED, flags.contains(Flags.Flag.FLAGGED)).setBoolean(Flag.RECENT, flags.contains(Flags.Flag.RECENT)).setBoolean(Flag.SEEN, flags.contains(Flags.Flag.SEEN)).setBoolean(Flag.USER, flags.contains(Flags.Flag.USER)).setSet(Flag.USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()), String.class).setInstant(CassandraMessageV3Table.INTERNAL_DATE, cassandraMessageMetadata.getInternalDate().get().toInstant()).setInstant(CassandraMessageIdTable.SAVE_DATE, (Instant) cassandraMessageMetadata.getSaveDate().map((v0) -> {
            return v0.toInstant();
        }).orElse(null)).setInt(CassandraMessageV3Table.BODY_START_OCTET, Math.toIntExact(cassandraMessageMetadata.getBodyStartOctet().get().longValue())).setLong(CassandraMessageV3Table.FULL_CONTENT_OCTETS, cassandraMessageMetadata.getSize().get().longValue()).setString(CassandraMessageV3Table.HEADER_CONTENT, cassandraMessageMetadata.getHeaderContent().get().asString()));
    }

    public Mono<Boolean> updateMetadata(ComposedMessageId composedMessageId, UpdatedFlags updatedFlags, ModSeq modSeq) {
        return this.cassandraConfiguration.isMessageWriteStrongConsistency() ? this.cassandraAsyncExecutor.executeReturnApplied(updateBoundStatement(composedMessageId, updatedFlags, modSeq)) : this.cassandraAsyncExecutor.executeVoid(updateBoundStatement(composedMessageId, updatedFlags, modSeq)).thenReturn(true);
    }

    private BoundStatement updateBoundStatement(ComposedMessageId composedMessageId, UpdatedFlags updatedFlags, ModSeq modSeq) {
        BoundStatementBuilder boundStatementBuilder = this.update.boundStatementBuilder(new Object[0]).setLong(MessageIdToImapUid.MOD_SEQ, updatedFlags.getModSeq().asLong()).setUuid(CassandraMessageIds.MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get()).setUuid(CassandraMessageIds.MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid()).setLong(CassandraMessageIds.IMAP_UID, composedMessageId.getUid().asLong());
        if (updatedFlags.isChanged(Flags.Flag.ANSWERED)) {
            boundStatementBuilder.setBoolean(Flag.ANSWERED, updatedFlags.isModifiedToSet(Flags.Flag.ANSWERED));
        } else {
            boundStatementBuilder.unset(Flag.ANSWERED);
        }
        if (updatedFlags.isChanged(Flags.Flag.DRAFT)) {
            boundStatementBuilder.setBoolean(Flag.DRAFT, updatedFlags.isModifiedToSet(Flags.Flag.DRAFT));
        } else {
            boundStatementBuilder.unset(Flag.DRAFT);
        }
        if (updatedFlags.isChanged(Flags.Flag.FLAGGED)) {
            boundStatementBuilder.setBoolean(Flag.FLAGGED, updatedFlags.isModifiedToSet(Flags.Flag.FLAGGED));
        } else {
            boundStatementBuilder.unset(Flag.FLAGGED);
        }
        if (updatedFlags.isChanged(Flags.Flag.DELETED)) {
            boundStatementBuilder.setBoolean(Flag.DELETED, updatedFlags.isModifiedToSet(Flags.Flag.DELETED));
        } else {
            boundStatementBuilder.unset(Flag.DELETED);
        }
        if (updatedFlags.isChanged(Flags.Flag.RECENT)) {
            boundStatementBuilder.setBoolean(Flag.RECENT, updatedFlags.getNewFlags().contains(Flags.Flag.RECENT));
        } else {
            boundStatementBuilder.unset(Flag.RECENT);
        }
        if (updatedFlags.isChanged(Flags.Flag.SEEN)) {
            boundStatementBuilder.setBoolean(Flag.SEEN, updatedFlags.isModifiedToSet(Flags.Flag.SEEN));
        } else {
            boundStatementBuilder.unset(Flag.SEEN);
        }
        if (updatedFlags.isChanged(Flags.Flag.USER)) {
            boundStatementBuilder.setBoolean(Flag.USER, updatedFlags.isModifiedToSet(Flags.Flag.USER));
        } else {
            boundStatementBuilder.unset(Flag.USER);
        }
        Sets.SetView difference = Sets.difference(ImmutableSet.copyOf(updatedFlags.getOldFlags().getUserFlags()), ImmutableSet.copyOf(updatedFlags.getNewFlags().getUserFlags()));
        Sets.SetView difference2 = Sets.difference(ImmutableSet.copyOf(updatedFlags.getNewFlags().getUserFlags()), ImmutableSet.copyOf(updatedFlags.getOldFlags().getUserFlags()));
        if (difference2.isEmpty()) {
            boundStatementBuilder.unset(ADDED_USERS_FLAGS);
        } else {
            boundStatementBuilder.setSet(ADDED_USERS_FLAGS, difference2, String.class);
        }
        if (difference.isEmpty()) {
            boundStatementBuilder.unset(REMOVED_USERS_FLAGS);
        } else {
            boundStatementBuilder.setSet(REMOVED_USERS_FLAGS, difference, String.class);
        }
        if (this.cassandraConfiguration.isMessageWriteStrongConsistency()) {
            boundStatementBuilder.setLong("modSeqCondition", modSeq.asLong());
        }
        return boundStatementBuilder.build();
    }

    public Flux<CassandraMessageMetadata> retrieve(CassandraMessageId cassandraMessageId, Optional<CassandraId> optional, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) {
        return this.cassandraAsyncExecutor.executeRows(setExecutionProfileIfNeeded(selectStatement(cassandraMessageId, optional), consistencyChoice)).map(this::toComposedMessageIdWithMetadata);
    }

    @VisibleForTesting
    public Flux<CassandraMessageMetadata> retrieve(CassandraMessageId cassandraMessageId, Optional<CassandraId> optional) {
        return retrieve(cassandraMessageId, optional, JamesExecutionProfiles.ConsistencyChoice.STRONG);
    }

    public Flux<CassandraMessageMetadata> retrieveAllMessages() {
        return this.cassandraAsyncExecutor.executeRows(this.listStatement.bind(new Object[0]).setTimeout(Duration.ofDays(1L))).map(this::toComposedMessageIdWithMetadata);
    }

    private CassandraMessageMetadata toComposedMessageIdWithMetadata(Row row) {
        CassandraMessageId of = CassandraMessageId.Factory.of(row.getUuid(CassandraMessageIds.MESSAGE_ID));
        CassandraMessageMetadata.Builder size = CassandraMessageMetadata.builder().ids(ComposedMessageIdWithMetaData.builder().composedMessageId(new ComposedMessageId(CassandraId.of(row.getUuid(CassandraMessageIds.MAILBOX_ID)), of, MessageUid.of(row.getLong(CassandraMessageIds.IMAP_UID)))).flags(FlagsExtractor.getFlags(row)).threadId(getThreadIdFromRow(row, of)).modSeq(ModSeq.of(row.getLong(MessageIdToImapUid.MOD_SEQ))).build()).bodyStartOctet((Integer) row.get(CassandraMessageV3Table.BODY_START_OCTET, Integer.class)).internalDate(Optional.ofNullable((Instant) row.get(CassandraMessageV3Table.INTERNAL_DATE, TypeCodecs.TIMESTAMP)).map(Date::from)).saveDate(Optional.ofNullable((Instant) row.get(CassandraMessageIdTable.SAVE_DATE, TypeCodecs.TIMESTAMP)).map(Date::from)).size((Long) row.get(CassandraMessageV3Table.FULL_CONTENT_OCTETS, Long.class));
        Optional ofNullable = Optional.ofNullable(row.getString(CassandraMessageV3Table.HEADER_CONTENT));
        BlobId.Factory factory = this.blobIdFactory;
        Objects.requireNonNull(factory);
        return size.headerContent(ofNullable.map(factory::from)).build();
    }

    private ThreadId getThreadIdFromRow(Row row, MessageId messageId) {
        UUID uuid = (UUID) row.get(MessageIdToImapUid.THREAD_ID, TypeCodecs.TIMEUUID);
        return uuid == null ? ThreadId.fromBaseMessageId(messageId) : ThreadId.fromBaseMessageId(CassandraMessageId.Factory.of(uuid));
    }

    private BoundStatement selectStatement(CassandraMessageId cassandraMessageId, Optional<CassandraId> optional) {
        return (BoundStatement) optional.map(cassandraId -> {
            return this.select.bind(new Object[0]).setUuid(CassandraMessageIds.MESSAGE_ID, cassandraMessageId.get()).setUuid(CassandraMessageIds.MAILBOX_ID, cassandraId.asUuid());
        }).orElseGet(() -> {
            return this.selectAll.bind(new Object[0]).setUuid(CassandraMessageIds.MESSAGE_ID, cassandraMessageId.get());
        });
    }

    private BoundStatement setExecutionProfileIfNeeded(BoundStatement boundStatement, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) {
        return consistencyChoice.equals(JamesExecutionProfiles.ConsistencyChoice.STRONG) ? boundStatement.setExecutionProfile(this.lwtProfile) : boundStatement;
    }

    @VisibleForTesting
    Mono<Void> insertNullInternalDateAndHeaderContent(CassandraMessageMetadata cassandraMessageMetadata) {
        ComposedMessageId composedMessageId = cassandraMessageMetadata.getComposedMessageId().getComposedMessageId();
        Flags flags = cassandraMessageMetadata.getComposedMessageId().getFlags();
        ThreadId threadId = cassandraMessageMetadata.getComposedMessageId().getThreadId();
        BoundStatementBuilder boundStatementBuilder = this.insert.boundStatementBuilder(new Object[0]);
        if (cassandraMessageMetadata.getComposedMessageId().getFlags().getUserFlags().length == 0) {
            boundStatementBuilder.unset(Flag.USER_FLAGS);
        } else {
            boundStatementBuilder.setSet(Flag.USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()), String.class);
        }
        return this.cassandraAsyncExecutor.executeVoid(boundStatementBuilder.set(CassandraMessageIds.MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID).set(CassandraMessageIds.MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID).setLong(CassandraMessageIds.IMAP_UID, composedMessageId.getUid().asLong()).setLong(MessageIdToImapUid.MOD_SEQ, cassandraMessageMetadata.getComposedMessageId().getModSeq().asLong()).setUuid(MessageIdToImapUid.THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get()).setBoolean(Flag.ANSWERED, flags.contains(Flags.Flag.ANSWERED)).setBoolean(Flag.DELETED, flags.contains(Flags.Flag.DELETED)).setBoolean(Flag.DRAFT, flags.contains(Flags.Flag.DRAFT)).setBoolean(Flag.FLAGGED, flags.contains(Flags.Flag.FLAGGED)).setBoolean(Flag.RECENT, flags.contains(Flags.Flag.RECENT)).setBoolean(Flag.SEEN, flags.contains(Flags.Flag.SEEN)).setBoolean(Flag.USER, flags.contains(Flags.Flag.USER)).setInstant(CassandraMessageV3Table.INTERNAL_DATE, (Instant) null).setInt(CassandraMessageV3Table.BODY_START_OCTET, 0).setLong(CassandraMessageV3Table.FULL_CONTENT_OCTETS, 0L).setString(CassandraMessageV3Table.HEADER_CONTENT, (String) null).build());
    }
}
