package com.microsoft.azure.documentdb.changefeedprocessor.services;

import com.microsoft.azure.documentdb.ChangeFeedOptions;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.FeedResponse;
import com.microsoft.azure.documentdb.changefeedprocessor.ChangeFeedObserverCloseReason;
import com.microsoft.azure.documentdb.changefeedprocessor.ChangeFeedObserverContext;
import com.microsoft.azure.documentdb.changefeedprocessor.IChangeFeedObserver;
import com.microsoft.azure.documentdb.changefeedprocessor.internal.ChangeFeedThreadFactory;
import com.microsoft.azure.documentdb.changefeedprocessor.internal.StatusCode;
import com.microsoft.azure.documentdb.changefeedprocessor.internal.SubStatusCode;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/microsoft/azure/documentdb/changefeedprocessor/services/ChangeFeedJob.class */
public class ChangeFeedJob implements Job {
    private final DocumentServices client;
    private final CheckpointServices checkpointServices;
    private final String partitionId;
    private final IChangeFeedObserver observer;
    private ChangeFeedOptions feedOptions;
    private int pageSize;
    private final int DEFAULT_PAGE_SIZE = 100;
    private final int DEFAULT_THREAD_WAIT = 1000;
    private ExecutorService exec;
    private final int CPUs;
    private final int NumThreadsPerCpu = 5;
    private static Logger logger = Logger.getLogger(ChangeFeedJob.class.getName());

    public DocumentServices getClient() {
        return this.client;
    }

    public CheckpointServices getCheckpointServices() {
        return this.checkpointServices;
    }

    public ChangeFeedJob(String str, DocumentServices documentServices, CheckpointServices checkpointServices, IChangeFeedObserver iChangeFeedObserver, int i) {
        this.DEFAULT_PAGE_SIZE = 100;
        this.DEFAULT_THREAD_WAIT = 1000;
        this.CPUs = Runtime.getRuntime().availableProcessors();
        this.NumThreadsPerCpu = 5;
        this.client = documentServices;
        this.checkpointServices = checkpointServices;
        this.partitionId = str;
        this.observer = iChangeFeedObserver;
        this.pageSize = i;
        this.exec = CreateExecutorService(5, "partition_" + str);
    }

    public ChangeFeedJob(String str, DocumentServices documentServices, CheckpointServices checkpointServices, IChangeFeedObserver iChangeFeedObserver) {
        this.DEFAULT_PAGE_SIZE = 100;
        this.DEFAULT_THREAD_WAIT = 1000;
        this.CPUs = Runtime.getRuntime().availableProcessors();
        this.NumThreadsPerCpu = 5;
        this.client = documentServices;
        this.checkpointServices = checkpointServices;
        this.partitionId = str;
        this.observer = iChangeFeedObserver;
        this.pageSize = 100;
        this.exec = CreateExecutorService(5, "partition_" + str);
    }

    private ExecutorService CreateExecutorService(int i, String str) {
        logger.info(String.format("Creating ExecutorService CPUs: %d, numThreadPerCPU: %d, threadSuffixName: %s", Integer.valueOf(this.CPUs), Integer.valueOf(i), str));
        if (i <= 0) {
            throw new IllegalArgumentException("The parameter numThreadPerCPU must be greater them 0");
        }
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("The parameter threadSuffixName is null or empty");
        }
        return Executors.newFixedThreadPool(i * this.CPUs, new ChangeFeedThreadFactory(str));
    }

    @Override // com.microsoft.azure.documentdb.changefeedprocessor.services.Job
    public void start(String str) throws DocumentClientException, InterruptedException {
        logger.info(String.format("Starting ChangeFeedJob ", new Object[0]));
        if (this.exec.isShutdown() || this.exec.isTerminated()) {
            return;
        }
        this.exec.execute(() -> {
            try {
                QueryChangeFeed(str);
            } catch (DocumentClientException e) {
                logger.warning(e.getMessage());
                e.printStackTrace();
            } catch (InterruptedException e2) {
                logger.warning(e2.getMessage());
                e2.printStackTrace();
            }
        });
    }

    private void QueryChangeFeed(String str) throws DocumentClientException, InterruptedException {
        logger.info(String.format("PartitionID: %s - QueryChangeFeed - Initiate", this.partitionId));
        ChangeFeedObserverContext changeFeedObserverContext = new ChangeFeedObserverContext();
        changeFeedObserverContext.setPartitionKeyRangeId(this.partitionId);
        boolean z = false;
        while (!this.exec.isTerminated() && !this.exec.isShutdown()) {
            do {
                try {
                    logger.info(String.format("client.createDocumentChangeFeedQuery(%s, %s, %d)", this.partitionId, this.checkpointServices.getCheckpointData(this.partitionId), Integer.valueOf(this.pageSize)));
                    FeedResponse<Document> createDocumentChangeFeedQuery = this.client.createDocumentChangeFeedQuery(this.partitionId, this.checkpointServices.getCheckpointData(this.partitionId), this.pageSize);
                    if (createDocumentChangeFeedQuery != null) {
                        logger.info(String.format("Query is not null query.getActivityId: %s ", createDocumentChangeFeedQuery.getActivityId()));
                        changeFeedObserverContext.setFeedResponse(createDocumentChangeFeedQuery);
                        do {
                            String responseContinuation = createDocumentChangeFeedQuery.getResponseContinuation();
                            List<Document> fetchNextBlock = createDocumentChangeFeedQuery.getQueryIterable().fetchNextBlock();
                            z = createDocumentChangeFeedQuery.getQueryIterator().hasNext();
                            if (fetchNextBlock != null) {
                                logger.info(String.format("Docs Loaded #%d - HasMoreResults: %s", Integer.valueOf(fetchNextBlock.size()), Boolean.valueOf(z)));
                                this.observer.processChanges(changeFeedObserverContext, fetchNextBlock);
                            } else {
                                logger.info(String.format("Docs is null & HasMoreResults = %s", Boolean.valueOf(z)));
                            }
                            checkpoint(responseContinuation);
                        } while (z);
                    }
                } catch (Exception e) {
                    logger.severe(String.format("Other exception not handled happened: %s ", e.getMessage()));
                    e.printStackTrace();
                } catch (DocumentClientException e2) {
                    int subStatusCode = getSubStatusCode(e2);
                    if ((e2.getStatusCode() == StatusCode.NOTFOUND.Value() && SubStatusCode.ReadSessionNotAvailable.Value() != subStatusCode) || e2.getStatusCode() == StatusCode.GONE.Value()) {
                        stop(ChangeFeedObserverCloseReason.RESOURCE_GONE);
                        this.observer.close(changeFeedObserverContext, ChangeFeedObserverCloseReason.RESOURCE_GONE);
                    } else if (SubStatusCode.Splitting.Value() == subStatusCode) {
                        logger.warning(String.format("Partition %s is splitting. Will retry to read changes until split finishes. %s", changeFeedObserverContext.getPartitionKeyRangeId(), e2.getMessage()));
                    } else {
                        if (e2.getStatusCode() != StatusCode.TOO_MANY_REQUESTS.Value()) {
                            throw e2;
                        }
                        try {
                            ExecutorService executorService = this.exec;
                            getClass();
                            executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS);
                            Logger logger2 = logger;
                            getClass();
                            logger2.info(String.format("Too many requests during change feed for Partition: %s - Waiting for %d milliseconds before perform another query.", this.partitionId, 1000));
                        } catch (InterruptedException e3) {
                            logger.warning(String.format("Too Many requests InterruptedException trying to wait the thread: %s", e3.getMessage()));
                        }
                    }
                }
                if (!z || this.exec.isTerminated()) {
                    break;
                }
            } while (!this.exec.isShutdown());
            if (!this.exec.isTerminated() && !this.exec.isShutdown()) {
                try {
                    ExecutorService executorService2 = this.exec;
                    getClass();
                    executorService2.awaitTermination(1000L, TimeUnit.MILLISECONDS);
                    Logger logger3 = logger;
                    getClass();
                    logger3.info(String.format("There are no changes for Partition: %s - Waiting for %d milliseconds before perform another query.", this.partitionId, 1000));
                } catch (InterruptedException e4) {
                    logger.warning(String.format(" InterruptedException trying to wait the thread: %s", e4.getMessage()));
                }
            }
        }
    }

    void checkpoint(String str) throws DocumentClientException {
        logger.info(String.format("Chekpoint - PartitionID: %s, InitialData %s", this.partitionId, str));
        this.checkpointServices.setCheckpointData(this.partitionId, str != null ? str : "");
    }

    @Override // com.microsoft.azure.documentdb.changefeedprocessor.services.Job
    public void stop(ChangeFeedObserverCloseReason changeFeedObserverCloseReason) {
        switch (changeFeedObserverCloseReason) {
            case SHUTDOWN:
            case RESOURCE_GONE:
                logger.warning(String.format("CloseReason %s Shutting down executor", changeFeedObserverCloseReason));
                this.exec.shutdown();
                return;
            default:
                logger.warning(String.format("CloseReason %s Shutting down executor NOW", changeFeedObserverCloseReason));
                this.exec.shutdownNow();
                return;
        }
    }

    private int getSubStatusCode(DocumentClientException documentClientException) {
        String str = (String) documentClientException.getResponseHeaders().get("x-ms-substatus");
        if (str == null || str.isEmpty()) {
            return -1;
        }
        try {
            return Integer.parseInt(str);
        } catch (Exception e) {
            logger.severe(String.format("Not able to parse the error code %s to int", str));
            return -1;
        }
    }
}
