package com.microsoft.azure.cosmosdb.changefeedprocessor.internal;

import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.PartitionKeyRange;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedDocumentClient;
import com.microsoft.azure.cosmosdb.changefeedprocessor.Lease;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseContainer;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSynchronizer;
import java.util.HashSet;
import java.util.Set;
import rx.Completable;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:com/microsoft/azure/cosmosdb/changefeedprocessor/internal/PartitionSynchronizerImpl.class */
public class PartitionSynchronizerImpl implements PartitionSynchronizer {
    private final ChangeFeedDocumentClient documentClient;
    private final String collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    private final int maxBatchSize;

    public PartitionSynchronizerImpl(ChangeFeedDocumentClient changeFeedDocumentClient, String str, LeaseContainer leaseContainer, LeaseManager leaseManager, int i, int i2) {
        this.documentClient = changeFeedDocumentClient;
        this.collectionSelfLink = str;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = i;
        this.maxBatchSize = i2;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSynchronizer
    public Completable createMissingLeasesAsync() {
        return enumPartitionKeyRangesAsync().map(partitionKeyRange -> {
            return partitionKeyRange.getId();
        }).toList().flatMap(list -> {
            return this.createLeasesAsync(new HashSet(list));
        }).onErrorReturn(new Func1<Throwable, Lease>() { // from class: com.microsoft.azure.cosmosdb.changefeedprocessor.internal.PartitionSynchronizerImpl.1
            public Lease call(Throwable th) {
                return null;
            }
        }).toCompletable();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSynchronizer
    public Observable<Lease> splitPartitionAsync(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        String leaseToken = lease.getLeaseToken();
        String continuationToken = lease.getContinuationToken();
        return enumPartitionKeyRangesAsync().filter(partitionKeyRange -> {
            return Boolean.valueOf((partitionKeyRange == null || partitionKeyRange.getParents() == null || !partitionKeyRange.getParents().contains(leaseToken)) ? false : true);
        }).map((v0) -> {
            return v0.getId();
        }).toList().flatMap(list -> {
            if (list.size() == 0) {
                throw new RuntimeException(String.format("Partition %s had split but we failed to find at least one child partition", leaseToken));
            }
            return Observable.from(list);
        }).flatMap(str -> {
            return this.leaseManager.createLeaseIfNotExistAsync(str, continuationToken);
        }, this.degreeOfParallelism).map(lease2 -> {
            return lease2;
        });
    }

    private Observable<PartitionKeyRange> enumPartitionKeyRangesAsync() {
        String str = this.collectionSelfLink;
        FeedOptions feedOptions = new FeedOptions();
        feedOptions.setMaxItemCount(Integer.valueOf(this.maxBatchSize));
        feedOptions.setRequestContinuation((String) null);
        return this.documentClient.readPartitionKeyRangeFeedAsync(str, feedOptions).flatMap(feedResponse -> {
            return Observable.from(feedResponse.getResults());
        }).onErrorReturn(new Func1<Throwable, PartitionKeyRange>() { // from class: com.microsoft.azure.cosmosdb.changefeedprocessor.internal.PartitionSynchronizerImpl.2
            public PartitionKeyRange call(Throwable th) {
                return null;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Lease> createLeasesAsync(Set<String> set) {
        HashSet hashSet = new HashSet(set);
        return this.leaseContainer.getAllLeasesAsync().map(lease -> {
            if (lease != null) {
                hashSet.remove(lease.getLeaseToken());
            }
            return lease;
        }).toCompletable().andThen(Observable.from(hashSet).flatMap(str -> {
            return this.leaseManager.createLeaseIfNotExistAsync(str, null);
        }, this.degreeOfParallelism).map(lease2 -> {
            return lease2;
        }));
    }
}
