package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.CancellationTokenSource;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.LeaseContainer;
import com.azure.data.cosmos.internal.changefeed.PartitionController;
import com.azure.data.cosmos.internal.changefeed.PartitionLoadBalancer;
import com.azure.data.cosmos.internal.changefeed.PartitionLoadBalancingStrategy;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/PartitionLoadBalancerImpl.class */
class PartitionLoadBalancerImpl implements PartitionLoadBalancer {
    private final Logger logger = LoggerFactory.getLogger(PartitionLoadBalancerImpl.class);
    private final PartitionController partitionController;
    private final LeaseContainer leaseContainer;
    private final PartitionLoadBalancingStrategy partitionLoadBalancingStrategy;
    private final Duration leaseAcquireInterval;
    private final ExecutorService executorService;
    private CancellationTokenSource cancellationTokenSource;
    private volatile boolean started;
    private final Object lock;

    public PartitionLoadBalancerImpl(PartitionController partitionController, LeaseContainer leaseContainer, PartitionLoadBalancingStrategy partitionLoadBalancingStrategy, Duration duration, ExecutorService executorService) {
        if (partitionController == null) {
            throw new IllegalArgumentException("partitionController");
        }
        if (leaseContainer == null) {
            throw new IllegalArgumentException("leaseContainer");
        }
        if (partitionLoadBalancingStrategy == null) {
            throw new IllegalArgumentException("partitionLoadBalancingStrategy");
        }
        if (executorService == null) {
            throw new IllegalArgumentException("executorService");
        }
        this.partitionController = partitionController;
        this.leaseContainer = leaseContainer;
        this.partitionLoadBalancingStrategy = partitionLoadBalancingStrategy;
        this.leaseAcquireInterval = duration;
        this.executorService = executorService;
        this.started = false;
        this.lock = new Object();
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionLoadBalancer
    public Mono<Void> start() {
        return Mono.fromRunnable(() -> {
            synchronized (this.lock) {
                if (this.started) {
                    throw new IllegalStateException("Partition load balancer already started");
                }
                this.started = true;
                this.cancellationTokenSource = new CancellationTokenSource();
            }
            CancellationToken token = this.cancellationTokenSource.getToken();
            this.executorService.execute(() -> {
            });
        });
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionLoadBalancer
    public Mono<Void> stop() {
        return Mono.fromRunnable(() -> {
            synchronized (this.lock) {
                this.started = false;
                this.cancellationTokenSource.cancel();
            }
            this.partitionController.shutdown().block();
            this.cancellationTokenSource = null;
        });
    }

    private Mono<Void> run(CancellationToken cancellationToken) {
        return Mono.fromRunnable(() -> {
            while (!cancellationToken.isCancellationRequested()) {
                try {
                    Iterator<Lease> it = this.partitionLoadBalancingStrategy.selectLeasesToTake((List) this.leaseContainer.getAllLeases().collectList().block()).iterator();
                    while (it.hasNext()) {
                        this.partitionController.addOrUpdateLease(it.next()).block();
                    }
                    for (long millis = this.leaseAcquireInterval.toMillis(); !cancellationToken.isCancellationRequested() && millis > 0; millis -= 100) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            this.logger.warn("Partition load balancer caught an interrupted exception", e);
                        }
                    }
                } catch (Exception e2) {
                    this.logger.info("Partition load balancer task stopped.");
                    stop();
                    return;
                }
            }
        });
    }
}
