package com.microsoft.azure.documentdb.rx.internal;

import com.microsoft.azure.documentdb.Attachment;
import com.microsoft.azure.documentdb.BridgeInternal;
import com.microsoft.azure.documentdb.ChangeFeedOptions;
import com.microsoft.azure.documentdb.Conflict;
import com.microsoft.azure.documentdb.ConnectionMode;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.Database;
import com.microsoft.azure.documentdb.DatabaseAccount;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.FeedOptions;
import com.microsoft.azure.documentdb.FeedResponsePage;
import com.microsoft.azure.documentdb.MediaOptions;
import com.microsoft.azure.documentdb.MediaResponse;
import com.microsoft.azure.documentdb.Offer;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.Permission;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.Resource;
import com.microsoft.azure.documentdb.ResourceResponse;
import com.microsoft.azure.documentdb.SqlQuerySpec;
import com.microsoft.azure.documentdb.StoredProcedure;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
import com.microsoft.azure.documentdb.Trigger;
import com.microsoft.azure.documentdb.User;
import com.microsoft.azure.documentdb.UserDefinedFunction;
import com.microsoft.azure.documentdb.internal.BaseAuthorizationTokenProvider;
import com.microsoft.azure.documentdb.internal.DocumentServiceResponse;
import com.microsoft.azure.documentdb.internal.EndpointManager;
import com.microsoft.azure.documentdb.internal.HttpConstants;
import com.microsoft.azure.documentdb.internal.OperationType;
import com.microsoft.azure.documentdb.internal.QueryCompatibilityMode;
import com.microsoft.azure.documentdb.internal.ResourceType;
import com.microsoft.azure.documentdb.internal.SessionContainer;
import com.microsoft.azure.documentdb.internal.UserAgentContainer;
import com.microsoft.azure.documentdb.internal.Utils;
import com.microsoft.azure.documentdb.internal.routing.ClientCollectionCache;
import com.microsoft.azure.documentdb.rx.AsyncDocumentClient;
import com.microsoft.azure.documentdb.rx.internal.Constants;
import cosmosdb_connector_shaded.io.netty.buffer.ByteBuf;
import cosmosdb_connector_shaded.io.reactivex.netty.RxNetty;
import cosmosdb_connector_shaded.io.reactivex.netty.channel.RxEventLoopProvider;
import cosmosdb_connector_shaded.io.reactivex.netty.channel.SingleNioLoopProvider;
import cosmosdb_connector_shaded.io.reactivex.netty.client.RxClient;
import cosmosdb_connector_shaded.io.reactivex.netty.pipeline.ssl.DefaultFactories;
import cosmosdb_connector_shaded.io.reactivex.netty.protocol.http.client.HttpClient;
import cosmosdb_connector_shaded.io.reactivex.netty.protocol.http.client.HttpClientBuilder;
import cosmosdb_connector_shaded.org.apache.commons.lang3.StringUtils;
import cosmosdb_connector_shaded.org.apache.http.client.utils.DateUtils;
import cosmosdb_connector_shaded.org.slf4j.Logger;
import cosmosdb_connector_shaded.org.slf4j.LoggerFactory;
import cosmosdb_connector_shaded.rx.Observable;
import cosmosdb_connector_shaded.rx.Scheduler;
import cosmosdb_connector_shaded.rx.functions.Func1;
import cosmosdb_connector_shaded.rx.internal.util.RxThreadFactory;
import cosmosdb_connector_shaded.rx.schedulers.Schedulers;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/microsoft/azure/documentdb/rx/internal/RxDocumentClientImpl.class */
public class RxDocumentClientImpl implements AsyncDocumentClient {
    // NOTE(review): by its name, a cap on collection-cache concurrency — confirm where it is applied.
    private static final int MAX_COLLECTION_CACHE_CONCURRENCY = 10;
    // Master key passed to the constructor; feeds the authorization token provider and request signing.
    private final String masterKey;
    // Executor handed to BridgeInternal.createClientCollectionCache when the collection cache is built.
    private final ExecutorService collectionCacheExecutorService;
    // Endpoint URI passed to the constructor; its host/port are used to build the HTTP client.
    private final URI serviceEndpoint;
    // Caller-supplied policy, or a default ConnectionPolicy when the caller passed null.
    private final ConnectionPolicy connectionPolicy;
    // Holds session tokens: written after responses, read when a session token must be attached to a request.
    private final SessionContainer sessionContainer;
    // Client-level consistency level; consulted when deciding whether to attach a session token.
    private final ConsistencyLevel consistencyLevel;
    // Built from the master key; produces the value of the Authorization header.
    private final BaseAuthorizationTokenProvider authorizationTokenProvider;
    // Used to resolve the target collection when building document create/upsert requests.
    private final ClientCollectionCache collectionCache;
    // Gateway store model through which create/read/replace/delete requests are executed.
    private final RxGatewayStoreModel gatewayProxy;
    // Wrapper around a DocumentClient; the feed-read and query methods in this class delegate to it.
    private final RxWrapperDocumentClientImpl rxWrapperClient;
    // Scheduler on which request headers (date, authorization, content type) are prepared.
    private final Scheduler computationScheduler;
    // NOTE(review): never assigned in the visible part of this class, so it appears to stay null — confirm.
    private Map<String, String> resourceTokens;
    // HTTP client built by httpClientBuilder() and handed to the gateway store model.
    private final HttpClient<ByteBuf, ByteBuf> rxClient;
    // Source of the current read and write endpoints.
    private final EndpointManager globalEndpointManager;
    // Thread pool backing computationScheduler; null when computation runs on the subscription thread.
    private final ExecutorService computationExecutor;
    // Decompiler artifact backing the assertion-style checks in this class; no initializer is visible in this chunk.
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class);
    // Fixed to Default and passed to the gateway store model.
    private final QueryCompatibilityMode queryCompatibilityMode = QueryCompatibilityMode.Default;

    /**
     * Builds a gateway-mode client for the given service endpoint.
     *
     * @param uri              service endpoint; its host and port are used for the HTTP client
     * @param str              master key used for request authorization
     * @param connectionPolicy connection settings; a default {@link ConnectionPolicy} is used when null
     * @param consistencyLevel client-level consistency level
     * @param i                event loop size; when {@code <= 0}, both the event loop size and the
     *                         computation pool size are derived from the number of available processors
     * @param i2               computation pool size; when {@code <= 0} (after auto-configuration),
     *                         computation runs on the subscription thread
     * @throws UnsupportedOperationException if the effective connection mode is {@code DirectHttps}
     */
    public RxDocumentClientImpl(URI uri, String str, ConnectionPolicy connectionPolicy, ConsistencyLevel consistencyLevel, int i, int i2) {
        this.logger.info("Initializing DocumentClient with serviceEndpoint [{}], ConnectionPolicy [{}], ConsistencyLevel [{}]", uri, connectionPolicy, consistencyLevel);
        this.masterKey = str;
        this.serviceEndpoint = uri;
        this.connectionPolicy = connectionPolicy != null ? connectionPolicy : new ConnectionPolicy();

        // Reject the unsupported mode before any HTTP client, executor or wrapped client is
        // created, so a failed construction does not leave those resources behind.
        if (this.connectionPolicy.getConnectionMode() == ConnectionMode.DirectHttps) {
            throw new UnsupportedOperationException("Direct Https is not supported");
        }

        this.sessionContainer = new SessionContainer(this.serviceEndpoint.getHost());
        this.consistencyLevel = consistencyLevel;

        UserAgentContainer userAgentContainer = new UserAgentContainer(Constants.Versions.SDK_NAME, Constants.Versions.SDK_VERSION);
        String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
        if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
            userAgentContainer.setSuffix(userAgentSuffix);
        }

        if (i <= 0) {
            // Auto-configuration: with at least 4 cores, a quarter go to computation and the
            // rest to the event loop; otherwise every core goes to the event loop.
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            if (availableProcessors >= 4) {
                i2 = availableProcessors / 4;
                i = availableProcessors - i2;
            } else {
                i2 = 0;
                i = availableProcessors;
            }
            this.logger.debug("Auto configuring eventLoop size and computation pool size. CPU cores [{}], eventLoopSize [{}], computationPoolSize [{}]", availableProcessors, i, i2);
        }
        this.logger.debug("EventLoop size [{}]", i);

        // The event loop provider is a process-wide RxNetty setting: install ours only while this
        // client is built, then restore the previous one. The class-level lock keeps concurrent
        // constructions from interleaving the swap.
        synchronized (RxDocumentClientImpl.class) {
            RxEventLoopProvider previousProvider = RxNetty.useEventLoopProvider(new SingleNioLoopProvider(1, i));
            this.rxClient = (HttpClient) httpClientBuilder().build();
            RxNetty.useEventLoopProvider(previousProvider);
        }

        if (i2 > 0) {
            this.logger.debug("Intensive computation configured on a computation scheduler backed by thread pool size [{}]", i2);
            // Small bounded queue; when it is full the submitting thread runs the task itself.
            this.computationExecutor = new ThreadPoolExecutor(i2, i2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), new RxThreadFactory("cosmosdb_connector_shaded.rxdocdb-computation"), new ThreadPoolExecutor.CallerRunsPolicy());
            this.computationScheduler = Schedulers.from(this.computationExecutor);
        } else {
            this.logger.debug("Intensive computation configured on the subscription thread");
            this.computationExecutor = null;
            this.computationScheduler = Schedulers.immediate();
        }

        this.authorizationTokenProvider = new BaseAuthorizationTokenProvider(this.masterKey);
        this.collectionCacheExecutorService = new ThreadPoolExecutor(1, MAX_COLLECTION_CACHE_CONCURRENCY, 10L, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(MAX_COLLECTION_CACHE_CONCURRENCY, true), new ThreadPoolExecutor.CallerRunsPolicy());
        this.collectionCache = BridgeInternal.createClientCollectionCache(this, this.collectionCacheExecutorService);
        this.globalEndpointManager = BridgeInternal.createGlobalEndpointManager(this);
        this.gatewayProxy = new RxGatewayStoreModel(this.connectionPolicy, consistencyLevel, this.queryCompatibilityMode, this.masterKey, this.resourceTokens, userAgentContainer, this.globalEndpointManager, this.rxClient);
        this.rxWrapperClient = new RxWrapperDocumentClientImpl(new DocumentClient(uri.toString(), str, connectionPolicy, consistencyLevel));
    }

    /**
     * Creates the HTTP client builder for the service endpoint's host and port, configured from
     * the connection policy: max pool size, idle-connection timeout (policy value multiplied by
     * 1000 for the millisecond setter) and read timeout (policy value in seconds).
     */
    private HttpClientBuilder<ByteBuf, ByteBuf> httpClientBuilder() {
        // NOTE(review): DefaultFactories.trustAll() — by its name this accepts any server
        // certificate; confirm that skipping certificate validation is intended here.
        return (HttpClientBuilder) RxNetty
                .newHttpClientBuilder(serviceEndpoint.getHost(), serviceEndpoint.getPort())
                .withSslEngineFactory(DefaultFactories.trustAll())
                .withMaxConnections(connectionPolicy.getMaxPoolSize())
                .withIdleConnectionsTimeoutMillis(connectionPolicy.getIdleConnectionTimeout() * 1000)
                .config(new RxClient.ClientConfig.Builder()
                        .readTimeout(connectionPolicy.getRequestTimeout(), TimeUnit.SECONDS)
                        .build());
    }

    /** Returns the service endpoint URI this client was constructed with. */
    @Override
    public URI getServiceEndpoint() {
        return serviceEndpoint;
    }

    /** Returns the write endpoint reported by the global endpoint manager. */
    @Override
    public URI getWriteEndpoint() {
        return globalEndpointManager.getWriteEndpoint();
    }

    /** Returns the read endpoint reported by the global endpoint manager. */
    @Override
    public URI getReadEndpoint() {
        return globalEndpointManager.getReadEndpoint();
    }

    /** Returns the connection policy in effect for this client. */
    @Override
    public ConnectionPolicy getConnectionPolicy() {
        return connectionPolicy;
    }

    /**
     * Creates a database.
     *
     * <p>Work is deferred until subscription. A null {@code database}, a validation failure or any
     * other exception raised while building the request is logged and emitted as an error.
     *
     * @param database       the database definition to create
     * @param requestOptions options converted into request headers
     * @return an observable emitting the resource response for the created database
     */
    @Override
    public Observable<ResourceResponse<Database>> createDatabase(Database database, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (database == null) {
                    throw new IllegalArgumentException("Database");
                }
                logger.debug("Creating a Database. id: [{}]", database.getId());
                validateResource(database);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Create, ResourceType.Database, "/dbs", database, getRequestHeaders(requestOptions));
                return doCreate(request).map(response -> BridgeInternal.toResourceResponse(response, Database.class));
            } catch (Exception e) {
                logger.debug("Failure in creating a database. due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Deletes the database identified by the given link.
     *
     * <p>Work is deferred until subscription. An empty link or any exception raised while
     * building the request is logged and emitted as an error.
     *
     * @param str            the database link
     * @param requestOptions options converted into request headers
     * @return an observable emitting the resource response for the delete
     */
    @Override
    public Observable<ResourceResponse<Database>> deleteDatabase(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("databaseLink");
                }
                logger.debug("Deleting a Database. databaseLink: [{}]", str);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Delete, ResourceType.Database, Utils.joinPath(str, null), getRequestHeaders(requestOptions));
                return doDelete(request).map(response -> BridgeInternal.toResourceResponse(response, Database.class));
            } catch (Exception e) {
                logger.debug("Failure in deleting a database. due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Reads the database identified by the given link.
     *
     * <p>Work is deferred until subscription. An empty link or any exception raised while
     * building the request is logged and emitted as an error.
     *
     * @param str            the database link
     * @param requestOptions options converted into request headers
     * @return an observable emitting the resource response for the database
     */
    @Override
    public Observable<ResourceResponse<Database>> readDatabase(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("databaseLink");
                }
                logger.debug("Reading a Database. databaseLink: [{}]", str);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Read, ResourceType.Database, Utils.joinPath(str, null), getRequestHeaders(requestOptions));
                return doRead(request).map(response -> BridgeInternal.toResourceResponse(response, Database.class));
            } catch (Exception e) {
                logger.debug("Failure in reading a database. due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /** Reads all databases by delegating to the wrapped client. */
    @Override
    public Observable<FeedResponsePage<Database>> readDatabases(FeedOptions feedOptions) {
        return rxWrapperClient.readDatabases(feedOptions);
    }

    /** Queries databases with a query string by delegating to the wrapped client. */
    @Override
    public Observable<FeedResponsePage<Database>> queryDatabases(String str, FeedOptions feedOptions) {
        return rxWrapperClient.queryDatabases(str, feedOptions);
    }

    /** Queries databases with a {@link SqlQuerySpec} by delegating to the wrapped client. */
    @Override
    public Observable<FeedResponsePage<Database>> queryDatabases(SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return rxWrapperClient.queryDatabases(sqlQuerySpec, feedOptions);
    }

    /**
     * Creates a collection under the given database.
     *
     * <p>Work is deferred until subscription. An empty database link, a null collection, a
     * validation failure or any other exception raised while building the request is logged and
     * emitted as an error.
     *
     * @param str                the database link
     * @param documentCollection the collection definition to create
     * @param requestOptions     options converted into request headers
     * @return an observable emitting the resource response for the created collection
     */
    @Override
    public Observable<ResourceResponse<DocumentCollection>> createCollection(String str, DocumentCollection documentCollection, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("databaseLink");
                }
                if (documentCollection == null) {
                    throw new IllegalArgumentException("collection");
                }
                logger.debug("Creating a Collection. databaseLink: [{}], Collection id: [{}]", str, documentCollection.getId());
                validateResource(documentCollection);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Create, ResourceType.DocumentCollection, Utils.joinPath(str, "colls"), documentCollection, getRequestHeaders(requestOptions));
                return doCreate(request).map(response -> BridgeInternal.toResourceResponse(response, DocumentCollection.class));
            } catch (Exception e) {
                logger.debug("Failure in creating a collection. due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Replaces a collection; the request path is derived from the collection's self link.
     *
     * <p>Work is deferred until subscription. A null collection, a validation failure or any
     * other exception raised while building the request is logged and emitted as an error.
     *
     * @param documentCollection the collection definition to send
     * @param requestOptions     options converted into request headers
     * @return an observable emitting the resource response for the replaced collection
     */
    @Override
    public Observable<ResourceResponse<DocumentCollection>> replaceCollection(DocumentCollection documentCollection, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (documentCollection == null) {
                    throw new IllegalArgumentException("collection");
                }
                logger.debug("Replacing a Collection. id: [{}]", documentCollection.getId());
                validateResource(documentCollection);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Replace, ResourceType.DocumentCollection, Utils.joinPath(documentCollection.getSelfLink(), null), documentCollection, getRequestHeaders(requestOptions));
                return doReplace(request).map(response -> BridgeInternal.toResourceResponse(response, DocumentCollection.class));
            } catch (Exception e) {
                logger.debug("Failure in replacing a collection. due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Deletes the collection identified by the given link.
     *
     * <p>Work is deferred until subscription. An empty link or any exception raised while
     * building the request is logged and emitted as an error.
     *
     * @param str            the collection link
     * @param requestOptions options converted into request headers
     * @return an observable emitting the resource response for the delete
     */
    @Override
    public Observable<ResourceResponse<DocumentCollection>> deleteCollection(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("collectionLink");
                }
                logger.debug("Deleting a Collection. collectionLink: [{}]", str);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Delete, ResourceType.DocumentCollection, Utils.joinPath(str, null), getRequestHeaders(requestOptions));
                return doDelete(request).map(response -> BridgeInternal.toResourceResponse(response, DocumentCollection.class));
            } catch (Exception e) {
                logger.debug("Failure in deleting a collection, due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Executes a DELETE through the gateway.
     *
     * <p>On subscription the request headers are prepared first (on the computation scheduler),
     * then the session token is applied, then the gateway call runs under the retry handler
     * created for this request. After each response, the session token is cleared when the
     * deleted resource is a collection and captured otherwise.
     *
     * @param rxDocumentServiceRequest the delete request
     * @return an observable emitting the service response
     */
    private Observable<DocumentServiceResponse> doDelete(RxDocumentServiceRequest rxDocumentServiceRequest) throws DocumentClientException {
        Observable<DocumentServiceResponse> deleteWithRetry = Observable.defer(() -> {
            try {
                return gatewayProxy.doDelete(rxDocumentServiceRequest).doOnNext(response -> {
                    if (rxDocumentServiceRequest.getResourceType() == ResourceType.DocumentCollection) {
                        clearToken(rxDocumentServiceRequest, response);
                    } else {
                        captureSessionToken(rxDocumentServiceRequest, response);
                    }
                });
            } catch (Exception e) {
                return Observable.error(e);
            }
        }).retryWhen(createExecuteRequestRetryHandler(rxDocumentServiceRequest));

        return createPutMoreContentObservable(rxDocumentServiceRequest, "DELETE")
                .doOnNext(prepared -> applySessionToken(rxDocumentServiceRequest))
                .flatMap(prepared -> deleteWithRetry);
    }

    /**
     * Executes a GET through the gateway.
     *
     * <p>On subscription the request headers are prepared first (on the computation scheduler),
     * then the session token is applied, then the gateway call runs under the retry handler
     * created for this request. The session token of each response is captured.
     *
     * @param rxDocumentServiceRequest the read request
     * @return an observable emitting the service response
     */
    private Observable<DocumentServiceResponse> doRead(RxDocumentServiceRequest rxDocumentServiceRequest) throws DocumentClientException {
        Observable<DocumentServiceResponse> readWithRetry = Observable.defer(() -> {
            try {
                return gatewayProxy.processMessage(rxDocumentServiceRequest)
                        .doOnNext(response -> captureSessionToken(rxDocumentServiceRequest, response));
            } catch (Exception e) {
                return Observable.error(e);
            }
        }).retryWhen(createExecuteRequestRetryHandler(rxDocumentServiceRequest));

        return createPutMoreContentObservable(rxDocumentServiceRequest, "GET")
                .doOnNext(prepared -> applySessionToken(rxDocumentServiceRequest))
                .flatMap(prepared -> readWithRetry);
    }

    /**
     * Reads the collection identified by the given link.
     *
     * <p>Work is deferred until subscription. An empty link or any exception raised while
     * building the request is logged and emitted as an error.
     *
     * @param str            the collection link
     * @param requestOptions options converted into request headers
     * @return an observable emitting the resource response for the collection
     */
    @Override
    public Observable<ResourceResponse<DocumentCollection>> readCollection(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("collectionLink");
                }
                logger.debug("Reading a Collection. collectionLink: [{}]", str);

                RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                        OperationType.Read, ResourceType.DocumentCollection, Utils.joinPath(str, null), getRequestHeaders(requestOptions));
                return doRead(request).map(response -> BridgeInternal.toResourceResponse(response, DocumentCollection.class));
            } catch (Exception e) {
                logger.debug("Failure in reading a collection, due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /** Reads the collections of a database by delegating to the wrapped client. */
    @Override
    public Observable<FeedResponsePage<DocumentCollection>> readCollections(String str, FeedOptions feedOptions) {
        return rxWrapperClient.readCollections(str, feedOptions);
    }

    /** Queries collections with a query string by delegating to the wrapped client. */
    @Override
    public Observable<FeedResponsePage<DocumentCollection>> queryCollections(String str, String str2, FeedOptions feedOptions) {
        return rxWrapperClient.queryCollections(str, str2, feedOptions);
    }

    /** Queries collections with a {@link SqlQuerySpec} by delegating to the wrapped client. */
    @Override
    public Observable<FeedResponsePage<DocumentCollection>> queryCollections(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return rxWrapperClient.queryCollections(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Validates the arguments of a document create/upsert and returns the collection link to
     * target, which is the supplied link unchanged.
     *
     * @param str the collection link; must not be null or empty
     * @param obj the document payload; must not be null
     * @return {@code str}
     * @throws IllegalArgumentException if {@code str} is empty or {@code obj} is null
     */
    private String getTargetDocumentCollectionLink(String str, Object obj) {
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("collectionLink");
        }
        if (obj == null) {
            throw new IllegalArgumentException("document");
        }
        // No database-link special case applies: the link is used exactly as supplied.
        return str;
    }

    /** Validates the resource via {@code BridgeInternal.validateResource}. */
    private static void validateResource(Resource resource) {
        BridgeInternal.validateResource(resource);
    }

    /** Converts request options into a header map via {@code BridgeInternal.getRequestHeaders}. */
    private Map<String, String> getRequestHeaders(RequestOptions requestOptions) {
        return BridgeInternal.getRequestHeaders(requestOptions);
    }

    /** Adds partition key information to the request via {@code BridgeInternal.addPartitionKeyInformation}. */
    private void addPartitionKeyInformation(RxDocumentServiceRequest rxDocumentServiceRequest, Document document, RequestOptions requestOptions, DocumentCollection documentCollection) {
        BridgeInternal.addPartitionKeyInformation(rxDocumentServiceRequest, document, requestOptions, documentCollection);
    }

    /**
     * Builds the service request for a document create or upsert.
     *
     * <p>The payload is converted to a {@link Document} and validated. When it has no id and
     * {@code z} is false, a random UUID is assigned as its id. The target collection is resolved
     * through the collection cache so partition key information can be added to the request.
     *
     * @param str            the collection link; must not be null or empty
     * @param obj            the document payload; must not be null
     * @param requestOptions options converted into request headers
     * @param z              when true, no id is generated for a document that lacks one
     * @param operationType  the operation the request represents
     * @return the prepared request
     * @throws IllegalArgumentException if {@code str} is empty or {@code obj} is null
     */
    private RxDocumentServiceRequest getCreateDocumentRequest(String str, Object obj, RequestOptions requestOptions, boolean z, OperationType operationType) {
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("documentCollectionLink");
        }
        if (obj == null) {
            throw new IllegalArgumentException("document");
        }

        Document document = BridgeInternal.documentFromObject(obj);
        validateResource(document);

        String existingId = document.getId();
        if (existingId == null && !z) {
            document.setId(UUID.randomUUID().toString());
        }

        String documentsPath = Utils.joinPath(str, "docs");
        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                operationType, ResourceType.Document, documentsPath, document, getRequestHeaders(requestOptions));
        DocumentCollection targetCollection = collectionCache.resolveCollection(request);
        addPartitionKeyInformation(request, document, requestOptions, targetCollection);
        return request;
    }

    /**
     * Fills in the request headers needed before the request is sent:
     * <ul>
     *   <li>the date header in RFC 1123 format (GMT), when a master key is configured;</li>
     *   <li>the URL-encoded authorization token, when a master key or resource tokens exist;</li>
     *   <li>{@code Content-Type: application/json} for POST/PUT when not already set;</li>
     *   <li>{@code Accept: application/json} when not already set.</li>
     * </ul>
     *
     * @param rxDocumentServiceRequest the request whose headers are updated in place
     * @param str                      the HTTP verb of the request
     * @throws IllegalStateException if the authorization token cannot be URL-encoded
     */
    private void putMoreContentIntoDocumentServiceRequest(RxDocumentServiceRequest rxDocumentServiceRequest, String str) {
        Map<String, String> headers = rxDocumentServiceRequest.getHeaders();

        if (masterKey != null) {
            // A new formatter per call: SimpleDateFormat instances must not be shared across threads.
            SimpleDateFormat rfc1123 = new SimpleDateFormat(DateUtils.PATTERN_RFC1123, Locale.US);
            rfc1123.setTimeZone(TimeZone.getTimeZone("GMT"));
            headers.put(HttpConstants.HttpHeaders.X_DATE, rfc1123.format(new Date()));
        }

        if (masterKey != null || resourceTokens != null) {
            String token = getAuthorizationToken(rxDocumentServiceRequest.getResourceFullName(), rxDocumentServiceRequest.getPath(), rxDocumentServiceRequest.getResourceType(), str, headers, masterKey, resourceTokens);
            try {
                headers.put(HttpConstants.HttpHeaders.AUTHORIZATION, URLEncoder.encode(token, "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("Failed to encode authtoken.", e);
            }
        }

        boolean carriesBody = "POST".equals(str) || "PUT".equals(str);
        if (carriesBody && !headers.containsKey("Content-Type")) {
            headers.put("Content-Type", "application/json");
        }

        if (!headers.containsKey("Accept")) {
            headers.put("Accept", "application/json");
        }
    }

    /**
     * Produces the authorization token for a request.
     *
     * <p>When a master key is given ({@code str4} non-null), a key signature is generated from
     * the verb, resource name, resource type and headers. Otherwise the token is looked up from
     * the resource tokens using the path and resource name.
     *
     * @param str          resource full name
     * @param str2         request path
     * @param resourceType resource type of the request
     * @param str3         HTTP verb
     * @param map          request headers
     * @param str4         master key, or null to use resource tokens
     * @param map2         resource tokens; asserted non-null when no master key is given
     * @return the authorization token
     */
    private String getAuthorizationToken(String str, String str2, ResourceType resourceType, String str3, Map<String, String> map, String str4, Map<String, String> map2) {
        if (str4 != null) {
            return authorizationTokenProvider.generateKeyAuthorizationSignature(str3, str, resourceType, map);
        }
        // Assertion-style check: only enforced when assertions are enabled.
        if (!$assertionsDisabled && map2 == null) {
            throw new AssertionError();
        }
        return authorizationTokenProvider.getAuthorizationTokenUsingResourceTokens(map2, str2, str);
    }

    /**
     * Attaches the session token known for the request's resource, when appropriate.
     *
     * <p>Nothing is done when the request has no header map, when the caller already set a
     * session token, when neither the client nor the request asks for Session consistency, when
     * the request has no resource address, or when no token is known.
     *
     * @param rxDocumentServiceRequest the request whose headers may receive the session token
     */
    private void applySessionToken(RxDocumentServiceRequest rxDocumentServiceRequest) {
        Map<String, String> headers = rxDocumentServiceRequest.getHeaders();
        if (headers == null) {
            // Without a header map there is nowhere to read from or write to.
            return;
        }
        if (!StringUtils.isEmpty(headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN))) {
            // The caller set a session token explicitly; leave it untouched.
            return;
        }

        String requestedConsistency = headers.get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL);
        // equalsIgnoreCase is null-safe and never matches an empty string, so no separate emptiness check is needed.
        boolean sessionConsistency = this.consistencyLevel == ConsistencyLevel.Session
                || StringUtils.equalsIgnoreCase(requestedConsistency, ConsistencyLevel.Session.toString());
        if (!sessionConsistency || StringUtils.isEmpty(rxDocumentServiceRequest.getResourceAddress())) {
            return;
        }

        String sessionToken = this.sessionContainer.resolveGlobalSessionToken(rxDocumentServiceRequest);
        if (!StringUtils.isEmpty(sessionToken)) {
            headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, sessionToken);
        }
    }

    /** Records the session token of the response for the request in the session container. */
    void captureSessionToken(RxDocumentServiceRequest rxDocumentServiceRequest, DocumentServiceResponse documentServiceResponse) {
        sessionContainer.setSessionToken(rxDocumentServiceRequest, documentServiceResponse);
    }

    /**
     * Clears the session token stored for the request in the session container.
     * The response argument is not used.
     */
    void clearToken(RxDocumentServiceRequest rxDocumentServiceRequest, DocumentServiceResponse documentServiceResponse) {
        sessionContainer.clearToken(rxDocumentServiceRequest);
    }

    /**
     * Executes a POST (create) through the gateway.
     *
     * <p>On subscription the request headers are prepared first (on the computation scheduler),
     * then the session token is applied, then the gateway call runs under the retry handler
     * created for this request. The session token of each response is captured.
     *
     * @param rxDocumentServiceRequest the create request
     * @return an observable emitting the service response
     */
    private Observable<DocumentServiceResponse> doCreate(RxDocumentServiceRequest rxDocumentServiceRequest) {
        Observable<DocumentServiceResponse> createWithRetry = Observable.defer(() -> {
            try {
                return gatewayProxy.processMessage(rxDocumentServiceRequest)
                        .doOnNext(response -> captureSessionToken(rxDocumentServiceRequest, response));
            } catch (Exception e) {
                return Observable.error(e);
            }
        }).retryWhen(createExecuteRequestRetryHandler(rxDocumentServiceRequest));

        return createPutMoreContentObservable(rxDocumentServiceRequest, "POST")
                .doOnNext(prepared -> applySessionToken(rxDocumentServiceRequest))
                .flatMap(prepared -> createWithRetry);
    }

    /**
     * Returns an observable that, on subscription, prepares the request headers for the given
     * verb, emits the request itself and completes. An exception raised during preparation is
     * delivered through {@code onError}. Subscription happens on the computation scheduler.
     *
     * @param rxDocumentServiceRequest the request to prepare
     * @param str                      the HTTP verb of the request
     * @return an observable emitting the prepared request once
     */
    private Observable<Object> createPutMoreContentObservable(RxDocumentServiceRequest rxDocumentServiceRequest, String str) {
        Observable<Object> prepareHeaders = Observable.create(downstream -> {
            try {
                putMoreContentIntoDocumentServiceRequest(rxDocumentServiceRequest, str);
                downstream.onNext(rxDocumentServiceRequest);
                downstream.onCompleted();
            } catch (Exception e) {
                downstream.onError(e);
            }
        });
        return prepareHeaders.subscribeOn(computationScheduler);
    }

    /**
     * Executes a POST marked as an upsert through the gateway.
     *
     * <p>On subscription the request headers are prepared first (on the computation scheduler),
     * then the session token is applied and the upsert header is set to {@code "true"}, then the
     * gateway call runs under the retry handler created for this request. The session token of
     * each response is captured.
     *
     * @param rxDocumentServiceRequest the upsert request
     * @return an observable emitting the service response
     */
    private Observable<DocumentServiceResponse> doUpsert(RxDocumentServiceRequest rxDocumentServiceRequest) {
        Observable<DocumentServiceResponse> upsertWithRetry = Observable.defer(() -> {
            try {
                return gatewayProxy.processMessage(rxDocumentServiceRequest)
                        .doOnNext(response -> captureSessionToken(rxDocumentServiceRequest, response));
            } catch (Exception e) {
                return Observable.error(e);
            }
        }).retryWhen(createExecuteRequestRetryHandler(rxDocumentServiceRequest));

        return createPutMoreContentObservable(rxDocumentServiceRequest, "POST")
                .doOnNext(prepared -> {
                    applySessionToken(rxDocumentServiceRequest);
                    Map<String, String> requestHeaders = rxDocumentServiceRequest.getHeaders();
                    // Assertion-style check: only enforced when assertions are enabled.
                    if (!$assertionsDisabled && requestHeaders == null) {
                        throw new AssertionError();
                    }
                    requestHeaders.put(HttpConstants.HttpHeaders.IS_UPSERT, "true");
                })
                .flatMap(prepared -> upsertWithRetry);
    }

    /**
     * Executes a PUT (replace) through the gateway.
     *
     * <p>On subscription the request headers are prepared first (on the computation scheduler),
     * then the session token is applied, then the gateway call runs under the retry handler
     * created for this request. The session token of each response is captured.
     *
     * @param rxDocumentServiceRequest the replace request
     * @return an observable emitting the service response
     */
    private Observable<DocumentServiceResponse> doReplace(RxDocumentServiceRequest rxDocumentServiceRequest) {
        Observable<DocumentServiceResponse> replaceWithRetry = Observable.defer(() -> {
            try {
                return gatewayProxy.doReplace(rxDocumentServiceRequest)
                        .doOnNext(response -> captureSessionToken(rxDocumentServiceRequest, response));
            } catch (Exception e) {
                return Observable.error(e);
            }
        }).retryWhen(createExecuteRequestRetryHandler(rxDocumentServiceRequest));

        return createPutMoreContentObservable(rxDocumentServiceRequest, "PUT")
                .doOnNext(prepared -> applySessionToken(rxDocumentServiceRequest))
                .flatMap(prepared -> replaceWithRetry);
    }

    /**
     * Creates a document in the given collection.
     *
     * <p>Work is deferred until subscription, and the request is rebuilt on every
     * (re)subscription of the inner observable. When the request options carry no partition key,
     * the operation is wrapped with a {@code CreateDocumentRetryHandler} based on the collection
     * cache. Failures during setup are logged and emitted as an error.
     *
     * @param str            the collection link
     * @param obj            the document payload
     * @param requestOptions options for the request; may be null
     * @param z              when true, no id is generated for a document that lacks one
     * @return an observable emitting the resource response for the created document
     */
    @Override
    public Observable<ResourceResponse<Document>> createDocument(String str, Object obj, RequestOptions requestOptions, boolean z) {
        return Observable.defer(() -> {
            try {
                logger.debug("Creating a Document. collectionLink: [{}]", str);
                final String collectionLink = getTargetDocumentCollectionLink(str, obj);
                final boolean partitionKeyProvided = requestOptions != null && requestOptions.getPartitionKey() != null;

                Observable<ResourceResponse<Document>> createOnce = Observable.defer(() ->
                        doCreate(getCreateDocumentRequest(collectionLink, obj, requestOptions, z, OperationType.Create))
                                .map(response -> BridgeInternal.toResourceResponse(response, Document.class)));

                if (partitionKeyProvided) {
                    return createOnce;
                }
                return createOnce.retryWhen(RetryFunctionFactory.from(new CreateDocumentRetryHandler(collectionCache, collectionLink)));
            } catch (Exception e) {
                logger.debug("Failure in creating a document due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Upserts a document in the given collection.
     *
     * <p>Work is deferred until subscription, and the request is rebuilt on every
     * (re)subscription of the inner observable. When the request options carry no partition key,
     * the operation is wrapped with a {@code CreateDocumentRetryHandler} based on the collection
     * cache. Failures during setup are logged and emitted as an error.
     *
     * @param str            the collection link
     * @param obj            the document payload
     * @param requestOptions options for the request; may be null
     * @param z              when true, no id is generated for a document that lacks one
     * @return an observable emitting the resource response for the upserted document
     */
    @Override
    public Observable<ResourceResponse<Document>> upsertDocument(String str, Object obj, RequestOptions requestOptions, boolean z) {
        return Observable.defer(() -> {
            try {
                logger.debug("Upserting a Document. collectionLink: [{}]", str);
                final String collectionLink = getTargetDocumentCollectionLink(str, obj);
                final boolean partitionKeyProvided = requestOptions != null && requestOptions.getPartitionKey() != null;

                Observable<ResourceResponse<Document>> upsertOnce = Observable.defer(() ->
                        doUpsert(getCreateDocumentRequest(collectionLink, obj, requestOptions, z, OperationType.Upsert))
                                .map(response -> BridgeInternal.toResourceResponse(response, Document.class)));

                if (partitionKeyProvided) {
                    return upsertOnce;
                }
                return upsertOnce.retryWhen(RetryFunctionFactory.from(new CreateDocumentRetryHandler(collectionCache, collectionLink)));
            } catch (Exception e) {
                logger.debug("Failure in upserting a document due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Replaces the document addressed by a link with the supplied object.
     *
     * <p>Work is deferred until subscription. The object is converted with
     * {@code BridgeInternal.documentFromObject} and handed to {@code replaceDocumentInternal}.
     * Validation and other failures are logged (with the throwable, so the stack trace is kept)
     * and emitted via {@code onError}.
     *
     * @param str the document link; must not be empty
     * @param obj the replacement document; must not be {@code null}
     * @param requestOptions forwarded to {@code replaceDocumentInternal}
     * @return an observable emitting the replace response
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Document>> replaceDocument(String str, Object obj, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("documentLink");
                }
                if (obj == null) {
                    throw new IllegalArgumentException("document");
                }
                return replaceDocumentInternal(str, BridgeInternal.documentFromObject(obj), requestOptions);
            } catch (Exception e) {
                // Pass the throwable as the final argument so the logger records the stack trace,
                // matching the create/upsert document failure logs.
                this.logger.debug("Failure in replacing a document due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Replaces a document using its own self link as the target.
     *
     * <p>Work is deferred until subscription. Failures are logged (with the throwable, so the
     * stack trace is kept) and emitted via {@code onError}.
     *
     * @param document the replacement document; must not be {@code null}
     * @param requestOptions forwarded to {@code replaceDocumentInternal}
     * @return an observable emitting the replace response
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Document>> replaceDocument(Document document, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (document == null) {
                    throw new IllegalArgumentException("document");
                }
                return replaceDocumentInternal(document.getSelfLink(), document, requestOptions);
            } catch (Exception e) {
                // The message previously said "database", which mislabelled document failures.
                this.logger.debug("Failure in replacing a document due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Builds and issues the replace request shared by both {@code replaceDocument} overloads.
     *
     * <p>Steps, in order: resolve the target collection link, create the request, attach
     * partition key information (using the collection resolved from the collection cache),
     * validate the document, then issue the replace. When no partition key is supplied through
     * {@code requestOptions}, the result is wrapped with a {@code CreateDocumentRetryHandler}-driven
     * {@code retryWhen}.
     *
     * <p>NOTE(review): the retry handler here receives {@code null} where create/upsert pass
     * {@code this.collectionCache}; confirm this difference is intentional.
     *
     * @param str the document link
     * @param document the replacement document; must not be {@code null}
     * @param requestOptions optional request options; {@code null} is treated as "no partition key"
     * @return an observable emitting the response mapped to a {@code ResourceResponse<Document>}
     * @throws DocumentClientException declared by this method; the visible body does not show which call raises it
     */
    private Observable<ResourceResponse<Document>> replaceDocumentInternal(String str, Document document, RequestOptions requestOptions) throws DocumentClientException {
        if (document == null) {
            throw new IllegalArgumentException("document");
        }
        this.logger.debug("Replacing a Document. documentLink: [{}]", str);

        final String collectionLink = getTargetDocumentCollectionLink(Utils.getCollectionName(str), document);
        final RxDocumentServiceRequest replaceRequest = RxDocumentServiceRequest.create(OperationType.Replace, ResourceType.Document, Utils.joinPath(str, null), document, getRequestHeaders(requestOptions));
        final boolean partitionKeySupplied = requestOptions != null && requestOptions.getPartitionKey() != null;

        addPartitionKeyInformation(replaceRequest, document, requestOptions, this.collectionCache.resolveCollection(replaceRequest));
        validateResource(document);

        final Observable<ResourceResponse<Document>> replaceAttempt = doReplace(replaceRequest)
                .map(response -> BridgeInternal.toResourceResponse(response, Document.class));
        if (partitionKeySupplied) {
            return replaceAttempt;
        }
        return replaceAttempt.retryWhen(RetryFunctionFactory.from(new CreateDocumentRetryHandler(null, collectionLink)));
    }

    /**
     * Deletes the document addressed by the given link.
     *
     * <p>Work is deferred until subscription. Partition key information is attached to the
     * request using the collection resolved from the collection cache. Failures are logged
     * (with the throwable, so the stack trace is kept) and emitted via {@code onError}.
     *
     * @param str the document link; must not be empty
     * @param requestOptions used for the request headers and partition key information
     * @return an observable emitting the response mapped to a {@code ResourceResponse<Document>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Document>> deleteDocument(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("documentLink");
                }
                this.logger.debug("Deleting a Document. documentLink: [{}]", str);
                RxDocumentServiceRequest create = RxDocumentServiceRequest.create(OperationType.Delete, ResourceType.Document, Utils.joinPath(str, null), getRequestHeaders(requestOptions));
                addPartitionKeyInformation(create, null, requestOptions, this.collectionCache.resolveCollection(create));
                return doDelete(create).map(documentServiceResponse -> {
                    return BridgeInternal.toResourceResponse(documentServiceResponse, Document.class);
                });
            } catch (Exception e) {
                // Pass the throwable as the final argument so the logger records the stack trace,
                // matching the create/upsert document failure logs.
                this.logger.debug("Failure in deleting a document due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Reads the document addressed by the given link.
     *
     * <p>Work is deferred until subscription. Partition key information is attached to the
     * request using the collection resolved from the collection cache. Failures are logged
     * (with the throwable, so the stack trace is kept) and emitted via {@code onError}.
     *
     * @param str the document link; must not be empty
     * @param requestOptions used for the request headers and partition key information
     * @return an observable emitting the response mapped to a {@code ResourceResponse<Document>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Document>> readDocument(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("documentLink");
                }
                this.logger.debug("Reading a Document. documentLink: [{}]", str);
                RxDocumentServiceRequest create = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.Document, Utils.joinPath(str, null), getRequestHeaders(requestOptions));
                addPartitionKeyInformation(create, null, requestOptions, this.collectionCache.resolveCollection(create));
                return doRead(create).map(documentServiceResponse -> {
                    return BridgeInternal.toResourceResponse(documentServiceResponse, Document.class);
                });
            } catch (Exception e) {
                // Pass the throwable as the final argument so the logger records the stack trace,
                // matching the create/upsert document failure logs.
                this.logger.debug("Failure in reading a document due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Pass-through to {@code rxWrapperClient.readDocuments}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Document>> readDocuments(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readDocuments(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryDocuments(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Document>> queryDocuments(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryDocuments(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryDocuments(String, String, FeedOptions, Object)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @param obj forwarded unchanged (presumably the partition key; confirm against the interface)
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Document>> queryDocuments(String str, String str2, FeedOptions feedOptions, Object obj) {
        return this.rxWrapperClient.queryDocuments(str, str2, feedOptions, obj);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryDocuments(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Document>> queryDocuments(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryDocuments(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryDocuments(String, SqlQuerySpec, FeedOptions, Object)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @param obj forwarded unchanged (presumably the partition key; confirm against the interface)
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Document>> queryDocuments(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions, Object obj) {
        return this.rxWrapperClient.queryDocuments(str, sqlQuerySpec, feedOptions, obj);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryDocumentChangeFeed}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param changeFeedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Document>> queryDocumentChangeFeed(String str, ChangeFeedOptions changeFeedOptions) {
        return this.rxWrapperClient.queryDocumentChangeFeed(str, changeFeedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readPartitionKeyRanges}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<PartitionKeyRange>> readPartitionKeyRanges(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readPartitionKeyRanges(str, feedOptions);
    }

    /**
     * Validates the arguments and builds a stored-procedure request for the given operation.
     *
     * @param str the collection link; must not be empty
     * @param storedProcedure the stored procedure payload; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @param operationType the operation the request represents
     * @return a request targeting the {@code sprocs} path under the collection link
     * @throws IllegalArgumentException if the link is empty or the stored procedure is {@code null}
     */
    private RxDocumentServiceRequest getStoredProcedureRequest(String str, StoredProcedure storedProcedure, RequestOptions requestOptions, OperationType operationType) {
        final boolean linkMissing = StringUtils.isEmpty(str);
        if (linkMissing) {
            throw new IllegalArgumentException("collectionLink");
        }
        if (null == storedProcedure) {
            throw new IllegalArgumentException("storedProcedure");
        }

        validateResource(storedProcedure);

        final String sprocsPath = Utils.joinPath(str, "sprocs");
        return RxDocumentServiceRequest.create(operationType, ResourceType.StoredProcedure, sprocsPath, storedProcedure, getRequestHeaders(requestOptions));
    }

    /**
     * Validates the arguments and builds a user-defined-function request for the given operation.
     *
     * @param str the collection link; must not be empty
     * @param userDefinedFunction the UDF payload; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @param operationType the operation the request represents
     * @return a request targeting the {@code udfs} path under the collection link
     * @throws IllegalArgumentException if the link is empty or the UDF is {@code null}
     */
    private RxDocumentServiceRequest getUserDefinedFunctionRequest(String str, UserDefinedFunction userDefinedFunction, RequestOptions requestOptions, OperationType operationType) {
        final boolean linkMissing = StringUtils.isEmpty(str);
        if (linkMissing) {
            throw new IllegalArgumentException("collectionLink");
        }
        if (null == userDefinedFunction) {
            throw new IllegalArgumentException("udf");
        }

        validateResource(userDefinedFunction);

        final String udfsPath = Utils.joinPath(str, "udfs");
        return RxDocumentServiceRequest.create(operationType, ResourceType.UserDefinedFunction, udfsPath, userDefinedFunction, getRequestHeaders(requestOptions));
    }

    /**
     * Creates a stored procedure under the given collection.
     *
     * <p>Work is deferred until subscription. The request is built, and the arguments thereby
     * validated, before the debug line reads {@code storedProcedure.getId()}. A {@code null}
     * stored procedure therefore surfaces as the {@link IllegalArgumentException} raised by
     * {@code getStoredProcedureRequest} rather than a {@link NullPointerException} from logging.
     * Failures are logged and emitted via {@code onError}.
     *
     * @param str the collection link; must not be empty
     * @param storedProcedure the stored procedure to create; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<StoredProcedure>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<StoredProcedure>> createStoredProcedure(String str, StoredProcedure storedProcedure, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                // Validate first: dereferencing storedProcedure for the log line would NPE on null.
                RxDocumentServiceRequest request = getStoredProcedureRequest(str, storedProcedure, requestOptions, OperationType.Create);
                this.logger.debug("Creating a StoredProcedure. collectionLink: [{}], storedProcedure id [{}]", str, storedProcedure.getId());
                return doCreate(request).map(documentServiceResponse -> {
                    return BridgeInternal.toResourceResponse(documentServiceResponse, StoredProcedure.class);
                });
            } catch (Exception e) {
                this.logger.debug("Failure in creating a StoredProcedure due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Upserts a stored procedure under the given collection.
     *
     * <p>Work is deferred until subscription. The request is built, and the arguments thereby
     * validated, before the debug line reads {@code storedProcedure.getId()}. A {@code null}
     * stored procedure therefore surfaces as the {@link IllegalArgumentException} raised by
     * {@code getStoredProcedureRequest} rather than a {@link NullPointerException} from logging.
     * Failures are logged and emitted via {@code onError}.
     *
     * @param str the collection link; must not be empty
     * @param storedProcedure the stored procedure to upsert; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<StoredProcedure>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<StoredProcedure>> upsertStoredProcedure(String str, StoredProcedure storedProcedure, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                // Validate first: dereferencing storedProcedure for the log line would NPE on null.
                RxDocumentServiceRequest request = getStoredProcedureRequest(str, storedProcedure, requestOptions, OperationType.Upsert);
                this.logger.debug("Upserting a StoredProcedure. collectionLink: [{}], storedProcedure id [{}]", str, storedProcedure.getId());
                return doUpsert(request).map(documentServiceResponse -> {
                    return BridgeInternal.toResourceResponse(documentServiceResponse, StoredProcedure.class);
                });
            } catch (Exception e) {
                this.logger.debug("Failure in upserting a StoredProcedure due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Pass-through to {@code rxWrapperClient.replaceStoredProcedure}; no logic is added here.
     *
     * @param storedProcedure forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<StoredProcedure>> replaceStoredProcedure(StoredProcedure storedProcedure, RequestOptions requestOptions) {
        return this.rxWrapperClient.replaceStoredProcedure(storedProcedure, requestOptions);
    }

    /**
     * Deletes the stored procedure addressed by the given link.
     *
     * <p>Work is deferred until subscription; failures are logged and emitted via {@code onError}.
     *
     * @param str the stored procedure link; must not be empty
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<StoredProcedure>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<StoredProcedure>> deleteStoredProcedure(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("storedProcedureLink");
                }
                this.logger.debug("Deleting a StoredProcedure. storedProcedureLink [{}]", str);

                final String resourcePath = Utils.joinPath(str, null);
                final RxDocumentServiceRequest deleteRequest = RxDocumentServiceRequest.create(OperationType.Delete, ResourceType.StoredProcedure, resourcePath, getRequestHeaders(requestOptions));
                return doDelete(deleteRequest)
                        .map(response -> BridgeInternal.toResourceResponse(response, StoredProcedure.class));
            } catch (Exception failure) {
                this.logger.debug("Failure in deleting a StoredProcedure due to [{}]", failure.getMessage(), failure);
                return Observable.error(failure);
            }
        });
    }

    /**
     * Reads the stored procedure addressed by the given link.
     *
     * <p>Work is deferred until subscription; failures are logged and emitted via {@code onError}.
     *
     * @param str the stored procedure link; must not be empty
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<StoredProcedure>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<StoredProcedure>> readStoredProcedure(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("storedProcedureLink");
                }
                this.logger.debug("Reading a StoredProcedure. storedProcedureLink [{}]", str);

                final String resourcePath = Utils.joinPath(str, null);
                final RxDocumentServiceRequest readRequest = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.StoredProcedure, resourcePath, getRequestHeaders(requestOptions));
                return doRead(readRequest)
                        .map(response -> BridgeInternal.toResourceResponse(response, StoredProcedure.class));
            } catch (Exception failure) {
                this.logger.debug("Failure in reading a StoredProcedure due to [{}]", failure.getMessage(), failure);
                return Observable.error(failure);
            }
        });
    }

    /**
     * Pass-through to {@code rxWrapperClient.readStoredProcedures}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<StoredProcedure>> readStoredProcedures(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readStoredProcedures(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryStoredProcedures(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<StoredProcedure>> queryStoredProcedures(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryStoredProcedures(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryStoredProcedures(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<StoredProcedure>> queryStoredProcedures(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryStoredProcedures(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.executeStoredProcedure(String, Object[])}.
     *
     * @param str forwarded unchanged (presumably the stored procedure link; confirm against the interface)
     * @param objArr forwarded unchanged (presumably the procedure arguments; confirm against the interface)
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<StoredProcedureResponse> executeStoredProcedure(String str, Object[] objArr) {
        return this.rxWrapperClient.executeStoredProcedure(str, objArr);
    }

    /**
     * Pass-through to {@code rxWrapperClient.executeStoredProcedure(String, RequestOptions, Object[])}.
     *
     * @param str forwarded unchanged (presumably the stored procedure link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @param objArr forwarded unchanged (presumably the procedure arguments; confirm against the interface)
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<StoredProcedureResponse> executeStoredProcedure(String str, RequestOptions requestOptions, Object[] objArr) {
        return this.rxWrapperClient.executeStoredProcedure(str, requestOptions, objArr);
    }

    /**
     * Pass-through to {@code rxWrapperClient.createTrigger}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param trigger forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Trigger>> createTrigger(String str, Trigger trigger, RequestOptions requestOptions) {
        return this.rxWrapperClient.createTrigger(str, trigger, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.upsertTrigger}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param trigger forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Trigger>> upsertTrigger(String str, Trigger trigger, RequestOptions requestOptions) {
        return this.rxWrapperClient.upsertTrigger(str, trigger, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.replaceTrigger}; no logic is added here.
     *
     * @param trigger forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Trigger>> replaceTrigger(Trigger trigger, RequestOptions requestOptions) {
        return this.rxWrapperClient.replaceTrigger(trigger, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.deleteTrigger}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the trigger link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Trigger>> deleteTrigger(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.deleteTrigger(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readTrigger}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the trigger link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Trigger>> readTrigger(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.readTrigger(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readTriggers}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Trigger>> readTriggers(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readTriggers(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryTriggers(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Trigger>> queryTriggers(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryTriggers(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryTriggers(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Trigger>> queryTriggers(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryTriggers(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Creates a user-defined function under the given collection.
     *
     * <p>Work is deferred until subscription. The request is built, and the arguments thereby
     * validated, before the debug line reads {@code userDefinedFunction.getId()}. A {@code null}
     * UDF therefore surfaces as the {@link IllegalArgumentException} raised by
     * {@code getUserDefinedFunctionRequest} rather than a {@link NullPointerException} from logging.
     * Failures are logged and emitted via {@code onError}.
     *
     * @param str the collection link; must not be empty
     * @param userDefinedFunction the UDF to create; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<UserDefinedFunction>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<UserDefinedFunction>> createUserDefinedFunction(String str, UserDefinedFunction userDefinedFunction, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                // Validate first: dereferencing userDefinedFunction for the log line would NPE on null.
                RxDocumentServiceRequest request = getUserDefinedFunctionRequest(str, userDefinedFunction, requestOptions, OperationType.Create);
                this.logger.debug("Creating a UserDefinedFunction. collectionLink [{}], udf id [{}]", str, userDefinedFunction.getId());
                return doCreate(request).map(documentServiceResponse -> {
                    return BridgeInternal.toResourceResponse(documentServiceResponse, UserDefinedFunction.class);
                });
            } catch (Exception e) {
                this.logger.debug("Failure in creating a UserDefinedFunction due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Upserts a user-defined function under the given collection.
     *
     * <p>Work is deferred until subscription. The request is built, and the arguments thereby
     * validated, before the debug line reads {@code userDefinedFunction.getId()}. A {@code null}
     * UDF therefore surfaces as the {@link IllegalArgumentException} raised by
     * {@code getUserDefinedFunctionRequest} rather than a {@link NullPointerException} from logging.
     * Failures are logged and emitted via {@code onError}.
     *
     * @param str the collection link; must not be empty
     * @param userDefinedFunction the UDF to upsert; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<UserDefinedFunction>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<UserDefinedFunction>> upsertUserDefinedFunction(String str, UserDefinedFunction userDefinedFunction, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                // Validate first: dereferencing userDefinedFunction for the log line would NPE on null.
                RxDocumentServiceRequest request = getUserDefinedFunctionRequest(str, userDefinedFunction, requestOptions, OperationType.Upsert);
                this.logger.debug("Upserting a UserDefinedFunction. collectionLink [{}], udf id [{}]", str, userDefinedFunction.getId());
                return doUpsert(request).map(documentServiceResponse -> {
                    return BridgeInternal.toResourceResponse(documentServiceResponse, UserDefinedFunction.class);
                });
            } catch (Exception e) {
                this.logger.debug("Failure in upserting a UserDefinedFunction due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }

    /**
     * Replaces a user-defined function, addressing it by its own self link.
     *
     * <p>Work is deferred until subscription; failures are logged and emitted via {@code onError}.
     *
     * @param userDefinedFunction the replacement UDF; must not be {@code null}
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<UserDefinedFunction>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<UserDefinedFunction>> replaceUserDefinedFunction(UserDefinedFunction userDefinedFunction, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (null == userDefinedFunction) {
                    throw new IllegalArgumentException("udf");
                }
                this.logger.debug("Replacing a UserDefinedFunction. udf id [{}]", userDefinedFunction.getId());
                validateResource(userDefinedFunction);

                final String resourcePath = Utils.joinPath(userDefinedFunction.getSelfLink(), null);
                final RxDocumentServiceRequest replaceRequest = RxDocumentServiceRequest.create(OperationType.Replace, ResourceType.UserDefinedFunction, resourcePath, userDefinedFunction, getRequestHeaders(requestOptions));
                return doReplace(replaceRequest)
                        .map(response -> BridgeInternal.toResourceResponse(response, UserDefinedFunction.class));
            } catch (Exception failure) {
                this.logger.debug("Failure in replacing a UserDefinedFunction due to [{}]", failure.getMessage(), failure);
                return Observable.error(failure);
            }
        });
    }

    /**
     * Deletes the user-defined function addressed by the given link.
     *
     * <p>Work is deferred until subscription; failures are logged and emitted via {@code onError}.
     *
     * @param str the UDF link; must not be empty
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<UserDefinedFunction>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<UserDefinedFunction>> deleteUserDefinedFunction(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("udfLink");
                }
                this.logger.debug("Deleting a UserDefinedFunction. udfLink [{}]", str);

                final String resourcePath = Utils.joinPath(str, null);
                final RxDocumentServiceRequest deleteRequest = RxDocumentServiceRequest.create(OperationType.Delete, ResourceType.UserDefinedFunction, resourcePath, getRequestHeaders(requestOptions));
                return doDelete(deleteRequest)
                        .map(response -> BridgeInternal.toResourceResponse(response, UserDefinedFunction.class));
            } catch (Exception failure) {
                this.logger.debug("Failure in deleting a UserDefinedFunction due to [{}]", failure.getMessage(), failure);
                return Observable.error(failure);
            }
        });
    }

    /**
     * Reads the user-defined function addressed by the given link.
     *
     * <p>Work is deferred until subscription; failures are logged and emitted via {@code onError}.
     *
     * @param str the UDF link; must not be empty
     * @param requestOptions used to build the request headers
     * @return an observable emitting the response mapped to a {@code ResourceResponse<UserDefinedFunction>}
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<UserDefinedFunction>> readUserDefinedFunction(String str, RequestOptions requestOptions) {
        return Observable.defer(() -> {
            try {
                if (StringUtils.isEmpty(str)) {
                    throw new IllegalArgumentException("udfLink");
                }
                this.logger.debug("Reading a UserDefinedFunction. udfLink [{}]", str);

                final String resourcePath = Utils.joinPath(str, null);
                final RxDocumentServiceRequest readRequest = RxDocumentServiceRequest.create(OperationType.Read, ResourceType.UserDefinedFunction, resourcePath, getRequestHeaders(requestOptions));
                return doRead(readRequest)
                        .map(response -> BridgeInternal.toResourceResponse(response, UserDefinedFunction.class));
            } catch (Exception failure) {
                this.logger.debug("Failure in reading a UserDefinedFunction due to [{}]", failure.getMessage(), failure);
                return Observable.error(failure);
            }
        });
    }

    /**
     * Pass-through to {@code rxWrapperClient.readUserDefinedFunctions}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<UserDefinedFunction>> readUserDefinedFunctions(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readUserDefinedFunctions(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryUserDefinedFunctions(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<UserDefinedFunction>> queryUserDefinedFunctions(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryUserDefinedFunctions(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryUserDefinedFunctions(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<UserDefinedFunction>> queryUserDefinedFunctions(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryUserDefinedFunctions(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.createAttachment(String, Attachment, RequestOptions)}.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param attachment forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> createAttachment(String str, Attachment attachment, RequestOptions requestOptions) {
        return this.rxWrapperClient.createAttachment(str, attachment, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.upsertAttachment(String, Attachment, RequestOptions)}.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param attachment forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> upsertAttachment(String str, Attachment attachment, RequestOptions requestOptions) {
        return this.rxWrapperClient.upsertAttachment(str, attachment, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.replaceAttachment}; no logic is added here.
     *
     * @param attachment forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> replaceAttachment(Attachment attachment, RequestOptions requestOptions) {
        return this.rxWrapperClient.replaceAttachment(attachment, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.deleteAttachment}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the attachment link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> deleteAttachment(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.deleteAttachment(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readAttachment}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the attachment link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> readAttachment(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.readAttachment(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readAttachments}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Attachment>> readAttachments(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readAttachments(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryAttachments(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Attachment>> queryAttachments(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryAttachments(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryAttachments(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Attachment>> queryAttachments(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryAttachments(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.createAttachment(String, InputStream, MediaOptions)}.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param inputStream forwarded unchanged; this method neither reads nor closes it
     * @param mediaOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> createAttachment(String str, InputStream inputStream, MediaOptions mediaOptions) {
        return this.rxWrapperClient.createAttachment(str, inputStream, mediaOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.upsertAttachment(String, InputStream, MediaOptions)}.
     *
     * @param str forwarded unchanged (presumably the document link; confirm against the interface)
     * @param inputStream forwarded unchanged; this method neither reads nor closes it
     * @param mediaOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Attachment>> upsertAttachment(String str, InputStream inputStream, MediaOptions mediaOptions) {
        return this.rxWrapperClient.upsertAttachment(str, inputStream, mediaOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readMedia}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the media link; confirm against the interface)
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<MediaResponse> readMedia(String str) {
        return this.rxWrapperClient.readMedia(str);
    }

    /**
     * Pass-through to {@code rxWrapperClient.updateMedia}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the media link; confirm against the interface)
     * @param inputStream forwarded unchanged; this method neither reads nor closes it
     * @param mediaOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<MediaResponse> updateMedia(String str, InputStream inputStream, MediaOptions mediaOptions) {
        return this.rxWrapperClient.updateMedia(str, inputStream, mediaOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readConflict}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the conflict link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Conflict>> readConflict(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.readConflict(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readConflicts}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Conflict>> readConflicts(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readConflicts(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryConflicts(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Conflict>> queryConflicts(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryConflicts(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryConflicts(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the collection link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Conflict>> queryConflicts(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryConflicts(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.deleteConflict}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the conflict link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Conflict>> deleteConflict(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.deleteConflict(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.createUser}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the database link; confirm against the interface)
     * @param user forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<User>> createUser(String str, User user, RequestOptions requestOptions) {
        return this.rxWrapperClient.createUser(str, user, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.upsertUser}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the database link; confirm against the interface)
     * @param user forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<User>> upsertUser(String str, User user, RequestOptions requestOptions) {
        return this.rxWrapperClient.upsertUser(str, user, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.replaceUser}; no logic is added here.
     *
     * @param user forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<User>> replaceUser(User user, RequestOptions requestOptions) {
        return this.rxWrapperClient.replaceUser(user, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.deleteUser}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<User>> deleteUser(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.deleteUser(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readUser}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<User>> readUser(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.readUser(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readUsers}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the database link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<User>> readUsers(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readUsers(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryUsers(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the database link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<User>> queryUsers(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryUsers(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryUsers(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the database link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<User>> queryUsers(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryUsers(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.createPermission}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param permission forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Permission>> createPermission(String str, Permission permission, RequestOptions requestOptions) {
        return this.rxWrapperClient.createPermission(str, permission, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.upsertPermission}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param permission forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Permission>> upsertPermission(String str, Permission permission, RequestOptions requestOptions) {
        return this.rxWrapperClient.upsertPermission(str, permission, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.replacePermission}; no logic is added here.
     *
     * @param permission forwarded unchanged
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Permission>> replacePermission(Permission permission, RequestOptions requestOptions) {
        return this.rxWrapperClient.replacePermission(permission, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.deletePermission}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the permission link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Permission>> deletePermission(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.deletePermission(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readPermission}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the permission link; confirm against the interface)
     * @param requestOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<ResourceResponse<Permission>> readPermission(String str, RequestOptions requestOptions) {
        return this.rxWrapperClient.readPermission(str, requestOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.readPermissions}; no logic is added here.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Permission>> readPermissions(String str, FeedOptions feedOptions) {
        return this.rxWrapperClient.readPermissions(str, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryPermissions(String, String, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param str2 forwarded unchanged (presumably the query text; confirm against the interface)
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Permission>> queryPermissions(String str, String str2, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryPermissions(str, str2, feedOptions);
    }

    /**
     * Pass-through to {@code rxWrapperClient.queryPermissions(String, SqlQuerySpec, FeedOptions)}.
     *
     * @param str forwarded unchanged (presumably the user link; confirm against the interface)
     * @param sqlQuerySpec forwarded unchanged
     * @param feedOptions forwarded unchanged
     * @return the observable produced by the wrapped client
     */
    @Override // com.microsoft.azure.documentdb.rx.AsyncDocumentClient
    public Observable<FeedResponsePage<Permission>> queryPermissions(String str, SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return this.rxWrapperClient.queryPermissions(str, sqlQuerySpec, feedOptions);
    }

    /**
     * Forwards a replace-offer call to the wrapped client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient}. The argument is handed to
     * {@code rxWrapperClient.replaceOffer} unchanged.
     *
     * @param offer the offer passed through to the wrapped client
     * @return whatever observable the wrapped client returns
     */
    @Override
    public Observable<ResourceResponse<Offer>> replaceOffer(Offer offer) {
        return rxWrapperClient.replaceOffer(offer);
    }

    /**
     * Forwards a read-offer call to the wrapped client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient}. The argument is handed to
     * {@code rxWrapperClient.readOffer} unchanged.
     *
     * @param str passed through as-is (presumably the offer link;
     *            TODO confirm against the interface)
     * @return whatever observable the wrapped client returns
     */
    @Override
    public Observable<ResourceResponse<Offer>> readOffer(String str) {
        return rxWrapperClient.readOffer(str);
    }

    /**
     * Forwards a read-offers (feed) call to the wrapped client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient}. The argument is handed to
     * {@code rxWrapperClient.readOffers} unchanged.
     *
     * @param feedOptions the feed options passed through to the wrapped client
     * @return whatever observable of feed pages the wrapped client returns
     */
    @Override
    public Observable<FeedResponsePage<Offer>> readOffers(FeedOptions feedOptions) {
        return rxWrapperClient.readOffers(feedOptions);
    }

    /**
     * Forwards an offer query, given as a string, to the wrapped client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient}. Both arguments are handed to
     * {@code rxWrapperClient.queryOffers} unchanged.
     *
     * @param str         passed through as-is (presumably the query text;
     *                    TODO confirm against the interface)
     * @param feedOptions the feed options passed through to the wrapped client
     * @return whatever observable of feed pages the wrapped client returns
     */
    @Override
    public Observable<FeedResponsePage<Offer>> queryOffers(String str, FeedOptions feedOptions) {
        return rxWrapperClient.queryOffers(str, feedOptions);
    }

    /**
     * Forwards an offer query, given as a {@code SqlQuerySpec}, to the wrapped client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient}. Both arguments are handed to
     * {@code rxWrapperClient.queryOffers} unchanged.
     *
     * @param sqlQuerySpec the query specification passed through to the wrapped client
     * @param feedOptions  the feed options passed through to the wrapped client
     * @return whatever observable of feed pages the wrapped client returns
     */
    @Override
    public Observable<FeedResponsePage<Offer>> queryOffers(SqlQuerySpec sqlQuerySpec, FeedOptions feedOptions) {
        return rxWrapperClient.queryOffers(sqlQuerySpec, feedOptions);
    }

    /**
     * Forwards a get-database-account call to the wrapped client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient} by returning the result of
     * {@code rxWrapperClient.getDatabaseAccount()} directly.
     *
     * @return whatever observable the wrapped client returns
     */
    @Override
    public Observable<DatabaseAccount> getDatabaseAccount() {
        return rxWrapperClient.getDatabaseAccount();
    }

    /**
     * Builds an observable that reads the database account through the gateway proxy, with the
     * request's endpoint overridden to {@code uri}.
     *
     * <p>The work is wrapped in {@code Observable.defer}, so the request is created inside the
     * deferred callback rather than when this method is called. If the read signals an error, a
     * warning is logged with the error's cause (or the error itself when it has no cause); the
     * error is not handled here, only logged.
     *
     * @param uri the endpoint set on the request via {@code setEndpointOverride}
     * @return an observable mapping the gateway response to a {@code DatabaseAccount}
     */
    public Observable<DatabaseAccount> getDatabaseAccountFromEndpoint(URI uri) {
        return Observable.defer(() -> {
            // The explicit cast on null is kept from the original call to RxDocumentServiceRequest.create.
            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                    OperationType.Read, ResourceType.DatabaseAccount, "", (Map<String, String>) null);
            putMoreContentIntoDocumentServiceRequest(request, "GET");
            request.setEndpointOverride(uri);
            return this.gatewayProxy.doRead(request)
                    .doOnError(error -> {
                        // Prefer the underlying cause in the log line when one is present.
                        Throwable cause = error.getCause();
                        Throwable reported = cause != null ? cause : error;
                        this.logger.warn(String.format("Failed to retrieve database account information. %s", reported.toString()));
                    })
                    .map(response -> (DatabaseAccount) response.getResource(DatabaseAccount.class));
        });
    }

    /**
     * Shuts down an executor service on a best-effort basis, never propagating a failure.
     *
     * <p>Starts an orderly shutdown and waits up to 15 seconds for running tasks to finish. If
     * the executor has not terminated by then, {@code shutdownNow()} is called so its worker
     * threads are not left running after the client is closed. If the calling thread is
     * interrupted while waiting, its interrupt status is restored so callers further up the
     * stack can still observe the interruption.
     *
     * @param executorService the executor to shut down; {@code null} is ignored
     */
    private void safeShutdownExecutorService(ExecutorService executorService) {
        if (executorService == null) {
            return;
        }
        try {
            executorService.shutdown();
            if (!executorService.awaitTermination(15L, TimeUnit.SECONDS)) {
                // Orderly shutdown timed out: cancel outstanding tasks instead of leaking threads.
                this.logger.warn("Executor service did not terminate within 15 seconds; forcing shutdown");
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; restore it for the caller.
            Thread.currentThread().interrupt();
            this.logger.warn("Failure in shutting down a executor service", (Throwable) e);
        } catch (Exception e) {
            this.logger.warn("Failure in shutting down a executor service", (Throwable) e);
        }
    }

    /**
     * Releases the resources held by this client.
     *
     * <p>Implements the operation declared on
     * {@code com.microsoft.azure.documentdb.rx.AsyncDocumentClient}. The steps run in a fixed
     * order: the collection-cache executor, then the computation executor, then the wrapped
     * client, then the rx client. A failure in closing the wrapped client or the rx client is
     * logged as a warning and does not stop the remaining steps.
     */
    @Override
    public void close() {
        // Executors first, in the same order as before: collection cache, then computation.
        ExecutorService[] executors = {this.collectionCacheExecutorService, this.computationExecutor};
        for (ExecutorService executor : executors) {
            safeShutdownExecutorService(executor);
        }

        try {
            this.rxWrapperClient.close();
        } catch (Exception wrapperFailure) {
            this.logger.warn("Failure in shutting down rxWrapperClient", (Throwable) wrapperFailure);
        }

        try {
            this.rxClient.shutdown();
        } catch (Exception rxClientFailure) {
            this.logger.warn("Failure in shutting down rxClient", (Throwable) rxClientFailure);
        }
    }

    /**
     * Creates the retry function for executing the given request.
     *
     * <p>Constructs an {@code ExecuteDocumentClientRequestRetryHandler} from the request, this
     * client's {@code globalEndpointManager} and this client itself, then converts it with
     * {@code RetryFunctionFactory.from}.
     *
     * @param rxDocumentServiceRequest the request the retry handler is created for
     * @return the function produced by {@code RetryFunctionFactory.from}
     */
    private Func1<Observable<? extends Throwable>, Observable<Long>> createExecuteRequestRetryHandler(RxDocumentServiceRequest rxDocumentServiceRequest) {
        ExecuteDocumentClientRequestRetryHandler retryHandler =
                new ExecuteDocumentClientRequestRetryHandler(rxDocumentServiceRequest, this.globalEndpointManager, this);
        return RetryFunctionFactory.from(retryHandler);
    }

    // Static initializer: stores the negation of this class's desired assertion status in
    // $assertionsDisabled.
    // NOTE(review): $assertionsDisabled looks like the compiler-synthesized flag behind `assert`
    // statements, surfaced here by a decompiler; its declaration is not visible in this part of
    // the file. Confirm before editing or removing this block.
    static {
        $assertionsDisabled = !RxDocumentClientImpl.class.desiredAssertionStatus();
    }
}
