package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.core.config.ConfigRevision;
import com.couchbase.client.dcp.core.config.CouchbaseBucketConfigParser;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/BucketConfigArbiter.class */
public class BucketConfigArbiter implements BucketConfigSink, BucketConfigSource {
    private final ReplayProcessor<DcpBucketConfig> configStream = ReplayProcessor.cacheLast();
    private final FluxSink<DcpBucketConfig> configSink = this.configStream.sink(FluxSink.OverflowStrategy.LATEST);
    private final Object revLock = new Object();
    private ConfigRevision currentRev = ConfigRevision.ZERO;
    private final Client.Environment environment;
    private static final Logger log = LoggerFactory.getLogger(BucketConfigArbiter.class);
    private static final Pattern REV_PATTERN = Pattern.compile("\"rev\"\\s*:\\s*(-?\\d+)");
    private static final Pattern REV_EPOCH_PATTERN = Pattern.compile("\"revEpoch\"\\s*:\\s*(-?\\d+)");

    public BucketConfigArbiter(Client.Environment environment) {
        this.environment = (Client.Environment) Objects.requireNonNull(environment);
    }

    @Override // com.couchbase.client.dcp.conductor.BucketConfigSink
    public void accept(HostAndPort hostAndPort, String str, ConfigRevision configRevision) {
        synchronized (this.revLock) {
            if (!configRevision.newerThan(this.currentRev)) {
                log.debug("Ignoring config revision {} from {}; not newer than current revision {}", new Object[]{configRevision, hostAndPort, this.currentRev});
                return;
            }
            log.debug("Received config revision {} from {} -> {}", new Object[]{configRevision, hostAndPort, RedactableArgument.redactSystem(str)});
            if (!str.contains("\"nodeLocator\"")) {
                log.info("Received a global config (revision {}). Ignoring it, because a global config doesn't have info about the bucket. Waiting for a bucket config instead!", configRevision);
                return;
            }
            try {
                this.currentRev = configRevision;
                this.configSink.next(new DcpBucketConfig(CouchbaseBucketConfigParser.parse(str.getBytes(StandardCharsets.UTF_8), hostAndPort.host(), this.environment.portSelector(), this.environment.networkSelector())));
            } catch (Exception e) {
                log.error("Failed to parse bucket config", e);
            }
        }
    }

    @Override // com.couchbase.client.dcp.conductor.BucketConfigSink
    public void accept(HostAndPort hostAndPort, String str) {
        try {
            accept(hostAndPort, str, getRev(str));
        } catch (Exception e) {
            log.error("Failed to parse bucket config", e);
        }
    }

    @Override // com.couchbase.client.dcp.conductor.BucketConfigSource
    public Flux<DcpBucketConfig> configs() {
        return this.configStream;
    }

    private static OptionalLong matchLong(Pattern pattern, String str) {
        Matcher matcher = pattern.matcher(str);
        return matcher.find() ? OptionalLong.of(Long.parseLong(matcher.group(1))) : OptionalLong.empty();
    }

    private static ConfigRevision getRev(String str) {
        return new ConfigRevision(matchLong(REV_EPOCH_PATTERN, str).orElse(0L), matchLong(REV_PATTERN, str).orElseThrow(() -> {
            return new IllegalArgumentException("Failed to locate revision property in " + RedactableArgument.redactSystem(str));
        }));
    }
}
