package com.azure.data.cosmos.internal;

import com.azure.data.cosmos.BridgeInternal;
import com.azure.data.cosmos.ConnectionPolicy;
import com.azure.data.cosmos.internal.HttpConstants;
import com.azure.data.cosmos.internal.Utils;
import com.azure.data.cosmos.internal.routing.LocationCache;
import com.azure.data.cosmos.internal.routing.LocationHelper;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/data/cosmos/internal/GlobalEndpointManager.class */
/**
 * Keeps a {@link LocationCache} of read/write endpoints up to date by periodically
 * reading the database account, first from the default endpoint and then, on
 * failure, from the endpoints derived from the preferred locations.
 *
 * <p>Background refreshes are scheduled on a dedicated single-thread executor that
 * is shut down by {@link #close()}.
 */
public class GlobalEndpointManager implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(GlobalEndpointManager.class);

    /** Conversion factor used to turn the configured expiration (seconds) into milliseconds. */
    private static final int MILLIS_PER_SECOND = 1000;

    private final int backgroundRefreshLocationTimeIntervalInMS;
    private final LocationCache locationCache;
    private final URL defaultEndpoint;
    private final ConnectionPolicy connectionPolicy;
    private final DatabaseAccountManagerInternal owner;
    /**
     * Set to {@code true} by {@link #refreshLocationAsync(DatabaseAccount)} when a refresh starts;
     * cleared when the refresh fails or when the cache reports that no refresh is needed.
     */
    private final AtomicBoolean isRefreshing;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Scheduler scheduler = Schedulers.fromExecutor(this.executor);
    private volatile boolean isClosed;

    /**
     * Creates the manager and its location cache.
     *
     * @param databaseAccountManagerInternal source of the service endpoint and of database account reads
     * @param connectionPolicy policy supplying preferred locations and endpoint-discovery settings
     * @param configs configuration supplying the unavailable-location expiration time
     * @throws IllegalArgumentException if the location cache or default endpoint cannot be built
     *         (the underlying exception is kept as the cause)
     */
    public GlobalEndpointManager(DatabaseAccountManagerInternal databaseAccountManagerInternal, ConnectionPolicy connectionPolicy, Configs configs) {
        // The refresh interval is the configured expiration, converted from seconds to milliseconds.
        this.backgroundRefreshLocationTimeIntervalInMS =
                configs.getUnavailableLocationsExpirationTimeInSeconds() * MILLIS_PER_SECOND;
        try {
            this.locationCache = new LocationCache(
                    new ArrayList<>(preferredLocationsOrEmpty(connectionPolicy)),
                    databaseAccountManagerInternal.getServiceEndpoint().toURL(),
                    connectionPolicy.enableEndpointDiscovery(),
                    BridgeInternal.getUseMultipleWriteLocations(connectionPolicy),
                    configs);
            this.owner = databaseAccountManagerInternal;
            this.defaultEndpoint = databaseAccountManagerInternal.getServiceEndpoint().toURL();
            this.connectionPolicy = connectionPolicy;
            this.isRefreshing = new AtomicBoolean(false);
            this.isClosed = false;
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Performs the first refresh with no delay and blocks until it completes.
     */
    public void init() {
        startRefreshLocationTimerAsync(true).block();
    }

    /**
     * @return the read endpoints currently held by the location cache
     */
    public UnmodifiableList<URL> getReadEndpoints() {
        return this.locationCache.getReadEndpoints();
    }

    /**
     * @return the write endpoints currently held by the location cache
     */
    public UnmodifiableList<URL> getWriteEndpoints() {
        return this.locationCache.getWriteEndpoints();
    }

    /**
     * Reads the database account from {@code url}; if that fails, tries the endpoint of each
     * location in {@code list}, in order, and emits the first successful result.
     *
     * @param url the default (global) endpoint tried first
     * @param list location names used to derive fallback endpoints; when {@code null} or empty
     *        the error from the default endpoint is propagated unchanged
     * @param function reads the database account from a given endpoint
     * @return a {@link Mono} emitting the first database account successfully read, or an error
     *         if no endpoint could be reached
     */
    public static Mono<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(URL url, List<String> list, Function<URL, Mono<DatabaseAccount>> function) {
        return function.apply(url).onErrorResume(gatewayError -> {
            logger.error("Fail to reach global gateway [{}], [{}]", url, gatewayError.getMessage());
            if (list == null || list.isEmpty()) {
                return Mono.error(gatewayError);
            }

            Flux<Flux<DatabaseAccount>> regionalAttempts = Flux.range(0, list.size())
                    .map(index -> function.apply(LocationHelper.getLocationEndpoint(url, list.get(index))).flux());

            // Errors are delayed so that every location gets a chance; the first value wins.
            return Flux.concatDelayError(regionalAttempts)
                    .take(1)
                    .single()
                    .doOnError(regionalError -> logger.error(
                            "Fail to reach location any of locations {} {}",
                            String.join(",", list),
                            regionalError.getMessage()));
        });
    }

    /**
     * Delegates to the location cache to pick the endpoint for the given request.
     *
     * @param rxDocumentServiceRequest the request being routed
     * @return the endpoint chosen by the location cache
     */
    public URL resolveServiceEndpoint(RxDocumentServiceRequest rxDocumentServiceRequest) {
        return this.locationCache.resolveServiceEndpoint(rxDocumentServiceRequest);
    }

    /**
     * Records in the location cache that {@code url} is unavailable for reads.
     *
     * @param url the endpoint to mark
     */
    public void markEndpointUnavailableForRead(URL url) {
        logger.debug("Marking endpoint {} unavailable for read", url);
        this.locationCache.markEndpointUnavailableForRead(url);
    }

    /**
     * Records in the location cache that {@code url} is unavailable for writes.
     *
     * @param url the endpoint to mark
     */
    public void markEndpointUnavailableForWrite(URL url) {
        logger.debug("Marking  endpoint {} unavailable for Write", url);
        this.locationCache.markEndpointUnavailableForWrite(url);
    }

    /**
     * Delegates to the location cache. The capitalized method name is kept as-is because it is
     * part of the public interface.
     *
     * @param rxDocumentServiceRequest the request being evaluated
     * @return the location cache's answer for this request
     */
    public boolean CanUseMultipleWriteLocations(RxDocumentServiceRequest rxDocumentServiceRequest) {
        return this.locationCache.canUseMultipleWriteLocations(rxDocumentServiceRequest);
    }

    /**
     * Marks the manager closed, which stops further refreshes from being scheduled,
     * and shuts down the background executor.
     */
    @Override
    public void close() {
        this.isClosed = true;
        this.executor.shutdown();
        logger.debug("GlobalEndpointManager closed.");
    }

    /**
     * Starts a location refresh unless one is already in progress.
     *
     * @param databaseAccount an already-read database account to feed into the cache, or {@code null}
     * @return a {@link Mono} completing when the refresh step finishes; completes immediately
     *         without doing anything if another refresh is in progress
     */
    public Mono<Void> refreshLocationAsync(DatabaseAccount databaseAccount) {
        return Mono.defer(() -> {
            logger.debug("refreshLocationAsync() invoked");
            if (!this.isRefreshing.compareAndSet(false, true)) {
                logger.debug("in the middle of another refresh. Not invoking a new refresh.");
                return Mono.empty();
            }
            logger.debug("will refresh");
            // Release the flag on failure so that a later call can retry.
            return refreshLocationPrivateAsync(databaseAccount)
                    .doOnError(refreshError -> this.isRefreshing.set(false));
        });
    }

    /**
     * Applies {@code databaseAccount} (when non-null) to the cache and, if the cache still needs
     * refreshing, either schedules a background refresh or reads the account right away.
     */
    private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount) {
        return Mono.defer(() -> {
            logger.debug("refreshLocationPrivateAsync() refreshing locations");
            if (databaseAccount != null) {
                this.locationCache.onDatabaseAccountRead(databaseAccount);
            }

            Utils.ValueHolder<Boolean> canRefreshInBackground = new Utils.ValueHolder<>();
            if (!this.locationCache.shouldRefreshEndpoints(canRefreshInBackground)) {
                logger.debug("shouldRefreshEndpoints: false, nothing to do.");
                this.isRefreshing.set(false);
                return Mono.empty();
            }

            logger.debug("shouldRefreshEndpoints: true");
            if (databaseAccount != null || canRefreshInBackground.v) {
                // Fire-and-forget: the background timer continues the refresh.
                startRefreshLocationTimerAsync();
                return Mono.empty();
            }

            logger.debug("shouldRefreshEndpoints: can't be done in background");
            return readDatabaseAccountFromAnyLocation()
                    .doOnNext(this.locationCache::onDatabaseAccountRead)
                    .flatMap(account -> {
                        // Trigger the background timer without waiting on it.
                        startRefreshLocationTimerAsync();
                        return Mono.empty();
                    });
        });
    }

    /** Schedules a delayed background refresh without waiting for it. */
    private void startRefreshLocationTimerAsync() {
        startRefreshLocationTimerAsync(false).subscribe();
    }

    /**
     * Reads the database account after a delay and feeds it into the refresh logic.
     * On failure the error is logged and another delayed refresh is scheduled.
     *
     * @param runImmediately {@code true} to use no delay, {@code false} to wait for the
     *        background refresh interval
     */
    private Mono<Void> startRefreshLocationTimerAsync(boolean runImmediately) {
        if (this.isClosed) {
            logger.debug("startRefreshLocationTimerAsync: nothing to do, it is closed");
            return Mono.empty();
        }

        int delayInMillis = runImmediately ? 0 : this.backgroundRefreshLocationTimeIntervalInMS;
        logger.debug("registering a refresh in [{}] ms", delayInMillis);
        LocalDateTime registeredAt = LocalDateTime.now();

        return Mono.delay(Duration.ofMillis(delayInMillis)).flatMap(tick -> {
            // The manager may have been closed while the delay was pending.
            if (this.isClosed) {
                logger.warn("client already closed");
                return Mono.empty();
            }
            logger.debug("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", registeredAt);
            return readDatabaseAccountFromAnyLocation().flatMap(account -> {
                logger.debug("db account retrieved");
                return refreshLocationPrivateAsync(account);
            });
        }).onErrorResume(refreshError -> {
            logger.error("startRefreshLocationTimerAsync() - Unable to refresh database account from any location. Exception: {}", refreshError.toString(), refreshError);
            startRefreshLocationTimerAsync();
            return Mono.empty();
        }).subscribeOn(this.scheduler);
    }

    /**
     * Reads the database account from the default endpoint, falling back to the preferred locations.
     */
    private Mono<DatabaseAccount> readDatabaseAccountFromAnyLocation() {
        return getDatabaseAccountFromAnyLocationsAsync(
                this.defaultEndpoint,
                new ArrayList<>(preferredLocationsOrEmpty(this.connectionPolicy)),
                this::getDatabaseAccountAsync);
    }

    /**
     * Returns the policy's preferred locations, or an empty list when none are configured,
     * so that callers never copy a {@code null} collection.
     */
    private static List<String> preferredLocationsOrEmpty(ConnectionPolicy policy) {
        List<String> preferred = policy.preferredLocations();
        return preferred != null ? preferred : Collections.emptyList();
    }

    /**
     * Reads the database account from a single endpoint through the owner.
     *
     * @param url the endpoint to read from
     * @return the database account, or an error {@link Mono} if {@code url} is not a valid URI
     */
    private Mono<DatabaseAccount> getDatabaseAccountAsync(URL url) {
        try {
            return this.owner.getDatabaseAccountFromEndpoint(url.toURI())
                    .doOnNext(account -> logger.debug("account retrieved: {}", account))
                    .single();
        } catch (URISyntaxException e) {
            return Mono.error(e);
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed() {
        return this.isClosed;
    }
}
