package com.microsoft.azure.documentdb.bulkexecutor;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.FeedOptions;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.RetryOptions;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchDeleter;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchInserter;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchUpdater;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BulkDeleteQuerySpec;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BulkImportStoredProcedureOptions;
import com.microsoft.azure.documentdb.bulkexecutor.internal.CongestionController;
import com.microsoft.azure.documentdb.bulkexecutor.internal.DocumentAnalyzer;
import com.microsoft.azure.documentdb.bulkexecutor.internal.ExceptionUtils;
import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.InMemoryCollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.documentdb.internal.routing.Range;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/documentdb/bulkexecutor/DocumentBulkExecutor.class */
public class DocumentBulkExecutor implements AutoCloseable {
    private static final String BULK_IMPORT_STORED_PROCECURE_NAME = "__.sys.commonBulkInsert";
    private static final String BULK_UPDATE_STORED_PROCECURE_NAME = "__.sys.bulkPatch";
    private static final String BULK_DELETE_STORED_PROCECURE_NAME = "__.sys.commonDelete";
    private static final int MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE = 1101005;
    private static final double FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED = 0.2d;
    private static final int INITIALIZATION_SLEEP_TIME_ON_THROTTLING = 500;
    private static final int DEFAULT_BULK_DELETE_BATCH_SIZE = 1000;
    private final Logger logger;
    private final Map<String, Integer> partitionKeyRangeIdToInferredDegreeOfParallelism;
    private static final String SQL_QUERY_REGEX_PATTERN = "(?i)select\\s+\\*\\s+(?i)from\\s+(?<root>\\w+)(?:\\s+(?i)where(?<filter>.+))?";
    private static final Pattern BULK_DELETE_QUERY_SPEC_PATTERN = Pattern.compile(SQL_QUERY_REGEX_PATTERN);
    private final ListeningExecutorService listeningExecutorService;
    private final DocumentClient client;
    private final String collectionLink;
    private final PartitionKeyDefinition partitionKeyDefinition;
    private List<String> partitionKeyRangeIds;
    private CollectionRoutingMap collectionRoutingMap;
    private String bulkImportStoredProcLink;
    private String bulkUpdateStoredProcLink;
    private String bulkDeleteStoredProcLink;
    private int collectionThroughput;
    private int maxMiniBatchSize;
    private int maxUpdateMiniBatchCount;
    private RetryOptions retryOptions;

    /* loaded from: input_file:com/microsoft/azure/documentdb/bulkexecutor/DocumentBulkExecutor$Builder.class */
    public static class Builder {
        private DocumentClient client;
        private String collectionLink;
        private int maxMiniBatchSize;
        private int maxUpdateMiniBatchCount;
        private static final int DEFAULT_RETRY_ATTEMPT_ON_THROTTLING_FOR_INIT = 200;
        private static final int DEFAULT_WAIT_TIME_ON_THROTTLING_FOR_INIT_IN_SECONDS = 60;
        private PartitionKeyDefinition partitionKeyDef;
        private int offerThroughput;
        private static RetryOptions DEFAULT_INIT_RETRY_OPTIONS = new RetryOptions();
        private RetryOptions retryOptions;

        public Builder from(DocumentClient documentClient, String str, String str2, PartitionKeyDefinition partitionKeyDefinition, int i) {
            this.client = documentClient;
            this.collectionLink = String.format("/dbs/%s/colls/%s", str, str2);
            this.partitionKeyDef = partitionKeyDefinition;
            this.offerThroughput = i;
            return this;
        }

        public Builder withMaxMiniBatchSize(int i) {
            Preconditions.checkArgument(i > 0, "maxMiniBatchSize cannot be negative");
            Preconditions.checkArgument(i <= DocumentBulkExecutor.MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE, "maxMiniBatchSize cannot be negative");
            this.maxMiniBatchSize = i;
            return this;
        }

        public Builder withMaxUpdateMiniBatchCount(int i) {
            Preconditions.checkArgument(i > 0, "maxUpdateMiniBatchCount cannot be negative");
            this.maxUpdateMiniBatchCount = i;
            return this;
        }

        public Builder withInitializationRetryOptions(RetryOptions retryOptions) {
            this.retryOptions = retryOptions;
            return this;
        }

        public DocumentBulkExecutor build() throws Exception {
            DocumentBulkExecutor documentBulkExecutor = new DocumentBulkExecutor(this.client, this.collectionLink, this.partitionKeyDef, this.offerThroughput);
            try {
                documentBulkExecutor.setInitializationRetryOptions(this.retryOptions);
                documentBulkExecutor.setMaxMiniBatchSize(this.maxMiniBatchSize);
                documentBulkExecutor.setMaxUpdateMiniBatchCount(this.maxUpdateMiniBatchCount);
                documentBulkExecutor.safeInit();
                return documentBulkExecutor;
            } catch (Exception e) {
                documentBulkExecutor.close();
                throw e;
            }
        }

        private Builder() {
            this.maxMiniBatchSize = (int) Math.floor(220201.0d);
            this.maxUpdateMiniBatchCount = 500;
            this.retryOptions = DEFAULT_INIT_RETRY_OPTIONS;
        }

        static {
            DEFAULT_INIT_RETRY_OPTIONS.setMaxRetryAttemptsOnThrottledRequests(200);
            DEFAULT_INIT_RETRY_OPTIONS.setMaxRetryWaitTimeInSeconds(60);
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setMaxMiniBatchSize(int i) {
        this.maxMiniBatchSize = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setMaxUpdateMiniBatchCount(int i) {
        this.maxUpdateMiniBatchCount = i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setInitializationRetryOptions(RetryOptions retryOptions) {
        this.retryOptions = retryOptions;
    }

    private DocumentBulkExecutor(DocumentClient documentClient, String str, PartitionKeyDefinition partitionKeyDefinition, int i) {
        this.logger = LoggerFactory.getLogger(DocumentBulkExecutor.class);
        this.partitionKeyRangeIdToInferredDegreeOfParallelism = new ConcurrentHashMap();
        Preconditions.checkNotNull(documentClient, "client cannot be null");
        Preconditions.checkNotNull(partitionKeyDefinition, "partitionKeyDefinition cannot be null");
        Preconditions.checkNotNull(str, "collectionLink cannot be null");
        Preconditions.checkArgument(i > 0, "collection throughput is less than 10,000");
        this.client = documentClient;
        this.collectionLink = str;
        this.collectionThroughput = i;
        this.partitionKeyDefinition = partitionKeyDefinition;
        this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void safeInit() throws Exception {
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                initialize();
                return;
            } catch (Exception e) {
                i++;
                DocumentClientException throttelingException = ExceptionUtils.getThrottelingException(e);
                long currentTimeMillis2 = System.currentTimeMillis();
                if (i >= this.retryOptions.getMaxRetryAttemptsOnThrottledRequests() || currentTimeMillis2 - currentTimeMillis >= this.retryOptions.getMaxRetryWaitTimeInSeconds() * 1000 || throttelingException == null || throttelingException.getStatusCode() != 429) {
                    throw e;
                }
                Thread.sleep((i * throttelingException.getRetryAfterInMilliseconds()) + 500);
            }
        }
        throw e;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.listeningExecutorService.shutdown();
        try {
            if (!this.listeningExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.listeningExecutorService.shutdownNow();
                if (!this.listeningExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.logger.error("some tasks did not terminate");
                }
            }
        } catch (InterruptedException e) {
            this.listeningExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private void initialize() throws DocumentClientException {
        this.logger.debug("Initializing ...");
        this.bulkImportStoredProcLink = String.format("%s/sprocs/%s", this.collectionLink, BULK_IMPORT_STORED_PROCECURE_NAME);
        this.bulkUpdateStoredProcLink = String.format("%s/sprocs/%s", this.collectionLink, BULK_UPDATE_STORED_PROCECURE_NAME);
        this.bulkDeleteStoredProcLink = String.format("%s/sprocs/%s", this.collectionLink, BULK_DELETE_STORED_PROCECURE_NAME);
        this.logger.debug("Fetching partition map of collection");
        Range<String> range = new Range<>(PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey, true, false);
        this.collectionRoutingMap = getCollectionRoutingMap(this.client, this.collectionLink);
        this.partitionKeyRangeIds = (List) this.collectionRoutingMap.getOverlappingRanges(range).stream().map(partitionKeyRange -> {
            return partitionKeyRange.getId();
        }).collect(Collectors.toList());
        this.logger.debug("Initialization completed");
    }

    public BulkImportResponse importAll(Collection<String> collection, boolean z, boolean z2, Integer num) throws DocumentClientException {
        return executeBulkImportInternal(collection, z, z2, num);
    }

    public BulkUpdateResponse updateAll(Collection<UpdateItem> collection, Integer num) throws DocumentClientException {
        return executeBulkUpdateInternal(collection, num);
    }

    public BulkUpdateResponse mergeAll(Collection<Document> collection, Integer num) throws DocumentClientException {
        return executeBulkUpdateWithPatchInternal(collection, num);
    }

    public BulkDeleteResponse deleteAll(String str) throws DocumentClientException {
        return executeBulkDeleteInternal(str);
    }

    private BulkUpdateResponse updateDocument(String str, String str2, List<UpdateOperationBase> list) throws DocumentClientException {
        return executeUpdateDocumentInternal(str, str2, list);
    }

    private BulkImportResponse executeBulkImportInternal(Collection<String> collection, boolean z, boolean z2, Integer num) throws DocumentClientException {
        Preconditions.checkNotNull(collection, "document collection cannot be null");
        try {
            return executeBulkImportAsyncImpl(collection, z, z2, num).get();
        } catch (ExecutionException e) {
            this.logger.error("Failed to import documents", (Throwable) e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw toDocumentClientException((Exception) cause);
            }
            throw toDocumentClientException(e);
        } catch (Exception e2) {
            this.logger.error("Failed to import documents", (Throwable) e2);
            throw toDocumentClientException(e2);
        }
    }

    private BulkUpdateResponse executeBulkUpdateInternal(Collection<UpdateItem> collection, Integer num) throws DocumentClientException {
        Preconditions.checkNotNull(collection, "update items cannot be null");
        try {
            return executeBulkUpdateAsyncImpl(collection, num).get();
        } catch (ExecutionException e) {
            this.logger.error("Failed to update documents", (Throwable) e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw toDocumentClientException((Exception) cause);
            }
            throw toDocumentClientException(e);
        } catch (Exception e2) {
            this.logger.error("Failed to update documents", (Throwable) e2);
            throw toDocumentClientException(e2);
        }
    }

    private BulkUpdateResponse executeBulkUpdateWithPatchInternal(Collection<Document> collection, Integer num) throws DocumentClientException {
        Preconditions.checkNotNull(collection, "patch documents cannot be null");
        try {
            return executeBulkUpdateWithPatchAsyncImpl(collection, num).get();
        } catch (ExecutionException e) {
            this.logger.error("Failed to update documents", (Throwable) e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw toDocumentClientException((Exception) cause);
            }
            throw toDocumentClientException(e);
        } catch (Exception e2) {
            this.logger.error("Failed to update documents", (Throwable) e2);
            throw toDocumentClientException(e2);
        }
    }

    private BulkUpdateResponse executeUpdateDocumentInternal(String str, String str2, List<UpdateOperationBase> list) throws DocumentClientException {
        Preconditions.checkNotNull(str, "partitionKey cannot be null");
        Preconditions.checkNotNull(str2, "id cannot be null");
        Preconditions.checkNotNull(list, "update operations cannot be null");
        try {
            return executeUpdateDocumentAsyncImpl(str, str2, list).get();
        } catch (ExecutionException e) {
            this.logger.error("Failed to update document", (Throwable) e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw toDocumentClientException((Exception) cause);
            }
            throw toDocumentClientException(e);
        } catch (Exception e2) {
            this.logger.error("Failed to update document", (Throwable) e2);
            throw toDocumentClientException(e2);
        }
    }

    private BulkDeleteResponse executeBulkDeleteInternal(String str) throws DocumentClientException {
        Preconditions.checkNotNull(str, "query to fetch documents to delete cannot be null");
        try {
            return executeBulkDeleteAsyncImpl(str).get();
        } catch (ExecutionException e) {
            this.logger.error("Failed to delete document", (Throwable) e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw toDocumentClientException((Exception) cause);
            }
            throw toDocumentClientException(e);
        } catch (Exception e2) {
            this.logger.error("Failed to delete document", (Throwable) e2);
            throw toDocumentClientException(e2);
        }
    }

    private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<String> collection, boolean z, boolean z2, Integer num) throws Exception {
        final Stopwatch createStarted = Stopwatch.createStarted();
        BulkImportStoredProcedureOptions bulkImportStoredProcedureOptions = new BulkImportStoredProcedureOptions(z2, false, null, false, z, true);
        this.logger.debug("Bucketing documents ...");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        for (String str : this.partitionKeyRangeIds) {
            concurrentHashMap.put(str, ConcurrentHashMap.newKeySet(collection.size() / this.partitionKeyRangeIds.size()));
            concurrentHashMap2.put(str, new ArrayList(1000));
        }
        collection.parallelStream().forEach(str2 -> {
            ((Set) concurrentHashMap.get(this.collectionRoutingMap.getRangeByEffectivePartitionKey(DocumentAnalyzer.extractPartitionKeyValue(str2, this.partitionKeyDefinition).getEffectivePartitionKeyString(this.partitionKeyDefinition, true)).getId())).add(str2);
        });
        this.logger.debug("Creating mini batches within each partition bucket");
        concurrentHashMap.entrySet().parallelStream().forEach(entry -> {
            String str3 = (String) entry.getKey();
            Iterator it = ((Set) entry.getValue()).iterator();
            ArrayList arrayList = new ArrayList(500);
            int i = 0;
            while (true) {
                int i2 = i;
                if (!it.hasNext()) {
                    break;
                }
                String str4 = (String) it.next();
                int documentSize = getDocumentSize(str4);
                if (i2 + documentSize <= this.maxMiniBatchSize) {
                    arrayList.add(str4);
                    i = i2 + documentSize;
                } else {
                    ((List) concurrentHashMap2.get(str3)).add(arrayList);
                    arrayList = new ArrayList(500);
                    arrayList.add(str4);
                    i = documentSize;
                }
            }
            if (arrayList.size() > 0) {
                ((List) concurrentHashMap2.get(str3)).add(arrayList);
            }
        });
        this.logger.debug("Beginning bulk import within each partition bucket");
        final HashMap hashMap = new HashMap();
        final HashMap hashMap2 = new HashMap();
        this.logger.debug("Preprocessing took: " + createStarted.elapsed().toMillis() + " millis");
        ArrayList arrayList = new ArrayList();
        for (String str3 : this.partitionKeyRangeIds) {
            BatchInserter batchInserter = new BatchInserter(str3, (List) concurrentHashMap2.get(str3), this.client, this.bulkImportStoredProcLink, bulkImportStoredProcedureOptions);
            hashMap.put(str3, batchInserter);
            CongestionController congestionController = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), str3, batchInserter, this.partitionKeyRangeIdToInferredDegreeOfParallelism.get(str3), num);
            hashMap2.put(str3, congestionController);
            arrayList.add(congestionController.executeAllAsync());
        }
        return Futures.whenAllComplete(arrayList).callAsync(new AsyncCallable<BulkImportResponse>() { // from class: com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.1
            @Override // com.google.common.util.concurrent.AsyncCallable
            public ListenableFuture<BulkImportResponse> call() throws Exception {
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                for (String str4 : DocumentBulkExecutor.this.partitionKeyRangeIds) {
                    CongestionController congestionController2 = (CongestionController) hashMap2.get(str4);
                    arrayList2.addAll(congestionController2.getFailures());
                    arrayList3.addAll(((BatchInserter) hashMap.get(str4)).getBadInputDocuments());
                    DocumentBulkExecutor.this.partitionKeyRangeIdToInferredDegreeOfParallelism.put(str4, Integer.valueOf(congestionController2.getDegreeOfConcurrency()));
                }
                int sum = hashMap.values().stream().mapToInt(batchInserter2 -> {
                    return batchInserter2.getNumberOfDocumentsImported();
                }).sum();
                double sum2 = hashMap.values().stream().mapToDouble(batchInserter3 -> {
                    return batchInserter3.getTotalRequestUnitsConsumed();
                }).sum();
                createStarted.stop();
                return Futures.immediateFuture(new BulkImportResponse(sum, sum2, createStarted.elapsed(), arrayList2, arrayList3));
            }
        }, this.listeningExecutorService);
    }

    private ListenableFuture<BulkUpdateResponse> executeBulkUpdateAsyncImpl(Collection<UpdateItem> collection, Integer num) {
        final Stopwatch createStarted = Stopwatch.createStarted();
        this.logger.debug("Bucketing update items ...");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        for (String str : this.partitionKeyRangeIds) {
            concurrentHashMap.put(str, ConcurrentHashMap.newKeySet(collection.size() / this.partitionKeyRangeIds.size()));
            concurrentHashMap2.put(str, new ArrayList(1000));
        }
        collection.parallelStream().forEach(updateItem -> {
            ((Set) concurrentHashMap.get(this.collectionRoutingMap.getRangeByEffectivePartitionKey(DocumentAnalyzer.fromPartitionKeyvalue(updateItem.getPartitionKeyValue()).getEffectivePartitionKeyString(this.partitionKeyDefinition, true)).getId())).add(updateItem);
        });
        this.logger.debug("Creating mini batches within each partition bucket");
        concurrentHashMap.entrySet().parallelStream().forEach(entry -> {
            String str2 = (String) entry.getKey();
            ArrayList arrayList = new ArrayList(500);
            int i = 0;
            for (UpdateItem updateItem2 : (Set) entry.getValue()) {
                if (i + 1 <= this.maxUpdateMiniBatchCount) {
                    arrayList.add(updateItem2);
                    i++;
                } else {
                    ((List) concurrentHashMap2.get(str2)).add(arrayList);
                    arrayList = new ArrayList(500);
                    arrayList.add(updateItem2);
                    i = 1;
                }
            }
            if (arrayList.size() > 0) {
                ((List) concurrentHashMap2.get(str2)).add(arrayList);
            }
        });
        this.logger.debug("Beginning bulk update within each partition bucket");
        final HashMap hashMap = new HashMap();
        final HashMap hashMap2 = new HashMap();
        this.logger.debug("Preprocessing took: " + createStarted.elapsed().toMillis() + " millis");
        ArrayList arrayList = new ArrayList();
        String replaceFirst = this.partitionKeyDefinition.getPaths().iterator().next().replaceFirst("^/", "");
        for (String str2 : this.partitionKeyRangeIds) {
            BatchUpdater batchUpdater = new BatchUpdater(str2, (List) concurrentHashMap2.get(str2), this.client, this.bulkUpdateStoredProcLink, replaceFirst);
            hashMap.put(str2, batchUpdater);
            CongestionController congestionController = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), str2, batchUpdater, this.partitionKeyRangeIdToInferredDegreeOfParallelism.get(str2), num);
            hashMap2.put(str2, congestionController);
            arrayList.add(congestionController.executeAllAsync());
        }
        return Futures.whenAllComplete(arrayList).callAsync(new AsyncCallable<BulkUpdateResponse>() { // from class: com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.2
            @Override // com.google.common.util.concurrent.AsyncCallable
            public ListenableFuture<BulkUpdateResponse> call() throws Exception {
                ArrayList arrayList2 = new ArrayList();
                for (String str3 : DocumentBulkExecutor.this.partitionKeyRangeIds) {
                    CongestionController congestionController2 = (CongestionController) hashMap2.get(str3);
                    arrayList2.addAll(congestionController2.getFailures());
                    DocumentBulkExecutor.this.partitionKeyRangeIdToInferredDegreeOfParallelism.put(str3, Integer.valueOf(congestionController2.getDegreeOfConcurrency()));
                }
                int sum = hashMap.values().stream().mapToInt(batchUpdater2 -> {
                    return batchUpdater2.getNumberOfDocumentsUpdated();
                }).sum();
                double sum2 = hashMap.values().stream().mapToDouble(batchUpdater3 -> {
                    return batchUpdater3.getTotalRequestUnitsConsumed();
                }).sum();
                createStarted.stop();
                return Futures.immediateFuture(new BulkUpdateResponse(sum, sum2, createStarted.elapsed(), arrayList2));
            }
        }, this.listeningExecutorService);
    }

    private ListenableFuture<BulkUpdateResponse> executeBulkUpdateWithPatchAsyncImpl(Collection<Document> collection, Integer num) {
        final Stopwatch createStarted = Stopwatch.createStarted();
        this.logger.debug("Bucketing patch documents ...");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        for (String str : this.partitionKeyRangeIds) {
            concurrentHashMap.put(str, ConcurrentHashMap.newKeySet(collection.size() / this.partitionKeyRangeIds.size()));
            concurrentHashMap2.put(str, new ArrayList(1000));
        }
        String replaceFirst = this.partitionKeyDefinition.getPaths().iterator().next().replaceFirst("^/", "");
        collection.parallelStream().forEach(document -> {
            UpdateItem updateItemFromPatchDocument = getUpdateItemFromPatchDocument(document, replaceFirst);
            ((Set) concurrentHashMap.get(this.collectionRoutingMap.getRangeByEffectivePartitionKey(DocumentAnalyzer.fromPartitionKeyvalue(updateItemFromPatchDocument.getPartitionKeyValue()).getEffectivePartitionKeyString(this.partitionKeyDefinition, true)).getId())).add(updateItemFromPatchDocument);
        });
        this.logger.debug("Creating mini batches within each partition bucket");
        concurrentHashMap.entrySet().parallelStream().forEach(entry -> {
            String str2 = (String) entry.getKey();
            ArrayList arrayList = new ArrayList(500);
            int i = 0;
            for (UpdateItem updateItem : (Set) entry.getValue()) {
                if (i + 1 <= this.maxUpdateMiniBatchCount) {
                    arrayList.add(updateItem);
                    i++;
                } else {
                    ((List) concurrentHashMap2.get(str2)).add(arrayList);
                    arrayList = new ArrayList(500);
                    arrayList.add(updateItem);
                    i = 1;
                }
            }
            if (arrayList.size() > 0) {
                ((List) concurrentHashMap2.get(str2)).add(arrayList);
            }
        });
        this.logger.debug("Beginning bulk update within each partition bucket");
        final HashMap hashMap = new HashMap();
        final HashMap hashMap2 = new HashMap();
        this.logger.debug("Preprocessing took: " + createStarted.elapsed().toMillis() + " millis");
        ArrayList arrayList = new ArrayList();
        for (String str2 : this.partitionKeyRangeIds) {
            BatchUpdater batchUpdater = new BatchUpdater(str2, (List) concurrentHashMap2.get(str2), this.client, this.bulkUpdateStoredProcLink, replaceFirst);
            hashMap.put(str2, batchUpdater);
            CongestionController congestionController = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), str2, batchUpdater, this.partitionKeyRangeIdToInferredDegreeOfParallelism.get(str2), num);
            hashMap2.put(str2, congestionController);
            arrayList.add(congestionController.executeAllAsync());
        }
        return Futures.whenAllComplete(arrayList).callAsync(new AsyncCallable<BulkUpdateResponse>() { // from class: com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.3
            @Override // com.google.common.util.concurrent.AsyncCallable
            public ListenableFuture<BulkUpdateResponse> call() throws Exception {
                ArrayList arrayList2 = new ArrayList();
                for (String str3 : DocumentBulkExecutor.this.partitionKeyRangeIds) {
                    CongestionController congestionController2 = (CongestionController) hashMap2.get(str3);
                    arrayList2.addAll(congestionController2.getFailures());
                    DocumentBulkExecutor.this.partitionKeyRangeIdToInferredDegreeOfParallelism.put(str3, Integer.valueOf(congestionController2.getDegreeOfConcurrency()));
                }
                int sum = hashMap.values().stream().mapToInt(batchUpdater2 -> {
                    return batchUpdater2.getNumberOfDocumentsUpdated();
                }).sum();
                double sum2 = hashMap.values().stream().mapToDouble(batchUpdater3 -> {
                    return batchUpdater3.getTotalRequestUnitsConsumed();
                }).sum();
                createStarted.stop();
                return Futures.immediateFuture(new BulkUpdateResponse(sum, sum2, createStarted.elapsed(), arrayList2));
            }
        }, this.listeningExecutorService);
    }

    private UpdateItem getUpdateItemFromPatchDocument(Document document, String str) {
        String str2 = null;
        String str3 = null;
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, Object> entry : document.getHashMap().entrySet()) {
            if (entry.getKey().matches("id")) {
                str2 = (String) entry.getValue();
            } else if (entry.getKey().matches(str)) {
                str3 = (String) entry.getValue();
            } else {
                arrayList.addAll(getUpdateOperations("", entry.getKey(), entry.getValue()));
            }
        }
        return new UpdateItem(str2, str3, arrayList);
    }

    private List<UpdateOperationBase> getUpdateOperations(String str, String str2, Object obj) {
        ArrayList arrayList = new ArrayList();
        String str3 = str.matches("") ? str2 : str + "." + str2;
        if (obj instanceof String) {
            arrayList.add(new SetUpdateOperation(str3, (String) obj));
        } else if (obj instanceof Integer) {
            arrayList.add(new SetUpdateOperation(str3, (Integer) obj));
        } else if (obj instanceof Double) {
            arrayList.add(new SetUpdateOperation(str3, (Double) obj));
        } else if (obj instanceof Boolean) {
            arrayList.add(new SetUpdateOperation(str3, (Boolean) obj));
        } else if (obj instanceof List) {
            arrayList.add(new SetUpdateOperation(str3, (List) obj));
        } else if (obj instanceof Map) {
            for (Map.Entry entry : ((HashMap) obj).entrySet()) {
                arrayList.addAll(getUpdateOperations(str3, (String) entry.getKey(), entry.getValue()));
            }
        }
        return arrayList;
    }

    private ListenableFuture<BulkUpdateResponse> executeUpdateDocumentAsyncImpl(String str, String str2, List<UpdateOperationBase> list) {
        final Stopwatch createStarted = Stopwatch.createStarted();
        String id = this.collectionRoutingMap.getRangeByEffectivePartitionKey(DocumentAnalyzer.fromPartitionKeyvalue(str).getEffectivePartitionKeyString(this.partitionKeyDefinition, true)).getId();
        ArrayList arrayList = new ArrayList(1);
        ArrayList arrayList2 = new ArrayList(1);
        arrayList2.add(new UpdateItem(str2, str, list));
        arrayList.add(arrayList2);
        final BatchUpdater batchUpdater = new BatchUpdater(id, arrayList, this.client, this.bulkUpdateStoredProcLink, this.partitionKeyDefinition.getPaths().iterator().next().replaceFirst("^/", ""));
        final CongestionController congestionController = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), id, batchUpdater, null, null);
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(congestionController.executeAllAsync());
        return Futures.whenAllComplete(arrayList3).callAsync(new AsyncCallable<BulkUpdateResponse>() { // from class: com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.4
            @Override // com.google.common.util.concurrent.AsyncCallable
            public ListenableFuture<BulkUpdateResponse> call() throws Exception {
                ArrayList arrayList4 = new ArrayList();
                arrayList4.addAll(congestionController.getFailures());
                int numberOfDocumentsUpdated = batchUpdater.getNumberOfDocumentsUpdated();
                double totalRequestUnitsConsumed = batchUpdater.getTotalRequestUnitsConsumed();
                createStarted.stop();
                return Futures.immediateFuture(new BulkUpdateResponse(numberOfDocumentsUpdated, totalRequestUnitsConsumed, createStarted.elapsed(), arrayList4));
            }
        }, this.listeningExecutorService);
    }

    private ListenableFuture<BulkDeleteResponse> executeBulkDeleteAsyncImpl(String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        final Stopwatch createStarted = Stopwatch.createStarted();
        Matcher matcher = BULK_DELETE_QUERY_SPEC_PATTERN.matcher(str);
        matcher.find();
        BulkDeleteQuerySpec bulkDeleteQuerySpec = new BulkDeleteQuerySpec(matcher.group("root").toString(), matcher.group("filter").toString(), null, 1000);
        this.logger.debug("Beginning bulk delete within each partition range");
        final HashMap hashMap = new HashMap();
        for (String str2 : this.partitionKeyRangeIds) {
            BatchDeleter batchDeleter = new BatchDeleter(str2, this.client, this.bulkDeleteStoredProcLink, bulkDeleteQuerySpec);
            hashMap.put(str2, batchDeleter);
            arrayList.add(this.listeningExecutorService.submit((Callable) batchDeleter.executeDelete()));
        }
        return Futures.whenAllComplete(arrayList).callAsync(new AsyncCallable<BulkDeleteResponse>() { // from class: com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor.5
            @Override // com.google.common.util.concurrent.AsyncCallable
            public ListenableFuture<BulkDeleteResponse> call() throws Exception {
                ArrayList arrayList2 = new ArrayList();
                int sum = hashMap.values().stream().mapToInt(batchDeleter2 -> {
                    return batchDeleter2.getNumberOfDocumentsDeleted();
                }).sum();
                double sum2 = hashMap.values().stream().mapToDouble(batchDeleter3 -> {
                    return batchDeleter3.getTotalRequestUnitsConsumed();
                }).sum();
                createStarted.stop();
                return Futures.immediateFuture(new BulkDeleteResponse(sum, sum2, createStarted.elapsed(), arrayList2));
            }
        }, this.listeningExecutorService);
    }

    private static CollectionRoutingMap getCollectionRoutingMap(DocumentClient documentClient, String str) {
        ArrayList arrayList = new ArrayList();
        Iterator<PartitionKeyRange> it = documentClient.readPartitionKeyRanges(str, (FeedOptions) null).getQueryIterable().toList().iterator();
        while (it.hasNext()) {
            arrayList.add(new ImmutablePair(it.next(), true));
        }
        InMemoryCollectionRoutingMap tryCreateCompleteRoutingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(arrayList, "");
        if (tryCreateCompleteRoutingMap == null) {
            throw new IllegalStateException("Cannot create complete routing map");
        }
        return tryCreateCompleteRoutingMap;
    }

    private static List<ImmutablePair<String, String>> getPartitionKeyRangeIdsFromValues(CollectionRoutingMap collectionRoutingMap, PartitionKeyDefinition partitionKeyDefinition, List<String> list) {
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            arrayList.add(new ImmutablePair(str, collectionRoutingMap.getRangeByEffectivePartitionKey(PartitionKeyInternal.fromObjectArray(Collections.singletonList(str), true).getEffectivePartitionKeyString(partitionKeyDefinition, true)).getId()));
        }
        return arrayList;
    }

    private DocumentClientException toDocumentClientException(Exception exc) {
        return exc instanceof DocumentClientException ? (DocumentClientException) exc : new DocumentClientException(500, exc);
    }

    private int getDocumentSize(String str) {
        int length = str.getBytes(Charset.forName("UTF-8")).length;
        if (length > this.maxMiniBatchSize) {
            this.logger.error("Document size {} larger than script payload limit. {}", Integer.valueOf(length), Integer.valueOf(this.maxMiniBatchSize));
        }
        return length;
    }
}
