package org.apache.james.mailbox.cassandra.mail.migration;

import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.migration.Migration;
import org.apache.james.mailbox.cassandra.mail.CassandraIdAndPath;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAOImpl;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV2DAO;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.class */
/**
 * Migration that copies every entry read from the V1 mailbox path DAO into the V2 DAO and
 * deletes the V1 entry once the V2 write has been chained.
 *
 * <p>The number of V1 entries is captured once at construction time so that progress can be
 * reported as "remaining / initial" through {@link AdditionalInformation}.
 */
public class MailboxPathV2Migration implements Migration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MailboxPathV2Migration.class);
    public static final TaskType TYPE = TaskType.of("cassandra-mailbox-path-v2-migration");
    private final CassandraMailboxPathDAOImpl daoV1;
    private final CassandraMailboxPathV2DAO daoV2;
    // Assigned in the constructor: it is computed from daoV1, which must be set first.
    private final long initialCount;

    /**
     * Immutable progress snapshot: how many V1 entries remain, how many existed when the
     * migration object was created, and when the snapshot was taken.
     */
    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
        private final long remainingCount;
        private final long initialCount;
        private final Instant timestamp;

        /**
         * @param remainingCount number of entries still present in the V1 table
         * @param initialCount   number of entries counted when the migration was created
         * @param timestamp      instant at which this snapshot was taken
         */
        public AdditionalInformation(long remainingCount, long initialCount, Instant timestamp) {
            this.remainingCount = remainingCount;
            this.initialCount = initialCount;
            this.timestamp = timestamp;
        }

        /** @return the number of V1 entries remaining at snapshot time */
        public long getRemainingCount() {
            return this.remainingCount;
        }

        /** @return the number of V1 entries counted when the migration was created */
        public long getInitialCount() {
            return this.initialCount;
        }

        /** @return the instant at which this snapshot was taken */
        public Instant timestamp() {
            return this.timestamp;
        }
    }

    /**
     * {@link Task} view of a {@link MailboxPathV2Migration}: running the task delegates to the
     * migration's {@code runTask()}, and the details are a fresh progress snapshot.
     */
    public static class MailboxPathV2MigrationTask implements Task {
        private final MailboxPathV2Migration migration;

        /**
         * @param mailboxPathV2Migration the migration this task delegates to
         */
        public MailboxPathV2MigrationTask(MailboxPathV2Migration mailboxPathV2Migration) {
            this.migration = mailboxPathV2Migration;
        }

        /**
         * Runs the wrapped migration.
         *
         * @return the result reported by the migration's {@code runTask()}
         * @throws InterruptedException if the underlying migration is interrupted
         */
        public Task.Result run() throws InterruptedException {
            return this.migration.runTask();
        }

        /** @return {@link MailboxPathV2Migration#TYPE} */
        public TaskType type() {
            return MailboxPathV2Migration.TYPE;
        }

        /**
         * @return a progress snapshot computed at call time; note that computing it blocks on
         *         a count query against the V1 DAO
         */
        public Optional<TaskExecutionDetails.AdditionalInformation> details() {
            return Optional.of(this.migration.getAdditionalInformation());
        }
    }

    /**
     * Creates the migration and records the current number of V1 entries.
     *
     * <p>The count is taken here, after both DAOs are assigned. Computing it in a field
     * initializer would run before the constructor body and dereference a still-null
     * {@code daoV1}.
     *
     * @param cassandraMailboxPathDAOImpl source (V1) DAO
     * @param cassandraMailboxPathV2DAO   destination (V2) DAO
     */
    @Inject
    public MailboxPathV2Migration(CassandraMailboxPathDAOImpl cassandraMailboxPathDAOImpl, CassandraMailboxPathV2DAO cassandraMailboxPathV2DAO) {
        this.daoV1 = cassandraMailboxPathDAOImpl;
        this.daoV2 = cassandraMailboxPathV2DAO;
        this.initialCount = getCurrentCount().longValue();
    }

    /**
     * Reads all V1 entries and migrates each of them, blocking until the whole stream has
     * completed. An error reaching the end of the pipeline is logged before being propagated
     * by {@code blockLast()}.
     */
    public void apply() {
        this.daoV1.readAll()
            .flatMap(this::migrate)
            .doOnError(th -> LOGGER.error("Error while performing migration", th))
            .blockLast();
    }

    /**
     * Saves one entry in V2, then deletes it from V1. A failure of either step is logged and
     * swallowed so that it does not abort the migration of the other entries.
     */
    private Mono<Void> migrate(CassandraIdAndPath cassandraIdAndPath) {
        return this.daoV2.save(cassandraIdAndPath.getMailboxPath(), cassandraIdAndPath.getCassandraId())
            .then(this.daoV1.delete(cassandraIdAndPath.getMailboxPath()))
            .onErrorResume(th -> handleErrorMigrate(cassandraIdAndPath, th))
            .then();
    }

    /** Logs the failure for the given entry and completes empty (best-effort migration). */
    private Mono<Void> handleErrorMigrate(CassandraIdAndPath cassandraIdAndPath, Throwable th) {
        LOGGER.error("Error while performing migration for path {}", cassandraIdAndPath.getMailboxPath(), th);
        return Mono.empty();
    }

    /** @return a {@link Task} wrapping this migration */
    public Task asTask() {
        return new MailboxPathV2MigrationTask(this);
    }

    /**
     * Builds a progress snapshot from the current V1 count, the count recorded at construction
     * and the current UTC instant.
     */
    AdditionalInformation getAdditionalInformation() {
        return new AdditionalInformation(getCurrentCount().longValue(), this.initialCount, Clock.systemUTC().instant());
    }

    /** Blocks on the V1 DAO's count query and returns its result. */
    private Long getCurrentCount() {
        return (Long) this.daoV1.countAll().block();
    }
}
