package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.LeaseManager;
import com.azure.data.cosmos.internal.changefeed.LeaseRenewer;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseLostException;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.class */
class LeaseRenewerImpl implements LeaseRenewer {
    private static final Logger logger = LoggerFactory.getLogger(LeaseRenewerImpl.class);
    private final LeaseManager leaseManager;
    private final Duration leaseRenewInterval;
    private Lease lease;
    private RuntimeException resultException;

    public LeaseRenewerImpl(Lease lease, LeaseManager leaseManager, Duration duration) {
        this.lease = lease;
        this.leaseManager = leaseManager;
        this.leaseRenewInterval = duration;
    }

    @Override // com.azure.data.cosmos.internal.changefeed.LeaseRenewer
    public Mono<Void> run(CancellationToken cancellationToken) {
        logger.info("Partition {}: renewer task started.", this.lease.getLeaseToken());
        return Mono.just(this).flatMap(leaseRenewerImpl -> {
            if (cancellationToken.isCancellationRequested()) {
                return Mono.empty();
            }
            ZonedDateTime plus = ZonedDateTime.now().plus((TemporalAmount) this.leaseRenewInterval);
            return Mono.just(leaseRenewerImpl).delayElement(Duration.ofMillis(100L)).repeat(() -> {
                return !cancellationToken.isCancellationRequested() && ZonedDateTime.now().isBefore(plus);
            }).last();
        }).flatMap(leaseRenewerImpl2 -> {
            return cancellationToken.isCancellationRequested() ? Mono.empty() : renew(cancellationToken);
        }).repeat(() -> {
            if (cancellationToken.isCancellationRequested()) {
                logger.info("Partition {}: renewer task stopped.", this.lease.getLeaseToken());
            }
            return !cancellationToken.isCancellationRequested();
        }).then().doOnError(th -> {
            if (th instanceof LeaseLostException) {
                logger.info("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), th);
            } else {
                logger.error("Partition {}: renew lease loop failed.", this.lease.getLeaseToken(), th);
            }
        });
    }

    @Override // com.azure.data.cosmos.internal.changefeed.LeaseRenewer
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private Mono<Lease> renew(CancellationToken cancellationToken) {
        return cancellationToken.isCancellationRequested() ? Mono.empty() : this.leaseManager.renew(this.lease).map(lease -> {
            if (lease != null) {
                this.lease = lease;
            }
            logger.info("Partition {}: renewed lease with result {}", this.lease.getLeaseToken(), Boolean.valueOf(lease != null));
            return lease;
        }).onErrorResume(th -> {
            if (!(th instanceof LeaseLostException)) {
                logger.error("Partition {}: failed to renew lease.", this.lease.getLeaseToken(), th);
                return Mono.empty();
            }
            LeaseLostException leaseLostException = (LeaseLostException) th;
            this.resultException = leaseLostException;
            logger.error("Partition {}: lost lease on renew.", this.lease.getLeaseToken(), leaseLostException);
            return Mono.error(leaseLostException);
        });
    }
}
