package com.couchbase.client.dcp.conductor;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.config.AlternateAddress;
import com.couchbase.client.dcp.core.config.BucketConfig;
import com.couchbase.client.dcp.core.config.CouchbaseBucketConfig;
import com.couchbase.client.dcp.core.config.NodeInfo;
import com.couchbase.client.dcp.core.config.parser.BucketConfigParser;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/BucketConfigArbiter.class */
public class BucketConfigArbiter implements BucketConfigSink, BucketConfigSource {
    private final ReplayProcessor<DcpBucketConfig> configStream = ReplayProcessor.cacheLast();
    private final FluxSink<DcpBucketConfig> configSink = this.configStream.sink(FluxSink.OverflowStrategy.LATEST);
    private final Object revLock = new Object();
    private long currentRev = -1;
    private boolean hasDeterminedAlternateNetworkName = false;
    private String alternateNetworkName;
    private final Client.Environment environment;
    private static final Logger log = LoggerFactory.getLogger(BucketConfigArbiter.class);
    private static final Pattern REV_PATTERN = Pattern.compile("\"rev\"\\s*:\\s*(-?\\d+)");

    public BucketConfigArbiter(Client.Environment environment) {
        this.environment = (Client.Environment) Objects.requireNonNull(environment);
    }

    @Override // com.couchbase.client.dcp.conductor.BucketConfigSink
    public void accept(HostAndPort hostAndPort, String str, long j) {
        synchronized (this.revLock) {
            if (j <= this.currentRev) {
                log.debug("Ignoring bucket config revision {} from {}; not newer than current revision {}", new Object[]{hostAndPort, Long.valueOf(j), Long.valueOf(this.currentRev)});
                return;
            }
            log.debug("Received bucket config revision {} from {} -> {}", new Object[]{Long.valueOf(j), hostAndPort, RedactableArgument.system(str)});
            try {
                this.currentRev = j;
                CouchbaseBucketConfig couchbaseBucketConfig = (CouchbaseBucketConfig) BucketConfigParser.parse(str, hostAndPort);
                selectAlternateNetwork(couchbaseBucketConfig);
                this.configSink.next(new DcpBucketConfig(couchbaseBucketConfig, this.environment.sslEnabled()));
            } catch (Exception e) {
                log.error("Failed to parse bucket config", e);
            }
        }
    }

    @Override // com.couchbase.client.dcp.conductor.BucketConfigSink
    public void accept(HostAndPort hostAndPort, String str) {
        try {
            accept(hostAndPort, str, getRev(str));
        } catch (Exception e) {
            log.error("Failed to parse bucket config", e);
        }
    }

    @Override // com.couchbase.client.dcp.conductor.BucketConfigSource
    public Flux<DcpBucketConfig> configs() {
        return this.configStream;
    }

    private void selectAlternateNetwork(CouchbaseBucketConfig couchbaseBucketConfig) {
        if (!Thread.holdsLock(this.revLock)) {
            throw new IllegalStateException("Must hold revLock");
        }
        if (!this.hasDeterminedAlternateNetworkName) {
            this.alternateNetworkName = determineNetworkResolution(couchbaseBucketConfig, this.environment.networkResolution(), (Set) this.environment.clusterAt().stream().map((v0) -> {
                return v0.host();
            }).collect(Collectors.toSet()));
            this.hasDeterminedAlternateNetworkName = true;
            String str = this.alternateNetworkName == null ? "<default>" : this.alternateNetworkName;
            if (NetworkResolution.AUTO.equals(this.environment.networkResolution())) {
                str = "auto -> " + str;
            }
            log.info("Selected network: {}", str);
        }
        couchbaseBucketConfig.useAlternateNetwork(this.alternateNetworkName);
    }

    private static String determineNetworkResolution(BucketConfig bucketConfig, NetworkResolution networkResolution, Set<String> set) {
        if (networkResolution.equals(NetworkResolution.DEFAULT)) {
            return null;
        }
        if (!networkResolution.equals(NetworkResolution.AUTO)) {
            return networkResolution.name();
        }
        for (NodeInfo nodeInfo : bucketConfig.nodes()) {
            if (set.contains(nodeInfo.hostname())) {
                return null;
            }
            Map<String, AlternateAddress> alternateAddresses = nodeInfo.alternateAddresses();
            if (alternateAddresses != null && !alternateAddresses.isEmpty()) {
                for (Map.Entry<String, AlternateAddress> entry : alternateAddresses.entrySet()) {
                    AlternateAddress value = entry.getValue();
                    if (value != null && set.contains(value.hostname())) {
                        return entry.getKey();
                    }
                }
            }
        }
        return null;
    }

    private static long getRev(String str) {
        Matcher matcher = REV_PATTERN.matcher(str);
        if (matcher.find()) {
            return Long.parseLong(matcher.group(1));
        }
        throw new IllegalArgumentException("Failed to locate revision property in " + RedactableArgument.system(str));
    }
}
