package com.microsoft.azure.cosmosdb.changefeedprocessor.internal;

import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedDocumentClient;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedObserver;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedObserverFactory;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor;
import com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessorOptions;
import com.microsoft.azure.cosmosdb.changefeedprocessor.DocumentCollectionInfo;
import com.microsoft.azure.cosmosdb.changefeedprocessor.HealthMonitor;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseStoreManager;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionLoadBalancingStrategy;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionManager;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionProcessorFactory;
import com.microsoft.azure.cosmosdb.changefeedprocessor.internal.Constants;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;

/* loaded from: input_file:com/microsoft/azure/cosmosdb/changefeedprocessor/internal/ChangeFeedProcessorBuilderImpl.class */
public class ChangeFeedProcessorBuilderImpl implements ChangeFeedProcessor.BuilderDefinition, ChangeFeedProcessor, AutoCloseable {
    private static final long DefaultUnhealthinessDuration = Duration.ofMinutes(15).toMillis();
    private final Duration sleepTime;
    private final Duration lockTime;
    private String hostName;
    private DocumentCollectionInfo feedCollectionLocation;
    private ChangeFeedDocumentClient feedDocumentClient;
    private boolean closeFeedDocumentClient;
    private ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private ChangeFeedObserverFactory observerFactory;
    private String databaseResourceId;
    private String collectionResourceId;
    private DocumentCollectionInfo leaseCollectionLocation;
    private ChangeFeedDocumentClient leaseDocumentClient;
    private boolean closeLeaseDocumentClient;
    private PartitionLoadBalancingStrategy loadBalancingStrategy;
    private PartitionProcessorFactory partitionProcessorFactory;
    private LeaseStoreManager leaseStoreManager;
    private HealthMonitor healthMonitor;
    private PartitionManager partitionManager;
    private ExecutorService executorService;

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor
    public Completable startAsync() {
        return this.partitionManager.startAsync();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor
    public Completable stopAsync() {
        return this.partitionManager.stopAsync();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor
    public void start() {
        startAsync().await();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor
    public void stop() {
        stopAsync().await();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withHostName(String str) {
        this.hostName = str;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withFeedCollection(DocumentCollectionInfo documentCollectionInfo) {
        this.feedCollectionLocation = documentCollectionInfo;
        if (documentCollectionInfo == null || documentCollectionInfo.getConnectionPolicy() == null) {
            throw new IllegalArgumentException("feedCollectionLocation");
        }
        if (documentCollectionInfo.getConnectionPolicy().getUserAgentSuffix() == null || documentCollectionInfo.getConnectionPolicy().getUserAgentSuffix().isEmpty()) {
            this.feedCollectionLocation = new DocumentCollectionInfo(documentCollectionInfo);
            this.feedCollectionLocation.getConnectionPolicy().setUserAgentSuffix("changefeed-2.2.6");
        }
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withFeedDocumentClient(AsyncDocumentClient asyncDocumentClient) {
        if (asyncDocumentClient == null) {
            throw new IllegalArgumentException("feedDocumentClient");
        }
        this.feedDocumentClient = new ChangeFeedDocumentClientImpl(asyncDocumentClient);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withProcessorOptions(ChangeFeedProcessorOptions changeFeedProcessorOptions) {
        if (changeFeedProcessorOptions == null) {
            throw new IllegalArgumentException("changeFeedProcessorOptions");
        }
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withChangeFeedObserverFactory(ChangeFeedObserverFactory changeFeedObserverFactory) {
        if (changeFeedObserverFactory == null) {
            throw new IllegalArgumentException("observerFactory");
        }
        this.observerFactory = changeFeedObserverFactory;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withChangeFeedObserver(Class<? extends ChangeFeedObserver> cls) {
        if (cls == null) {
            throw new IllegalArgumentException(Constants.Properties.AUTH_SCHEMA_TYPE);
        }
        this.observerFactory = new ChangeFeedObserverFactoryImpl(cls);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withDatabaseResourceId(String str) {
        this.databaseResourceId = str;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withCollectionResourceId(String str) {
        this.collectionResourceId = str;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withLeaseCollection(DocumentCollectionInfo documentCollectionInfo) {
        this.leaseCollectionLocation = ChangeFeedHelper.canonicalize(documentCollectionInfo);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withLeaseDocumentClient(AsyncDocumentClient asyncDocumentClient) {
        if (asyncDocumentClient == null) {
            throw new IllegalArgumentException("leaseDocumentClient");
        }
        this.leaseDocumentClient = new ChangeFeedDocumentClientImpl(asyncDocumentClient);
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withPartitionLoadBalancingStrategy(PartitionLoadBalancingStrategy partitionLoadBalancingStrategy) {
        if (partitionLoadBalancingStrategy == null) {
            throw new IllegalArgumentException("loadBalancingStrategy");
        }
        this.loadBalancingStrategy = partitionLoadBalancingStrategy;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withPartitionProcessorFactory(PartitionProcessorFactory partitionProcessorFactory) {
        if (partitionProcessorFactory == null) {
            throw new IllegalArgumentException("partitionProcessorFactory");
        }
        this.partitionProcessorFactory = partitionProcessorFactory;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withLeaseStoreManager(LeaseStoreManager leaseStoreManager) {
        if (leaseStoreManager == null) {
            throw new IllegalArgumentException("leaseStoreManager");
        }
        this.leaseStoreManager = leaseStoreManager;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl withHealthMonitor(HealthMonitor healthMonitor) {
        if (healthMonitor == null) {
            throw new IllegalArgumentException("healthMonitor");
        }
        this.healthMonitor = healthMonitor;
        return this;
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessor build() {
        return (ChangeFeedProcessor) buildAsync().toBlocking().value();
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public Single<ChangeFeedProcessor> buildAsync() {
        if (this.hostName == null) {
            throw new IllegalArgumentException("Host name was not specified");
        }
        if (this.feedCollectionLocation == null) {
            throw new IllegalArgumentException("feedCollectionLocation was not specified");
        }
        if (this.leaseCollectionLocation == null && this.leaseStoreManager == null) {
            throw new IllegalArgumentException("Either leaseCollectionLocation or leaseStoreManager must be specified");
        }
        if (this.observerFactory == null) {
            throw new IllegalArgumentException("Observer was not specified");
        }
        initializeCollectionPropertiesForBuildAsync().await();
        this.partitionManager = (PartitionManager) buildPartitionManagerAsync((LeaseStoreManager) getLeaseStoreManagerAsync(this.leaseCollectionLocation, true).toBlocking().value()).toBlocking().value();
        return Single.just(this);
    }

    public ChangeFeedProcessorBuilderImpl() {
        this.sleepTime = Duration.ofSeconds(15L);
        this.lockTime = Duration.ofSeconds(30L);
    }

    public ChangeFeedProcessorBuilderImpl(PartitionManager partitionManager) {
        this.sleepTime = Duration.ofSeconds(15L);
        this.lockTime = Duration.ofSeconds(30L);
        this.closeLeaseDocumentClient = true;
        this.partitionManager = partitionManager;
    }

    private Completable initializeCollectionPropertiesForBuildAsync() {
        Completable subscribeOn = Completable.fromAction(() -> {
            if (this.feedDocumentClient == null) {
                this.feedDocumentClient = new ChangeFeedDocumentClientImpl(new AsyncDocumentClient.Builder().withServiceEndpoint(this.feedCollectionLocation.getUri()).withMasterKeyOrResourceToken(this.feedCollectionLocation.getMasterKey()).withConnectionPolicy(this.feedCollectionLocation.getConnectionPolicy()).withConsistencyLevel(this.feedCollectionLocation.getConsistencyLevel()).build());
                this.closeFeedDocumentClient = true;
            }
        }).subscribeOn(Schedulers.io());
        Completable subscribeOn2 = Completable.fromAction(() -> {
            if (this.changeFeedProcessorOptions == null) {
                this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
            }
        }).subscribeOn(Schedulers.computation());
        if (this.databaseResourceId == null) {
            subscribeOn2 = subscribeOn2.mergeWith(Completable.fromAction(() -> {
            }));
        }
        if (this.collectionResourceId == null) {
            subscribeOn2 = subscribeOn2.mergeWith(Completable.fromAction(() -> {
            }));
        }
        return subscribeOn.andThen(subscribeOn2);
    }

    private Single<LeaseStoreManager> getLeaseStoreManagerAsync(DocumentCollectionInfo documentCollectionInfo, boolean z) {
        if (this.leaseStoreManager != null) {
            return Single.just(this.leaseStoreManager);
        }
        if (this.leaseDocumentClient == null) {
            this.leaseDocumentClient = new ChangeFeedDocumentClientImpl(new AsyncDocumentClient.Builder().withServiceEndpoint(this.leaseCollectionLocation.getUri()).withMasterKeyOrResourceToken(this.leaseCollectionLocation.getMasterKey()).withConnectionPolicy(this.leaseCollectionLocation.getConnectionPolicy()).withConsistencyLevel(this.leaseCollectionLocation.getConsistencyLevel()).build());
        }
        return ChangeFeedHelper.getDocumentCollectionAsync(this.leaseDocumentClient, documentCollectionInfo).map(documentCollection -> {
            boolean z2 = (documentCollection.getPartitionKey() == null || documentCollection.getPartitionKey().getPaths() == null || documentCollection.getPartitionKey().getPaths().size() <= 0) ? false : true;
            if (z2 && z && (documentCollection.getPartitionKey().getPaths().size() != 1 || !((String) documentCollection.getPartitionKey().getPaths().get(0)).equals("/id"))) {
                Observable.error(new IllegalArgumentException("The lease collection, if partitioned, must have partition key equal to id."));
            }
            this.leaseStoreManager = LeaseStoreManager.Builder().withLeasePrefix(this.getLeasePrefix()).withLeaseCollection(this.leaseCollectionLocation).withLeaseCollectionLink(documentCollection.getSelfLink()).withRequestOptionsFactory(z2 ? new PartitionedByIdCollectionRequestOptionsFactory() : new SinglePartitionRequestOptionsFactory()).withHostName(this.hostName).withLeaseDocumentClient(this.leaseDocumentClient).build();
            return this.leaseStoreManager;
        }).toSingle();
    }

    private String getLeasePrefix() {
        String leasePrefix = this.changeFeedProcessorOptions.getLeasePrefix();
        if (leasePrefix == null) {
            leasePrefix = "";
        }
        try {
            return String.format("%s%s_%s_%s", leasePrefix, new URI(this.feedCollectionLocation.getUri()).getHost(), this.databaseResourceId, this.collectionResourceId);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("uri - " + e.getMessage());
        }
    }

    private Single<PartitionManager> buildPartitionManagerAsync(LeaseStoreManager leaseStoreManager) {
        if (this.executorService == null) {
            this.executorService = Executors.newCachedThreadPool();
        }
        String collectionSelfLink = ChangeFeedHelper.getCollectionSelfLink(this.feedCollectionLocation);
        CheckpointerObserverFactory checkpointerObserverFactory = new CheckpointerObserverFactory(this.observerFactory, this.changeFeedProcessorOptions.getCheckpointFrequency());
        PartitionSynchronizerImpl partitionSynchronizerImpl = new PartitionSynchronizerImpl(this.feedDocumentClient, collectionSelfLink, leaseStoreManager, leaseStoreManager, this.changeFeedProcessorOptions.getDegreeOfParallelism(), this.changeFeedProcessorOptions.getQueryPartitionsMaxBatchSize());
        BootstrapperImpl bootstrapperImpl = new BootstrapperImpl(partitionSynchronizerImpl, leaseStoreManager, this.lockTime, this.sleepTime);
        PartitionSupervisorFactoryImpl partitionSupervisorFactoryImpl = new PartitionSupervisorFactoryImpl(checkpointerObserverFactory, leaseStoreManager, this.partitionProcessorFactory != null ? this.partitionProcessorFactory : new PartitionProcessorFactoryImpl(this.feedDocumentClient, this.changeFeedProcessorOptions, leaseStoreManager, collectionSelfLink), this.changeFeedProcessorOptions, this.executorService);
        if (this.loadBalancingStrategy == null) {
            this.loadBalancingStrategy = new EqualPartitionsBalancingStrategy(this.hostName, this.changeFeedProcessorOptions.getMinPartitionCount(), this.changeFeedProcessorOptions.getMaxPartitionCount(), this.changeFeedProcessorOptions.getLeaseExpirationInterval());
        }
        PartitionControllerImpl partitionControllerImpl = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactoryImpl, partitionSynchronizerImpl, this.executorService);
        if (this.healthMonitor == null) {
            this.healthMonitor = new TraceHealthMonitor();
        }
        return Single.just(new PartitionManagerImpl(bootstrapperImpl, partitionControllerImpl, new PartitionLoadBalancerImpl(new HealthMonitoringPartitionControllerDecorator(partitionControllerImpl, this.healthMonitor), leaseStoreManager, this.loadBalancingStrategy, this.changeFeedProcessorOptions.getLeaseAcquireInterval(), this.executorService)));
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closeLeaseDocumentClient) {
            this.leaseDocumentClient.close();
        }
        if (this.closeFeedDocumentClient) {
            this.feedDocumentClient.close();
        }
    }

    @Override // com.microsoft.azure.cosmosdb.changefeedprocessor.ChangeFeedProcessor.BuilderDefinition
    public /* bridge */ /* synthetic */ ChangeFeedProcessor.BuilderDefinition withChangeFeedObserver(Class cls) {
        return withChangeFeedObserver((Class<? extends ChangeFeedObserver>) cls);
    }
}
