package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.AccessCondition;
import com.azure.data.cosmos.AccessConditionType;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.CosmosItem;
import com.azure.data.cosmos.CosmosItemProperties;
import com.azure.data.cosmos.CosmosItemRequestOptions;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedContextClient;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.ServiceItemLease;
import com.azure.data.cosmos.internal.changefeed.ServiceItemLeaseUpdater;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseConflictException;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseLostException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.class */
class DocumentServiceLeaseUpdaterImpl implements ServiceItemLeaseUpdater {
    private final Logger logger = LoggerFactory.getLogger(DocumentServiceLeaseUpdaterImpl.class);
    private final int RETRY_COUNT_ON_CONFLICT = 5;
    private final ChangeFeedContextClient client;

    public DocumentServiceLeaseUpdaterImpl(ChangeFeedContextClient changeFeedContextClient) {
        if (changeFeedContextClient == null) {
            throw new IllegalArgumentException("client");
        }
        this.client = changeFeedContextClient;
    }

    @Override // com.azure.data.cosmos.internal.changefeed.ServiceItemLeaseUpdater
    public Mono<Lease> updateLease(Lease lease, CosmosItem cosmosItem, CosmosItemRequestOptions cosmosItemRequestOptions, Function<Lease, Lease> function) {
        Lease[] leaseArr = {lease};
        leaseArr[0] = function.apply(lease);
        if (leaseArr[0] == null) {
            return Mono.empty();
        }
        leaseArr[0].setTimestamp(ZonedDateTime.now(ZoneId.of("UTC")));
        return tryReplaceLease(leaseArr[0], cosmosItem).map(cosmosItemProperties -> {
            leaseArr[0] = ServiceItemLease.fromDocument(cosmosItemProperties);
            return leaseArr[0];
        }).hasElement().flatMap(bool -> {
            return bool.booleanValue() ? Mono.just(leaseArr[0]) : this.client.readItem(cosmosItem, cosmosItemRequestOptions).onErrorResume(th -> {
                if ((th instanceof CosmosClientException) && ((CosmosClientException) th).statusCode() == 404) {
                    throw new LeaseLostException(leaseArr[0]);
                }
                return Mono.error(th);
            }).map(cosmosItemResponse -> {
                ServiceItemLease fromDocument = ServiceItemLease.fromDocument(cosmosItemResponse.properties());
                this.logger.info("Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.", new Object[]{leaseArr[0].getLeaseToken(), leaseArr[0].getConcurrencyToken(), fromDocument.getOwner(), fromDocument.getConcurrencyToken()});
                leaseArr[0] = fromDocument;
                throw new LeaseConflictException(leaseArr[0], "Partition update failed");
            });
        }).retry(5L, th -> {
            if (!(th instanceof LeaseConflictException)) {
                return false;
            }
            this.logger.info("Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.", new Object[]{leaseArr[0].getLeaseToken(), leaseArr[0].getConcurrencyToken(), leaseArr[0].getOwner()});
            return true;
        }).onErrorResume(th2 -> {
            if (!(th2 instanceof LeaseConflictException)) {
                return Mono.error(th2);
            }
            this.logger.warn("Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.", new Object[]{leaseArr[0].getLeaseToken(), leaseArr[0].getConcurrencyToken(), leaseArr[0].getOwner(), leaseArr[0].getContinuationToken(), th2});
            return Mono.just(leaseArr[0]);
        });
    }

    private Mono<CosmosItemProperties> tryReplaceLease(Lease lease, CosmosItem cosmosItem) throws LeaseLostException {
        return this.client.replaceItem(cosmosItem, lease, getCreateIfMatchOptions(lease)).map(cosmosItemResponse -> {
            return cosmosItemResponse.properties();
        }).onErrorResume(th -> {
            if (!(th instanceof CosmosClientException)) {
                return Mono.error(th);
            }
            CosmosClientException cosmosClientException = (CosmosClientException) th;
            switch (cosmosClientException.statusCode()) {
                case 404:
                    throw new LeaseLostException(lease, cosmosClientException, true);
                case 409:
                    throw new LeaseLostException(lease, cosmosClientException, false);
                case 412:
                    return Mono.empty();
                default:
                    return Mono.error(th);
            }
        });
    }

    private CosmosItemRequestOptions getCreateIfMatchOptions(Lease lease) {
        AccessCondition accessCondition = new AccessCondition();
        accessCondition.type(AccessConditionType.IF_MATCH);
        accessCondition.condition(lease.getConcurrencyToken());
        CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
        cosmosItemRequestOptions.accessCondition(accessCondition);
        return cosmosItemRequestOptions;
    }
}
