package org.apache.james.pop3server.mailbox.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.pop3server.mailbox.Pop3MetadataStore;
import org.apache.james.task.Task;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.class */
public class MetaDataFixInconsistenciesService {
    static final Inconsistency NO_INCONSISTENCY = (context, cassandraMessageIdToImapUidDAO, pop3MetadataStore) -> {
        return Mono.just(Task.Result.COMPLETED);
    };
    private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataFixInconsistenciesService.class);
    private static final Duration PERIOD = Duration.ofSeconds(1);
    private final CassandraMessageIdToImapUidDAO imapUidDAO;
    private final Pop3MetadataStore pop3MetadataStore;
    private final CassandraMessageDAOV3 cassandraMessageDAOV3;

    /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$Context.class */
    public static class Context {
        private final AtomicLong processedImapUidEntries;
        private final AtomicLong processedPop3MetaDataStoreEntries;
        private final AtomicLong stalePOP3Entries;
        private final AtomicLong missingPOP3Entries;
        private final ConcurrentLinkedDeque<MessageInconsistenciesEntry> fixedInconsistencies;
        private final ConcurrentLinkedDeque<MessageInconsistenciesEntry> errors;

        /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$Context$Snapshot.class */
        static class Snapshot {
            private final long processedImapUidEntries;
            private final long processedPop3MetaDataStoreEntries;
            private final long stalePOP3Entries;
            private final long missingPOP3Entries;
            private final ImmutableList<MessageInconsistenciesEntry> fixedInconsistencies;
            private final ImmutableList<MessageInconsistenciesEntry> errors;

            /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$Context$Snapshot$Builder.class */
            static class Builder {
                private Optional<Long> processedImapUidEntries = Optional.empty();
                private Optional<Long> processedPop3MetaDataStoreEntries = Optional.empty();
                private Optional<Long> stalePOP3Entries = Optional.empty();
                private Optional<Long> missingPOP3Entries = Optional.empty();
                private ImmutableList.Builder<MessageInconsistenciesEntry> fixedInconsistencies = ImmutableList.builder();
                private ImmutableList.Builder<MessageInconsistenciesEntry> errors = ImmutableList.builder();

                Builder() {
                }

                public Builder processedImapUidEntries(long j) {
                    this.processedImapUidEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder processedPop3MetaDataStoreEntries(long j) {
                    this.processedPop3MetaDataStoreEntries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder stalePOP3Entries(long j) {
                    this.stalePOP3Entries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder missingPOP3Entries(long j) {
                    this.missingPOP3Entries = Optional.of(Long.valueOf(j));
                    return this;
                }

                public Builder addFixedInconsistencies(MessageInconsistenciesEntry messageInconsistenciesEntry) {
                    this.fixedInconsistencies.add(messageInconsistenciesEntry);
                    return this;
                }

                public Builder errors(MessageInconsistenciesEntry messageInconsistenciesEntry) {
                    this.errors.add(messageInconsistenciesEntry);
                    return this;
                }

                public Snapshot build() {
                    return new Snapshot(this.processedImapUidEntries.orElse(0L).longValue(), this.processedPop3MetaDataStoreEntries.orElse(0L).longValue(), this.stalePOP3Entries.orElse(0L).longValue(), this.missingPOP3Entries.orElse(0L).longValue(), this.fixedInconsistencies.build(), this.errors.build());
                }
            }

            public static Builder builder() {
                return new Builder();
            }

            public Snapshot(long j, long j2, long j3, long j4, ImmutableList<MessageInconsistenciesEntry> immutableList, ImmutableList<MessageInconsistenciesEntry> immutableList2) {
                this.processedImapUidEntries = j;
                this.processedPop3MetaDataStoreEntries = j2;
                this.stalePOP3Entries = j3;
                this.missingPOP3Entries = j4;
                this.fixedInconsistencies = immutableList;
                this.errors = immutableList2;
            }

            public final int hashCode() {
                return Objects.hash(Long.valueOf(this.processedPop3MetaDataStoreEntries), Long.valueOf(this.processedImapUidEntries), this.errors, this.fixedInconsistencies);
            }

            public final boolean equals(Object obj) {
                if (!(obj instanceof Snapshot)) {
                    return false;
                }
                Snapshot snapshot = (Snapshot) obj;
                return Objects.equals(Long.valueOf(this.processedPop3MetaDataStoreEntries), Long.valueOf(snapshot.processedPop3MetaDataStoreEntries)) && Objects.equals(Long.valueOf(this.processedImapUidEntries), Long.valueOf(snapshot.processedImapUidEntries)) && Objects.equals(this.fixedInconsistencies, snapshot.fixedInconsistencies) && Objects.equals(this.errors, snapshot.errors);
            }

            public String toString() {
                return MoreObjects.toStringHelper(this).add("processedPop3MetaDataStoreEntries", this.processedPop3MetaDataStoreEntries).add("processedImapUidEntries", this.processedImapUidEntries).add("stalePOP3Entries", this.stalePOP3Entries).add("missingPOP3Entries", this.missingPOP3Entries).add("fixedInconsistencies", this.fixedInconsistencies).add("errors", this.errors).toString();
            }

            public long getProcessedImapUidEntries() {
                return this.processedImapUidEntries;
            }

            public long getProcessedPop3MetaDataStoreEntries() {
                return this.processedPop3MetaDataStoreEntries;
            }

            public long getStalePOP3Entries() {
                return this.stalePOP3Entries;
            }

            public long getMissingPOP3Entries() {
                return this.missingPOP3Entries;
            }

            public ImmutableList<MessageInconsistenciesEntry> getFixedInconsistencies() {
                return this.fixedInconsistencies;
            }

            public ImmutableList<MessageInconsistenciesEntry> getErrors() {
                return this.errors;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Context() {
            this(new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong(), ImmutableList.of(), ImmutableList.of());
        }

        private Context(AtomicLong atomicLong, AtomicLong atomicLong2, AtomicLong atomicLong3, AtomicLong atomicLong4, Collection<MessageInconsistenciesEntry> collection, Collection<MessageInconsistenciesEntry> collection2) {
            this.processedImapUidEntries = atomicLong;
            this.processedPop3MetaDataStoreEntries = atomicLong2;
            this.stalePOP3Entries = atomicLong3;
            this.missingPOP3Entries = atomicLong4;
            this.fixedInconsistencies = new ConcurrentLinkedDeque<>(collection);
            this.errors = new ConcurrentLinkedDeque<>(collection2);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void incrementProcessedImapUidEntries() {
            this.processedImapUidEntries.incrementAndGet();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void incrementProcessedPop3MetaDataStoreEntries() {
            this.processedPop3MetaDataStoreEntries.incrementAndGet();
        }

        void incrementStalePOP3Entries() {
            this.stalePOP3Entries.getAndIncrement();
        }

        void incrementMissingPOP3Entries() {
            this.missingPOP3Entries.incrementAndGet();
        }

        void addFixedInconsistency(MessageInconsistenciesEntry messageInconsistenciesEntry) {
            this.fixedInconsistencies.add(messageInconsistenciesEntry);
        }

        void addErrors(MessageInconsistenciesEntry messageInconsistenciesEntry) {
            this.errors.add(messageInconsistenciesEntry);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Snapshot snapshot() {
            return new Snapshot(this.processedImapUidEntries.get(), this.processedPop3MetaDataStoreEntries.get(), this.stalePOP3Entries.get(), this.missingPOP3Entries.get(), ImmutableList.copyOf(this.fixedInconsistencies), ImmutableList.copyOf(this.errors));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$FailToDetectInconsistency.class */
    public static class FailToDetectInconsistency implements Inconsistency {
        private final MailboxId mailboxId;
        private final MessageId messageId;

        private FailToDetectInconsistency(MailboxId mailboxId, MessageId messageId) {
            this.mailboxId = mailboxId;
            this.messageId = messageId;
        }

        @Override // org.apache.james.pop3server.mailbox.task.MetaDataFixInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, Pop3MetadataStore pop3MetadataStore) {
            context.addErrors(MessageInconsistenciesEntry.builder().mailboxId(this.mailboxId.serialize()).messageId(this.messageId.serialize()));
            MetaDataFixInconsistenciesService.LOGGER.error("Failed to detect inconsistency: {}", this.messageId);
            return Mono.just(Task.Result.PARTIAL);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$Inconsistency.class */
    public interface Inconsistency {
        Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, Pop3MetadataStore pop3MetadataStore);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$MissingPOP3EntryInconsistency.class */
    public static class MissingPOP3EntryInconsistency implements Inconsistency {
        private final MailboxId mailboxId;
        private final CassandraMessageId messageId;
        private final CassandraMessageDAOV3 cassandraMessageDAOV3;

        private MissingPOP3EntryInconsistency(MailboxId mailboxId, CassandraMessageId cassandraMessageId, CassandraMessageDAOV3 cassandraMessageDAOV3) {
            this.mailboxId = mailboxId;
            this.messageId = cassandraMessageId;
            this.cassandraMessageDAOV3 = cassandraMessageDAOV3;
        }

        @Override // org.apache.james.pop3server.mailbox.task.MetaDataFixInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, Pop3MetadataStore pop3MetadataStore) {
            return buildStatMetadata().flatMap(statMetadata -> {
                return Mono.from(pop3MetadataStore.add(this.mailboxId, statMetadata));
            }).doOnSuccess(r5 -> {
                notifySuccess(context);
            }).thenReturn(Task.Result.COMPLETED).onErrorResume(th -> {
                notifyFailure(context);
                return Mono.just(Task.Result.PARTIAL);
            });
        }

        private Mono<Pop3MetadataStore.StatMetadata> buildStatMetadata() {
            return this.cassandraMessageDAOV3.retrieveMessage(this.messageId, MessageMapper.FetchType.METADATA).switchIfEmpty(Mono.error(new MailboxException("Message not found: " + this.messageId))).map(messageRepresentation -> {
                return new Pop3MetadataStore.StatMetadata(this.messageId, messageRepresentation.getSize().longValue());
            });
        }

        private void notifyFailure(Context context) {
            context.addErrors(MessageInconsistenciesEntry.builder().mailboxId(this.mailboxId.serialize()).messageId(this.messageId.serialize()));
            MetaDataFixInconsistenciesService.LOGGER.error("Failed to fix inconsistency for missing POP3 entry: {}", this.messageId);
        }

        private void notifySuccess(Context context) {
            context.incrementMissingPOP3Entries();
            context.addFixedInconsistency(MessageInconsistenciesEntry.builder().mailboxId(this.mailboxId.serialize()).messageId(this.messageId.serialize()));
            MetaDataFixInconsistenciesService.LOGGER.info("Inconsistency fixed for missing POP3 entry: {}", this.messageId);
        }
    }

    /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$RunningOptions.class */
    public static class RunningOptions {
        public static final RunningOptions DEFAULT = new RunningOptions(100);
        private final int messagesPerSecond;

        public static RunningOptions withMessageRatePerSecond(int i) {
            return new RunningOptions(i);
        }

        @JsonCreator
        public RunningOptions(@JsonProperty("messagesPerSecond") int i) {
            Preconditions.checkArgument(i > 0, "'messagesPerSecond' must be strictly positive");
            this.messagesPerSecond = i;
        }

        public int getMessagesPerSecond() {
            return this.messagesPerSecond;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService$StalePOP3EntryConsistency.class */
    public static class StalePOP3EntryConsistency implements Inconsistency {
        private final MailboxId mailboxId;
        private final MessageId messageId;

        private StalePOP3EntryConsistency(MailboxId mailboxId, MessageId messageId) {
            this.mailboxId = mailboxId;
            this.messageId = messageId;
        }

        @Override // org.apache.james.pop3server.mailbox.task.MetaDataFixInconsistenciesService.Inconsistency
        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, Pop3MetadataStore pop3MetadataStore) {
            return Mono.from(pop3MetadataStore.remove(this.mailboxId, this.messageId)).doOnSuccess(r5 -> {
                notifySuccess(context);
            }).thenReturn(Task.Result.COMPLETED).onErrorResume(th -> {
                notifyFailure(context);
                return Mono.just(Task.Result.PARTIAL);
            });
        }

        private void notifyFailure(Context context) {
            context.addErrors(MessageInconsistenciesEntry.builder().mailboxId(this.mailboxId.serialize()).messageId(this.messageId.serialize()));
            MetaDataFixInconsistenciesService.LOGGER.error("Failed to fix inconsistency for stale POP3 entry: {}", this.messageId);
        }

        private void notifySuccess(Context context) {
            context.incrementStalePOP3Entries();
            context.addFixedInconsistency(MessageInconsistenciesEntry.builder().mailboxId(this.mailboxId.serialize()).messageId(this.messageId.serialize()));
            MetaDataFixInconsistenciesService.LOGGER.info("Inconsistency fixed for stale POP3 entry: {}", this.messageId);
        }
    }

    @Inject
    public MetaDataFixInconsistenciesService(CassandraMessageIdToImapUidDAO cassandraMessageIdToImapUidDAO, Pop3MetadataStore pop3MetadataStore, CassandraMessageDAOV3 cassandraMessageDAOV3) {
        this.imapUidDAO = cassandraMessageIdToImapUidDAO;
        this.pop3MetadataStore = pop3MetadataStore;
        this.cassandraMessageDAOV3 = cassandraMessageDAOV3;
    }

    public Mono<Task.Result> fixInconsistencies(Context context, RunningOptions runningOptions) {
        return Flux.concat(new Publisher[]{fixInconsistenciesInPop3MetaDataStore(context, runningOptions), fixInconsistenciesInImapUid(context, runningOptions)}).reduce(Task.Result.COMPLETED, Task::combine);
    }

    private Flux<Task.Result> fixInconsistenciesInPop3MetaDataStore(Context context, RunningOptions runningOptions) {
        return Flux.from(this.pop3MetadataStore.listAllEntries()).transform(ReactorUtils.throttle().elements(runningOptions.getMessagesPerSecond()).per(PERIOD).forOperation(fullMetadata -> {
            return detectStaleEntriesInPop3MetaDataStore(fullMetadata).doOnNext(inconsistency -> {
                context.incrementProcessedPop3MetaDataStoreEntries();
            }).flatMap(inconsistency2 -> {
                return inconsistency2.fix(context, this.imapUidDAO, this.pop3MetadataStore);
            });
        }));
    }

    private Mono<Inconsistency> detectStaleEntriesInPop3MetaDataStore(Pop3MetadataStore.FullMetadata fullMetadata) {
        CassandraId mailboxId = fullMetadata.getMailboxId();
        CassandraMessageId messageId = fullMetadata.getMessageId();
        return this.imapUidDAO.retrieve(messageId, Optional.of(mailboxId)).next().flatMap(cassandraMessageMetadata -> {
            return Mono.just(NO_INCONSISTENCY);
        }).switchIfEmpty(Mono.just(new StalePOP3EntryConsistency(mailboxId, messageId))).onErrorResume(th -> {
            return Mono.just(new FailToDetectInconsistency(mailboxId, messageId));
        });
    }

    private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
        return this.imapUidDAO.retrieveAllMessages().map((v0) -> {
            return v0.getComposedMessageId();
        }).transform(ReactorUtils.throttle().elements(runningOptions.getMessagesPerSecond()).per(PERIOD).forOperation(composedMessageIdWithMetaData -> {
            return detectMissingEntriesInPop3MetaDataStore(composedMessageIdWithMetaData).doOnNext(inconsistency -> {
                context.incrementProcessedImapUidEntries();
            }).flatMap(inconsistency2 -> {
                return inconsistency2.fix(context, this.imapUidDAO, this.pop3MetadataStore);
            });
        }));
    }

    private Mono<Inconsistency> detectMissingEntriesInPop3MetaDataStore(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
        MailboxId mailboxId = (CassandraId) composedMessageIdWithMetaData.getComposedMessageId().getMailboxId();
        MessageId messageId = (CassandraMessageId) composedMessageIdWithMetaData.getComposedMessageId().getMessageId();
        return Flux.from(this.pop3MetadataStore.retrieve(mailboxId, messageId)).next().flatMap(fullMetadata -> {
            return Mono.just(NO_INCONSISTENCY);
        }).switchIfEmpty(Mono.just(new MissingPOP3EntryInconsistency(mailboxId, messageId, this.cassandraMessageDAOV3))).onErrorResume(th -> {
            return Mono.just(new FailToDetectInconsistency(mailboxId, messageId));
        });
    }
}
