package com.microsoft.azure.cosmosdb.changefeedprocessor.internal;

import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.SqlParameter;
import com.microsoft.azure.cosmosdb.SqlParameterCollection;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedDocumentClient;
import com.microsoft.azure.cosmosdb.changefeedprocessor.DocumentCollectionInfo;
import com.microsoft.azure.cosmosdb.changefeedprocessor.DocumentServiceLease;
import com.microsoft.azure.cosmosdb.changefeedprocessor.DocumentServiceLeaseUpdater;
import com.microsoft.azure.cosmosdb.changefeedprocessor.Lease;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStore;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManagerSettings;
import com.microsoft.azure.cosmosdb.changefeedprocessor.RequestOptionsFactory;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.LeaseLostException;
import com.microsoft.azure.cosmosdb.changefeedprocessor.internal.ChangeFeedHelper;
import com.microsoft.azure.cosmosdb.changefeedprocessor.internal.Constants;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import java.time.Duration;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;

/* loaded from: input_file:com/microsoft/azure/cosmosdb/changefeedprocessor/internal/LeaseStoreManagerImpl.class */
public class LeaseStoreManagerImpl implements LeaseStoreManager, LeaseStoreManager.LeaseStoreManagerBuilderDefinition {
    private LeaseStoreManagerSettings settings = new LeaseStoreManagerSettings();
    private ChangeFeedDocumentClient leaseDocumentClient;
    private RequestOptionsFactory requestOptionsFactory;
    private DocumentServiceLeaseUpdater leaseUpdater;
    private LeaseStore leaseStore;

    public static LeaseStoreManager.LeaseStoreManagerBuilderDefinition Builder() {
        return new LeaseStoreManagerImpl();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition withLeaseCollection(DocumentCollectionInfo documentCollectionInfo) {
        if (documentCollectionInfo == null) {
            throw new IllegalArgumentException("leaseCollectionLocation");
        }
        this.settings.withLeaseCollectionInfo(ChangeFeedHelper.canonicalize(documentCollectionInfo));
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition withLeaseDocumentClient(ChangeFeedDocumentClient changeFeedDocumentClient) {
        if (changeFeedDocumentClient == null) {
            throw new IllegalArgumentException("leaseDocumentClient");
        }
        this.leaseDocumentClient = changeFeedDocumentClient;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition withLeasePrefix(String str) {
        if (str == null) {
            throw new IllegalArgumentException("leasePrefix");
        }
        this.settings.withContainerNamePrefix(str);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition withLeaseCollectionLink(String str) {
        if (str == null) {
            throw new IllegalArgumentException("leaseCollectionLink");
        }
        this.settings.withLeaseCollectionLink(str);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition withRequestOptionsFactory(RequestOptionsFactory requestOptionsFactory) {
        if (requestOptionsFactory == null) {
            throw new IllegalArgumentException("requestOptionsFactory");
        }
        this.requestOptionsFactory = requestOptionsFactory;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager.LeaseStoreManagerBuilderDefinition withHostName(String str) {
        if (str == null) {
            throw new IllegalArgumentException("hostName");
        }
        this.settings.withHostName(str);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public LeaseStoreManager build() {
        return (LeaseStoreManager) buildAsync().toBlocking().last();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager.LeaseStoreManagerBuilderDefinition
    public Observable<LeaseStoreManager> buildAsync() {
        if (this.settings == null) {
            throw new IllegalArgumentException("settings");
        }
        if (this.settings.getLeaseCollectionInfo() == null) {
            throw new IllegalArgumentException("settings.leaseCollectionInfo");
        }
        if (this.settings.getContainerNamePrefix() == null) {
            throw new IllegalArgumentException("settings.containerNamePrefix");
        }
        if (this.settings.getLeaseCollectionLink() == null) {
            throw new IllegalArgumentException("settings.leaseCollectionLink");
        }
        if (this.settings.getHostName() == null || this.settings.getHostName().isEmpty()) {
            throw new IllegalArgumentException("settings.hostName");
        }
        if (this.leaseDocumentClient == null) {
            throw new IllegalArgumentException("leaseDocumentClient");
        }
        if (this.requestOptionsFactory == null) {
            throw new IllegalArgumentException("requestOptionsFactory");
        }
        if (this.leaseUpdater == null) {
            this.leaseUpdater = new DocumentServiceLeaseUpdaterImpl(this.leaseDocumentClient);
        }
        this.leaseStore = new DocumentServiceLeaseStore(this.leaseDocumentClient, this.settings.getLeaseCollectionInfo(), this.settings.getContainerNamePrefix(), this.settings.getLeaseCollectionLink(), this.requestOptionsFactory);
        return Observable.fromCallable(() -> {
            if (this.settings.getLeaseCollectionInfo() == null) {
                throw new IllegalArgumentException("leaseCollectionInfo was not specified");
            }
            if (this.settings.getLeaseCollectionLink() == null) {
                throw new IllegalArgumentException("leaseCollectionLink was not specified");
            }
            if (this.requestOptionsFactory == null) {
                throw new IllegalArgumentException("requestOptionsFactory was not specified");
            }
            DocumentCollectionInfo leaseCollectionInfo = this.settings.getLeaseCollectionInfo();
            if (this.leaseDocumentClient == null) {
                this.leaseDocumentClient = new ChangeFeedDocumentClientImpl(new AsyncDocumentClient.Builder().withServiceEndpoint(leaseCollectionInfo.getUri()).withMasterKeyOrResourceToken(leaseCollectionInfo.getMasterKey()).withConnectionPolicy(leaseCollectionInfo.getConnectionPolicy()).withConsistencyLevel(leaseCollectionInfo.getConsistencyLevel()).build());
            }
            return this;
        }).subscribeOn(Schedulers.io()).map(leaseStoreManagerImpl -> {
            return this;
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseContainer
    public Observable<Lease> getAllLeasesAsync() {
        return listDocumentsAsync(getPartitionLeasePrefix()).subscribeOn(Schedulers.computation()).map(documentServiceLease -> {
            return documentServiceLease;
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseContainer
    public Observable<Lease> getOwnedLeasesAsync() {
        return getAllLeasesAsync().filter(lease -> {
            return Boolean.valueOf(lease.getOwner() != null && lease.getOwner().equalsIgnoreCase(this.settings.getHostName()));
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager
    public Observable<Lease> createLeaseIfNotExistAsync(String str, String str2) {
        if (str == null) {
            throw new IllegalArgumentException("leaseToken");
        }
        DocumentServiceLease withContinuationToken = new DocumentServiceLease().withId(getDocumentId(str)).withLeaseToken(str).withContinuationToken(str2);
        return this.leaseDocumentClient.createDocument(this.settings.getLeaseCollectionLink(), withContinuationToken, null, false).onErrorReturn(th -> {
            if ((th instanceof DocumentClientException) && ((DocumentClientException) th).getStatusCode() == 409) {
                return null;
            }
            Observable.error(th);
            return null;
        }).subscribeOn(Schedulers.computation()).map(resourceResponse -> {
            if (resourceResponse == null) {
                return null;
            }
            Document resource = resourceResponse.getResource();
            return withContinuationToken.withId(resource.getId()).withEtag(resource.getETag()).withTs(resource.getString(Constants.Properties.LAST_MODIFIED));
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager
    public Completable deleteAsync(Lease lease) {
        if (lease == null || lease.getId() == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.leaseDocumentClient.deleteDocument(createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease)).onErrorReturn(th -> {
            if ((th instanceof DocumentClientException) && ((DocumentClientException) th).getStatusCode() == 404) {
                return null;
            }
            Observable.error(th);
            return null;
        }).map(resourceResponse -> {
            return true;
        }).toCompletable();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager
    public Observable<Lease> acquireAsync(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        String owner = lease.getOwner();
        return this.leaseUpdater.updateLeaseAsync(lease, createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease), lease2 -> {
            if (lease2.getOwner() != null && !lease2.getOwner().equalsIgnoreCase(owner)) {
                Observable.error(new LeaseLostException(lease));
            }
            lease2.setOwner(this.settings.getHostName());
            lease2.setProperties(lease.getProperties());
            return lease2;
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager
    public Completable releaseAsync(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.leaseDocumentClient.readDocument(createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease)).onErrorReturn(th -> {
            if ((th instanceof DocumentClientException) && ((DocumentClientException) th).getStatusCode() == 404) {
                Observable.error(new LeaseLostException(lease));
            }
            Observable.error(th);
            return null;
        }).subscribeOn(Schedulers.computation()).map(resourceResponse -> {
            return DocumentServiceLease.fromDocument(resourceResponse.getResource());
        }).flatMap(documentServiceLease -> {
            return this.leaseUpdater.updateLeaseAsync(documentServiceLease, this.createDocumentUri(documentServiceLease.getId()), this.requestOptionsFactory.createRequestOptions(lease), lease2 -> {
                if (!lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                    Observable.error(new LeaseLostException(lease));
                }
                lease2.setOwner(null);
                return lease2;
            });
        }).toCompletable();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager
    public Observable<Lease> renewAsync(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        return this.leaseDocumentClient.readDocument(createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease)).onErrorReturn(th -> {
            if ((th instanceof DocumentClientException) && ((DocumentClientException) th).getStatusCode() == 404) {
                Observable.error(new LeaseLostException(lease));
            }
            Observable.error(th);
            return null;
        }).observeOn(Schedulers.computation()).map(resourceResponse -> {
            return DocumentServiceLease.fromDocument(resourceResponse.getResource());
        }).flatMap(documentServiceLease -> {
            return this.leaseUpdater.updateLeaseAsync(documentServiceLease, this.createDocumentUri(documentServiceLease.getId()), this.requestOptionsFactory.createRequestOptions(lease), lease2 -> {
                if (!lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                    Observable.error(new LeaseLostException(lease));
                }
                return lease2;
            });
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager
    public Observable<Lease> updatePropertiesAsync(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        if (!lease.getOwner().equalsIgnoreCase(this.settings.getHostName())) {
            Observable.error(new LeaseLostException(lease));
        }
        return this.leaseUpdater.updateLeaseAsync(lease, createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease), lease2 -> {
            if (!lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                Observable.error(new LeaseLostException(lease));
            }
            lease2.setProperties(lease.getProperties());
            return lease2;
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseCheckpointer
    public Observable<Lease> checkpointAsync(Lease lease, String str) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("continuationToken must be a non-empty string");
        }
        return this.leaseUpdater.updateLeaseAsync(lease, createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease), lease2 -> {
            if (lease2.getOwner() != null && !lease2.getOwner().equalsIgnoreCase(lease.getOwner())) {
                Observable.error(new LeaseLostException(lease));
            }
            lease2.setContinuationToken(str);
            return lease2;
        });
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStore
    public Single<Boolean> isInitializedAsync() {
        return this.leaseStore.isInitializedAsync();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStore
    public Completable markInitializedAsync() {
        return this.leaseStore.markInitializedAsync();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStore
    public Single<Boolean> acquireInitializationLockAsync(Duration duration) {
        return this.leaseStore.acquireInitializationLockAsync(duration);
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager, com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStore
    public Single<Boolean> releaseInitializationLockAsync() {
        return this.leaseStore.releaseInitializationLockAsync();
    }

    private Observable<DocumentServiceLease> tryGetLeaseAsync(Lease lease) {
        return this.leaseDocumentClient.readDocument(createDocumentUri(lease.getId()), this.requestOptionsFactory.createRequestOptions(lease)).onErrorReturn(th -> {
            if ((th instanceof DocumentClientException) && ((DocumentClientException) th).getStatusCode() == 404) {
                return null;
            }
            Observable.error(th);
            return null;
        }).subscribeOn(Schedulers.computation()).map(resourceResponse -> {
            if (resourceResponse == null) {
                return null;
            }
            return DocumentServiceLease.fromDocument(resourceResponse.getResource());
        });
    }

    private Observable<DocumentServiceLease> listDocumentsAsync(String str) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("prefix");
        }
        SqlParameter sqlParameter = new SqlParameter();
        sqlParameter.setName("@PartitionLeasePrefix");
        sqlParameter.setValue(str);
        return this.leaseDocumentClient.queryDocuments(this.settings.getLeaseCollectionLink(), new SqlQuerySpec("SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix)", new SqlParameterCollection(new SqlParameter[]{sqlParameter})), this.requestOptionsFactory.createFeedOptions()).subscribeOn(Schedulers.computation()).flatMap(feedResponse -> {
            return Observable.from(feedResponse.getResults());
        }).map(DocumentServiceLease::fromDocument);
    }

    private String getDocumentId(String str) {
        return getPartitionLeasePrefix() + str;
    }

    private String getPartitionLeasePrefix() {
        return this.settings.getContainerNamePrefix() + "..";
    }

    private String createDocumentUri(String str) {
        return ChangeFeedHelper.UriFactory.createDocumentUri(this.settings.getLeaseCollectionInfo().getDatabaseName(), this.settings.getLeaseCollectionInfo().getCollectionName(), str);
    }
}
