package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.CancellationTokenSource;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.LeaseContainer;
import com.azure.data.cosmos.internal.changefeed.LeaseManager;
import com.azure.data.cosmos.internal.changefeed.PartitionController;
import com.azure.data.cosmos.internal.changefeed.PartitionSupervisor;
import com.azure.data.cosmos.internal.changefeed.PartitionSupervisorFactory;
import com.azure.data.cosmos.internal.changefeed.PartitionSynchronizer;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionSplitException;
import com.azure.data.cosmos.internal.changefeed.exceptions.TaskCancelledException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/PartitionControllerImpl.class */
/**
 * {@link PartitionController} implementation that keeps one {@link WorkerTask} per owned lease
 * token.
 *
 * <p>For every lease handed to {@link #addOrUpdateLease(Lease)} the controller either refreshes the
 * lease properties (when a worker for that lease token is still running) or acquires the lease,
 * creates a {@link PartitionSupervisor} for it and schedules the supervisor on the supplied
 * {@link ExecutorService}. When a supervisor terminates, the lease is removed from the local
 * bookkeeping and released through the {@link LeaseManager}; a {@link PartitionSplitException}
 * additionally triggers {@link #handleSplit(Lease, String)}.
 *
 * <p>{@link #initialize()} must be called before leases are added, because it creates the
 * cancellation source whose token is handed to every supervisor.
 */
class PartitionControllerImpl implements PartitionController {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionControllerImpl.class);

    /** Worker tasks currently tracked by this controller, keyed by lease token. */
    private final Map<String, WorkerTask> currentlyOwnedPartitions = new ConcurrentHashMap<>();
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final PartitionSupervisorFactory partitionSupervisorFactory;
    private final PartitionSynchronizer synchronizer;
    private final ExecutorService executorService;

    /**
     * Created by {@link #initialize()}; cancelled by {@link #shutdown()}. Volatile because it is
     * written by the initializing thread and read by whichever thread adds leases or shuts down.
     */
    private volatile CancellationTokenSource shutdownCts;

    /**
     * Creates a controller; no work is started until {@link #initialize()} is called.
     *
     * @param leaseContainer source of the leases already owned by this host
     * @param leaseManager used to acquire, update, release and delete leases
     * @param partitionSupervisorFactory creates the supervisor that processes one lease
     * @param partitionSynchronizer used to obtain the child leases after a partition split
     * @param executorService executor on which the per-lease worker tasks are run
     */
    public PartitionControllerImpl(LeaseContainer leaseContainer, LeaseManager leaseManager, PartitionSupervisorFactory partitionSupervisorFactory, PartitionSynchronizer partitionSynchronizer, ExecutorService executorService) {
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.partitionSupervisorFactory = partitionSupervisorFactory;
        this.synchronizer = partitionSynchronizer;
        this.executorService = executorService;
    }

    /**
     * Creates a fresh cancellation source and returns a publisher that, when subscribed, passes
     * every lease reported by {@link LeaseContainer#getOwnedLeases()} to
     * {@link #addOrUpdateLease(Lease)}.
     *
     * @return a {@code Mono} completing once all owned leases have been handled
     */
    @Override // com.azure.data.cosmos.internal.changefeed.PartitionController
    public Mono<Void> initialize() {
        this.shutdownCts = new CancellationTokenSource();
        return loadLeases();
    }

    /**
     * Updates the lease if a worker for its token is still running; otherwise acquires it and
     * starts a new worker.
     *
     * <p>NOTE: the lease-manager calls are executed eagerly with {@code block()} while holding this
     * object's monitor, so the work is already done when the returned {@code Mono} is handed back.
     * This behavior is kept as-is because callers outside this file may depend on it.
     *
     * @param lease the lease to add or refresh
     * @return a {@code Mono} emitting the lease returned by the lease manager, or the given lease
     *     when the lease manager produced no value
     * @throws RuntimeException any failure raised while acquiring the lease or starting the worker;
     *     the lease is removed and released before the exception is rethrown
     */
    @Override // com.azure.data.cosmos.internal.changefeed.PartitionController
    public synchronized Mono<Lease> addOrUpdateLease(Lease lease) {
        WorkerTask existingTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
        if (existingTask != null && existingTask.isRunning()) {
            Lease updated = (Lease) this.leaseManager.updateProperties(lease).block();
            LOGGER.debug("Partition {}: updated.", lease.getLeaseToken());
            // block() yields null for an empty Mono and Mono.just(null) would throw; fall back to
            // the caller's lease exactly like the acquire branch below does.
            return Mono.just(updated != null ? updated : lease);
        }

        // Tracks the most recent view of the lease so that cleanup releases what was acquired.
        Lease owned = lease;
        try {
            Lease acquired = (Lease) this.leaseManager.acquire(lease).block();
            if (acquired != null) {
                owned = acquired;
            }
            LOGGER.info("Partition {}: acquired.", owned.getLeaseToken());
            PartitionSupervisor supervisor = this.partitionSupervisorFactory.create(owned);
            this.currentlyOwnedPartitions.put(owned.getLeaseToken(), processPartition(supervisor, owned));
            return Mono.just(owned);
        } catch (RuntimeException e) {
            try {
                removeLease(owned).block();
            } catch (RuntimeException cleanupFailure) {
                // Never let a cleanup problem hide the failure that triggered the cleanup.
                if (cleanupFailure != e) {
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Cancels the token shared by all supervisors started by this controller. Does nothing when
     * {@link #initialize()} has not been called yet.
     *
     * @return an already-completed {@code Mono}; the cancellation itself happens eagerly
     */
    @Override // com.azure.data.cosmos.internal.changefeed.PartitionController
    public Mono<Void> shutdown() {
        CancellationTokenSource cts = this.shutdownCts;
        if (cts != null) {
            cts.cancel();
        }
        return Mono.empty();
    }

    /**
     * Builds the publisher that feeds every lease already owned by this host into
     * {@link #addOrUpdateLease(Lease)}.
     */
    private Mono<Void> loadLeases() {
        LOGGER.debug("Starting renew leases assigned to this host on initialize.");
        return this.leaseContainer.getOwnedLeases().flatMap(lease -> {
            LOGGER.info("Acquired lease for PartitionId '{}' on startup.", lease.getLeaseToken());
            return this.addOrUpdateLease(lease);
        }).then();
    }

    /**
     * Returns a publisher that, on subscription, stops tracking the lease (interrupting its worker
     * if that is still running) and releases it through the lease manager.
     *
     * <p>The whole operation is deferred: nothing is removed or interrupted until the returned
     * {@code Mono} is subscribed. This matters for {@code then(removeLease(lease))} chains, whose
     * argument expression is evaluated when the chain is assembled rather than when the preceding
     * publisher completes.
     *
     * <p>Errors from the release call are logged and swallowed, so a failed release does not fail
     * the returned {@code Mono}.
     */
    private Mono<Void> removeLease(Lease lease) {
        return Mono.defer(() -> {
            // Single atomic remove: a separate get() followed by remove() can observe null when
            // another thread removes the same entry in between.
            WorkerTask removed = this.currentlyOwnedPartitions.remove(lease.getLeaseToken());
            if (removed != null) {
                if (removed.isRunning()) {
                    removed.interrupt();
                }
                LOGGER.info("Partition {}: released.", lease.getLeaseToken());
            }
            return this.leaseManager.release(lease).onErrorResume(th -> {
                LOGGER.warn("Partition {}: failed to remove lease.", lease.getLeaseToken(), th);
                return Mono.empty();
            }).doOnSuccess(ignored -> {
                LOGGER.info("Partition {}: successfully removed lease.", lease.getLeaseToken());
            });
        });
    }

    /**
     * Wraps the supervisor in a {@link WorkerTask} and submits it to the executor.
     *
     * <p>When the task runs it subscribes to the supervisor; a split error is routed to
     * {@link #handleSplit(Lease, String)}, any other error is logged and suppressed, and in every
     * case the lease is removed and released afterwards.
     *
     * @return the submitted task, so that the caller can track it
     */
    private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) {
        CancellationToken token = this.shutdownCts.getToken();
        WorkerTask workerTask = new WorkerTask(lease, () -> {
            partitionSupervisor.run(token).onErrorResume(th -> {
                if (th instanceof PartitionSplitException) {
                    return this.handleSplit(lease, ((PartitionSplitException) th).getLastContinuation());
                }
                if (th instanceof TaskCancelledException) {
                    LOGGER.debug("Partition {}: processing canceled.", lease.getLeaseToken());
                } else {
                    LOGGER.warn("Partition {}: processing failed.", lease.getLeaseToken(), th);
                }
                return Mono.empty();
            }).then(this.removeLease(lease)).subscribe(); // removeLease defers its side effects
        });
        this.executorService.execute(workerTask);
        return workerTask;
    }

    /**
     * Handles a partition split: stores the last continuation on the parent lease, registers every
     * child lease returned by the synchronizer (copying the parent's properties onto it) and then
     * deletes the parent lease. Any failure is logged and suppressed.
     *
     * @param lease the lease of the partition that was split
     * @param str the last continuation token reported by the split exception
     */
    private Mono<Void> handleSplit(Lease lease, String str) {
        lease.setContinuationToken(str);
        return this.synchronizer.splitPartition(lease).flatMap(childLease -> {
            childLease.setProperties(lease.getProperties());
            return this.addOrUpdateLease(childLease);
        }).then(this.leaseManager.delete(lease)).onErrorResume(th -> {
            LOGGER.warn("partition {}: failed to split", lease.getLeaseToken(), th);
            return Mono.empty();
        });
    }
}
