package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.ChangeFeedOptions;
import com.azure.data.cosmos.CommonsBridgeInternal;
import com.azure.data.cosmos.CosmosClientException;
import com.azure.data.cosmos.CosmosItemProperties;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedContextClient;
import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver;
import com.azure.data.cosmos.internal.changefeed.PartitionCheckpointer;
import com.azure.data.cosmos.internal.changefeed.PartitionProcessor;
import com.azure.data.cosmos.internal.changefeed.ProcessorSettings;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionNotFoundException;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionSplitException;
import com.azure.data.cosmos.internal.changefeed.exceptions.TaskCancelledException;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.class */
class PartitionProcessorImpl implements PartitionProcessor {
    private static final int DefaultMaxItemCount = 100;
    private final ProcessorSettings settings;
    private final PartitionCheckpointer checkpointer;
    private final ChangeFeedObserver observer;
    private final ChangeFeedOptions options = new ChangeFeedOptions();
    private final ChangeFeedContextClient documentClient;
    private RuntimeException resultException;
    private String lastContinuation;

    /* renamed from: com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl$1, reason: invalid class name */
    /* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/PartitionProcessorImpl$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$azure$data$cosmos$internal$changefeed$implementation$StatusCodeErrorType = new int[StatusCodeErrorType.values().length];

        static {
            try {
                $SwitchMap$com$azure$data$cosmos$internal$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.PARTITION_NOT_FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$azure$data$cosmos$internal$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.PARTITION_SPLIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$azure$data$cosmos$internal$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.UNDEFINED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$azure$data$cosmos$internal$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.MAX_ITEM_COUNT_TOO_LARGE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public PartitionProcessorImpl(ChangeFeedObserver changeFeedObserver, ChangeFeedContextClient changeFeedContextClient, ProcessorSettings processorSettings, PartitionCheckpointer partitionCheckpointer) {
        this.observer = changeFeedObserver;
        this.documentClient = changeFeedContextClient;
        this.settings = processorSettings;
        this.checkpointer = partitionCheckpointer;
        this.options.maxItemCount(Integer.valueOf(processorSettings.getMaxItemCount()));
        CommonsBridgeInternal.partitionKeyRangeIdInternal(this.options, processorSettings.getPartitionKeyRangeId());
        this.options.startFromBeginning(processorSettings.isStartFromBeginning());
        this.options.requestContinuation(processorSettings.getStartContinuation());
        this.options.startDateTime(processorSettings.getStartTime());
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionProcessor
    public Mono<Void> run(CancellationToken cancellationToken) {
        this.lastContinuation = this.settings.getStartContinuation();
        return Mono.fromRunnable(() -> {
            while (!cancellationToken.isCancellationRequested()) {
                Duration feedPollDelay = this.settings.getFeedPollDelay();
                try {
                    this.options.requestContinuation(this.lastContinuation);
                    for (FeedResponse<CosmosItemProperties> feedResponse : (List) this.documentClient.createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), this.options).collectList().block()) {
                        this.lastContinuation = feedResponse.continuationToken();
                        if (feedResponse.results() != null && feedResponse.results().size() > 0) {
                            this.dispatchChanges(feedResponse);
                        }
                        this.options.requestContinuation(this.lastContinuation);
                        if (cancellationToken.isCancellationRequested()) {
                            throw new TaskCancelledException();
                        }
                    }
                    if (this.options.maxItemCount().compareTo(Integer.valueOf(this.settings.getMaxItemCount())) == 0) {
                        this.options.maxItemCount(Integer.valueOf(this.settings.getMaxItemCount()));
                    }
                } catch (RuntimeException e) {
                    if (e.getCause() instanceof CosmosClientException) {
                        switch (AnonymousClass1.$SwitchMap$com$azure$data$cosmos$internal$changefeed$implementation$StatusCodeErrorType[ExceptionClassifier.classifyClientException((CosmosClientException) e.getCause()).ordinal()]) {
                            case 1:
                                this.resultException = new PartitionNotFoundException("Partition not found.", this.lastContinuation);
                            case 2:
                                this.resultException = new PartitionSplitException("Partition split.", this.lastContinuation);
                            case HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN /* 3 */:
                                this.resultException = e;
                            case 4:
                                if (this.options.maxItemCount() == null) {
                                    this.options.maxItemCount(100);
                                } else if (this.options.maxItemCount().intValue() <= 1) {
                                    throw e;
                                }
                                this.options.maxItemCount(Integer.valueOf(this.options.maxItemCount().intValue() / 2));
                                break;
                            default:
                                this.resultException = e;
                                break;
                        }
                    } else if (e instanceof TaskCancelledException) {
                        this.resultException = e;
                    }
                }
                for (long millis = feedPollDelay.toMillis(); !cancellationToken.isCancellationRequested() && millis > 0; millis -= 100) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e2) {
                    }
                }
            }
        });
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionProcessor
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private void dispatchChanges(FeedResponse<CosmosItemProperties> feedResponse) {
        this.observer.processChanges(new ChangeFeedObserverContextImpl(this.settings.getPartitionKeyRangeId(), feedResponse, this.checkpointer), feedResponse.results());
    }
}
