package com.microsoft.azure.cosmosdb.changefeedprocessor.internal;

import com.microsoft.azure.cosmosdb.changefeedprocessor.CancellationToken;
import com.microsoft.azure.cosmosdb.changefeedprocessor.CancellationTokenSource;
import com.microsoft.azure.cosmosdb.changefeedprocessor.Lease;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseContainer;
import com.microsoft.azure.cosmosdb.changefeedprocessor.LeaseManager;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionController;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSupervisor;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSupervisorFactory;
import com.microsoft.azure.cosmosdb.changefeedprocessor.PartitionSynchronizer;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.PartitionSplitException;
import com.microsoft.azure.cosmosdb.changefeedprocessor.exceptions.TaskCancelledException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import rx.Completable;
import rx.Observable;

/* loaded from: input_file:com/microsoft/azure/cosmosdb/changefeedprocessor/internal/PartitionControllerImpl.class */
/**
 * {@link PartitionController} implementation that tracks one {@link WorkerTask} per lease token
 * in {@code currentlyOwnedPartitions} and submits each task to the supplied
 * {@link ExecutorService}.
 *
 * <p>{@link #addOrUpdateLeaseAsync(Lease)} is {@code synchronized}; the remaining methods are
 * not, and rely on the concurrent map for safe access to the task registry.
 */
public class PartitionControllerImpl implements PartitionController {
    /** Worker tasks keyed by lease token. */
    private final Map<String, WorkerTask> currentlyOwnedPartitions = new ConcurrentHashMap<>();
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final PartitionSupervisorFactory partitionSupervisorFactory;
    private final PartitionSynchronizer synchronizer;
    // Written by initializeAsync() and read by shutdownAsync() without holding the instance
    // lock, hence volatile. Stays null until initializeAsync() has been called.
    private volatile CancellationTokenSource shutdownCts;
    private final ExecutorService executorService;

    /**
     * Stores the collaborators; no work is started here.
     *
     * @param leaseContainer source of the leases loaded by {@link #initializeAsync()}
     * @param leaseManager used to acquire, update, release and delete leases
     * @param partitionSupervisorFactory creates the supervisor run for each lease
     * @param partitionSynchronizer used to split a lease when a supervisor reports a split
     * @param executorService executor the worker tasks are submitted to
     */
    public PartitionControllerImpl(LeaseContainer leaseContainer, LeaseManager leaseManager, PartitionSupervisorFactory partitionSupervisorFactory, PartitionSynchronizer partitionSynchronizer, ExecutorService executorService) {
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.partitionSupervisorFactory = partitionSupervisorFactory;
        this.synchronizer = partitionSynchronizer;
        this.executorService = executorService;
    }

    /**
     * Creates a fresh cancellation source and returns a {@link Completable} that passes every
     * lease reported by {@code leaseContainer.getOwnedLeasesAsync()} to
     * {@link #addOrUpdateLeaseAsync(Lease)}.
     *
     * @return completable covering the lease loading
     */
    @Override
    public Completable initializeAsync() {
        this.shutdownCts = new CancellationTokenSource();
        return loadLeasesAsync();
    }

    /**
     * Updates the properties of a lease whose worker task is still running, or otherwise
     * acquires the lease and starts a new worker task for it.
     *
     * <p>The lease-manager calls are made with {@code toBlocking()}, so the work happens before
     * this method returns and the returned observable only wraps the result.
     *
     * @param lease the lease to add or update
     * @return an observable emitting the updated lease, or the lease that was acquired (the
     *         argument itself when the acquire call yields {@code null})
     * @throws RuntimeException rethrown from acquiring the lease or starting the worker, after
     *         a best-effort removal of the lease from this controller
     */
    @Override
    public synchronized Observable<Lease> addOrUpdateLeaseAsync(Lease lease) {
        WorkerTask existingTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
        if (existingTask != null && existingTask.isRunning()) {
            return Observable.just((Lease) this.leaseManager.updatePropertiesAsync(lease).toBlocking().last());
        }

        // Tracks the lease we are working with so the failure path cleans up the same
        // instance that was (possibly) registered below.
        Lease ownedLease = lease;
        try {
            Lease acquired = (Lease) this.leaseManager.acquireAsync(lease).toBlocking().last();
            if (acquired != null) {
                ownedLease = acquired;
            }
            String leaseToken = ownedLease.getLeaseToken();
            PartitionSupervisor supervisor = this.partitionSupervisorFactory.create(ownedLease);
            this.currentlyOwnedPartitions.put(leaseToken, processPartition(supervisor, ownedLease));
            return Observable.just(ownedLease);
        } catch (RuntimeException e) {
            removeLeaseAsync(ownedLease).await();
            throw e;
        }
    }

    /**
     * Cancels the token handed to the partition supervisors.
     *
     * <p>Calling this before {@link #initializeAsync()} is a no-op, because no cancellation
     * source exists yet.
     *
     * @return an already-completed {@link Completable}
     */
    @Override
    public Completable shutdownAsync() {
        CancellationTokenSource cts = this.shutdownCts;
        if (cts != null) {
            cts.cancel();
        }
        return Completable.complete();
    }

    /** Feeds every lease from the lease container into {@link #addOrUpdateLeaseAsync(Lease)}. */
    private Completable loadLeasesAsync() {
        return this.leaseContainer.getOwnedLeasesAsync()
                .flatMap(lease -> addOrUpdateLeaseAsync(lease))
                .toCompletable();
    }

    /**
     * Returns a {@link Completable} that, when run, unregisters the worker task for the lease,
     * interrupts it if it is still running, and releases the lease. Does nothing when no task
     * is registered for the lease token.
     */
    private Completable removeLeaseAsync(Lease lease) {
        return Completable.fromAction(() -> {
            // A single remove() is atomic; a separate get() followed by remove() could see the
            // entry vanish in between and dereference null.
            WorkerTask removedTask = this.currentlyOwnedPartitions.remove(lease.getLeaseToken());
            if (removedTask == null) {
                return;
            }
            if (removedTask.isRunning()) {
                removedTask.interrupt();
            }
            try {
                this.leaseManager.releaseAsync(lease).await();
            } catch (Exception ignored) {
                // Best effort: a failed release must not fail the removal itself.
            }
        });
    }

    /**
     * Wraps the supervisor run in a {@link WorkerTask}, submits it to the executor and returns
     * it. When the supervisor finishes for any reason the lease is removed from this
     * controller; a {@link PartitionSplitException} first triggers split handling.
     */
    private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) {
        CancellationToken token = this.shutdownCts.getToken();
        WorkerTask workerTask = new WorkerTask(Completable.fromAction(() -> {
            try {
                partitionSupervisor.runAsync(token).await();
            } catch (PartitionSplitException e) {
                this.handleSplitAsync(lease, e.getLastContinuation()).await();
            } catch (TaskCancelledException ignored) {
                // Cancellation ends the worker; fall through to the lease removal below.
            } catch (Exception ignored) {
                // Any other failure also ends the worker; the lease is removed below.
            }
            this.removeLeaseAsync(lease).await();
        }));
        this.executorService.execute(workerTask);
        return workerTask;
    }

    /**
     * Stores the last continuation on the lease, asks the synchronizer to split it, registers
     * each resulting lease (carrying over the original's properties) and then deletes the
     * original lease. Errors anywhere in the chain are swallowed by {@code onErrorComplete}.
     */
    private Completable handleSplitAsync(Lease lease, String str) {
        lease.setContinuationToken(str);
        return this.synchronizer.splitPartitionAsync(lease)
                .flatMap(childLease -> {
                    childLease.setProperties(lease.getProperties());
                    return this.addOrUpdateLeaseAsync(childLease);
                })
                .toCompletable()
                .andThen(this.leaseManager.deleteAsync(lease))
                .onErrorComplete(error -> true);
    }
}
