package com.couchbase.client.dcp;

import com.couchbase.client.dcp.buffer.PersistedSeqnos;
import com.couchbase.client.dcp.buffer.StreamEventBuffer;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.config.CompressionMode;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.config.SecureEnvironment;
import com.couchbase.client.dcp.core.env.CoreScheduler;
import com.couchbase.client.dcp.core.env.NetworkResolution;
import com.couchbase.client.dcp.core.env.resources.NoOpShutdownHook;
import com.couchbase.client.dcp.core.env.resources.ShutdownHook;
import com.couchbase.client.dcp.core.event.CouchbaseEvent;
import com.couchbase.client.dcp.core.event.EventBus;
import com.couchbase.client.dcp.core.event.EventType;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.time.Delay;
import com.couchbase.client.dcp.core.utils.CbCollections;
import com.couchbase.client.dcp.core.utils.ConnectionString;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.epoll.Epoll;
import com.couchbase.client.dcp.deps.io.netty.channel.epoll.EpollEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.kqueue.KQueue;
import com.couchbase.client.dcp.deps.io.netty.channel.kqueue.KQueueEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoopGroup;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.client.dcp.error.BootstrapException;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.DefaultDcpEventBus;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.SnapshotMarker;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.AsyncEventDispatcher;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.EventHandlerAdapter;
import com.couchbase.client.dcp.highlevel.internal.ImmediateEventDispatcher;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSeqnoAdvancedRequest;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpSystemEvent;
import com.couchbase.client.dcp.message.DcpSystemEventRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.OpenConnectionFlag;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.metrics.MetricsContext;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.state.StateFormat;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.util.MathUtils;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

/* loaded from: input_file:com/couchbase/client/dcp/Client.class */
public class Client implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
    private static final ThreadFactory threadFactory = new DefaultThreadFactory("dcp-io");
    private final Conductor conductor;
    private final Environment env;
    private final boolean bufferAckEnabled;
    private volatile AsyncEventDispatcher listenerDispatcher;
    private final AtomicBoolean hasHighLevelListener;

    /* loaded from: input_file:com/couchbase/client/dcp/Client$Builder.class */
    public static class Builder {
        private EventLoopGroup eventLoopGroup;
        private boolean collectionsAware;
        private int bufferAckWatermark;
        private EventBus eventBus;
        private String sslKeystoreFile;
        private String sslKeystorePassword;
        private KeyStore sslKeystore;
        private long persistencePollingIntervalMillis;
        private List<HostAndPort> seedNodes = Collections.singletonList(new HostAndPort("127.0.0.1", 0));
        private NetworkResolution networkResolution = NetworkResolution.AUTO;
        private String bucket = "default";
        private Set<Long> collectionIds = new HashSet();
        private Set<String> collectionNames = new HashSet();
        private OptionalLong scopeId = OptionalLong.empty();
        private Optional<String> scopeName = Optional.empty();
        private CredentialsProvider credentialsProvider = new StaticCredentialsProvider("", "");
        private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
        private final DcpControl dcpControl = new DcpControl().put(DcpControl.Names.ENABLE_NOOP, "true");
        private final EnumSet<OpenConnectionFlag> connectionFlags = EnumSet.noneOf(OpenConnectionFlag.class);
        private boolean poolBuffers = true;
        private long connectTimeout = Environment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private Duration bootstrapTimeout = Environment.DEFAULT_BOOTSTRAP_TIMEOUT;
        private Duration configRefreshInterval = Environment.DEFAULT_CONFIG_REFRESH_INTERVAL;
        private long socketConnectTimeout = Environment.DEFAULT_SOCKET_CONNECT_TIMEOUT;
        private int dcpChannelsReconnectMaxAttempts = Integer.MAX_VALUE;
        private Delay dcpChannelsReconnectDelay = Environment.DEFAULT_DCP_CHANNELS_RECONNECT_DELAY;
        private boolean sslEnabled = false;

        public Builder noValue(boolean z) {
            return setConnectionFlag(OpenConnectionFlag.NO_VALUE, z);
        }

        public Builder xattrs(boolean z) {
            return setConnectionFlag(OpenConnectionFlag.INCLUDE_XATTRS, z);
        }

        private Builder setConnectionFlag(OpenConnectionFlag openConnectionFlag, boolean z) {
            if (z) {
                this.connectionFlags.add(openConnectionFlag);
            } else {
                this.connectionFlags.remove(openConnectionFlag);
            }
            return this;
        }

        public Builder bufferAckWatermark(int i) {
            if (i > 100 || i < 0) {
                throw new IllegalArgumentException("The bufferAckWatermark is percents, so it needs to be between 0 and 100");
            }
            this.bufferAckWatermark = i;
            return this;
        }

        public Builder seedNodes(Collection<String> collection) {
            this.seedNodes = getSeedNodes(ConnectionString.fromHostnames(new ArrayList(new HashSet(collection))));
            return this;
        }

        public Builder seedNodes(String... strArr) {
            return seedNodes(Arrays.asList(strArr));
        }

        @Deprecated
        public Builder hostnames(List<String> list) {
            return seedNodes(list);
        }

        @Deprecated
        public Builder hostnames(String... strArr) {
            return seedNodes(strArr);
        }

        public Builder connectionString(String str) {
            this.seedNodes = getSeedNodes(ConnectionString.create(str));
            return this;
        }

        private static List<HostAndPort> getSeedNodes(ConnectionString connectionString) {
            return (List) connectionString.hosts().stream().map(unresolvedSocket -> {
                return new HostAndPort(unresolvedSocket.hostname(), unresolvedSocket.port());
            }).collect(Collectors.toList());
        }

        public Builder networkResolution(NetworkResolution networkResolution) {
            this.networkResolution = (NetworkResolution) Objects.requireNonNull(networkResolution);
            return this;
        }

        public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        public Builder bucket(String str) {
            this.bucket = str;
            if (this.credentialsProvider.get(null).getUsername().isEmpty()) {
                username(str);
            }
            return this;
        }

        public Builder collectionsAware(boolean z) {
            this.collectionsAware = z;
            return this;
        }

        public Builder collectionNames(Collection<String> collection) {
            for (String str : collection) {
                if (str == null) {
                    throw new IllegalArgumentException("Collection name must not be null");
                }
                if (str.split("\\.", -1).length != 2) {
                    throw new IllegalArgumentException("Collection name '" + str + "' must be qualified by a scope name, like: myScope.myCollection");
                }
            }
            this.collectionNames = new HashSet(collection);
            return this;
        }

        public Builder collectionNames(String... strArr) {
            return collectionNames(Arrays.asList(strArr));
        }

        public Builder collectionIds(Collection<Long> collection) {
            if (collection.stream().anyMatch((v0) -> {
                return Objects.isNull(v0);
            })) {
                throw new IllegalArgumentException("Collection ID must not be null");
            }
            this.collectionIds = new HashSet(collection);
            return this;
        }

        public Builder collectionIds(long... jArr) {
            return collectionIds((Collection<Long>) Arrays.stream(jArr).boxed().collect(Collectors.toList()));
        }

        public Builder scopeName(String str) {
            this.scopeName = CbCollections.isNullOrEmpty(str) ? Optional.empty() : Optional.of(str);
            return this;
        }

        public Builder scopeId(long j) {
            this.scopeId = OptionalLong.of(j);
            return this;
        }

        public Builder credentials(String str, String str2) {
            this.credentialsProvider = new StaticCredentialsProvider(str, str2);
            return this;
        }

        @Deprecated
        public Builder username(String str) {
            this.credentialsProvider = new StaticCredentialsProvider(str, this.credentialsProvider.get(null).getPassword());
            return this;
        }

        public Builder credentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = (CredentialsProvider) Objects.requireNonNull(credentialsProvider);
            return this;
        }

        @Deprecated
        public Builder password(String str) {
            this.credentialsProvider = new StaticCredentialsProvider(this.credentialsProvider.get(null).getUsername(), str);
            return this;
        }

        public Builder userAgent(String str, String str2, String... strArr) {
            return connectionNameGenerator(DefaultConnectionNameGenerator.forProduct(str, str2, strArr));
        }

        public Builder connectionNameGenerator(ConnectionNameGenerator connectionNameGenerator) {
            this.connectionNameGenerator = connectionNameGenerator;
            return this;
        }

        public Builder controlParam(DcpControl.Names names, Object obj) {
            this.dcpControl.put(names, obj.toString());
            return this;
        }

        public Builder compression(CompressionMode compressionMode) {
            this.dcpControl.compression(compressionMode);
            return this;
        }

        public Builder poolBuffers(boolean z) {
            this.poolBuffers = z;
            return this;
        }

        public Builder socketConnectTimeout(long j) {
            this.socketConnectTimeout = j;
            return this;
        }

        public Builder bootstrapTimeout(Duration duration) {
            this.bootstrapTimeout = duration;
            return this;
        }

        public Builder configRefreshInterval(Duration duration) {
            if (duration.compareTo(Duration.ofSeconds(1L)) < 0) {
                throw new IllegalArgumentException("Minimum config refresh interval is 1 second.");
            }
            if (duration.compareTo(Duration.ofMinutes(2L)) > 0) {
                throw new IllegalArgumentException("Maximum config refresh interval is 2 minutes.");
            }
            this.configRefreshInterval = (Duration) Objects.requireNonNull(duration);
            return this;
        }

        public Builder connectTimeout(long j) {
            this.connectTimeout = j;
            return this;
        }

        public Builder dcpChannelsReconnectMaxAttempts(int i) {
            this.dcpChannelsReconnectMaxAttempts = i;
            return this;
        }

        public Builder dcpChannelsReconnectDelay(Delay delay) {
            this.dcpChannelsReconnectDelay = delay;
            return this;
        }

        public Builder eventBus(EventBus eventBus) {
            this.eventBus = eventBus;
            return this;
        }

        public Builder sslEnabled(boolean z) {
            this.sslEnabled = z;
            return this;
        }

        public Builder sslKeystoreFile(String str) {
            this.sslKeystoreFile = str;
            return this;
        }

        public Builder sslKeystorePassword(String str) {
            this.sslKeystorePassword = str;
            return this;
        }

        public Builder sslKeystore(KeyStore keyStore) {
            this.sslKeystore = keyStore;
            return this;
        }

        public Builder mitigateRollbacks(long j, TimeUnit timeUnit) {
            this.persistencePollingIntervalMillis = timeUnit.toMillis(j);
            return this;
        }

        public Builder flowControl(int i) {
            controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, Integer.valueOf(i));
            if (this.bufferAckWatermark == 0) {
                this.bufferAckWatermark = 80;
            }
            return this;
        }

        public Client build() {
            if (this.collectionsAware && !this.dcpControl.noopEnabled()) {
                throw new IllegalStateException("Collections awareness requires NOOPs; must not disable NOOPs.");
            }
            if (this.scopeName.isPresent() && this.scopeId.isPresent()) {
                throw new IllegalStateException("May specify scope name or ID, but not both.");
            }
            boolean z = (this.collectionIds.isEmpty() && this.collectionNames.isEmpty()) ? false : true;
            boolean z2 = this.scopeId.isPresent() || this.scopeName.isPresent();
            if (z && z2) {
                throw new IllegalStateException("May specify scope or collections, but not both.");
            }
            if ((z || z2) && !this.collectionsAware) {
                throw new IllegalStateException("Must call collectionsAware(true) when specifying scope or collections.");
            }
            return new Client(this);
        }
    }

    /* loaded from: input_file:com/couchbase/client/dcp/Client$Environment.class */
    public static class Environment implements SecureEnvironment {
        private static final Logger log = LoggerFactory.getLogger(Environment.class);
        public static final Duration DEFAULT_BOOTSTRAP_TIMEOUT = Duration.ofSeconds(5);
        public static final Duration DEFAULT_CONFIG_REFRESH_INTERVAL = Duration.ofSeconds(2);
        public static final long DEFAULT_CONNECT_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
        public static final long DEFAULT_SOCKET_CONNECT_TIMEOUT = TimeUnit.SECONDS.toMillis(1);
        public static final Delay DEFAULT_DCP_CHANNELS_RECONNECT_DELAY = Delay.fixed(200, TimeUnit.MILLISECONDS);
        public static final int DEFAULT_DCP_CHANNELS_RECONNECT_MAX_ATTEMPTS = Integer.MAX_VALUE;
        public static final boolean DEFAULT_SSL_ENABLED = false;
        private static final int DEFAULT_KV_PORT = 11210;
        private static final int DEFAULT_KV_TLS_PORT = 11207;
        private final List<HostAndPort> seedNodes;
        private final NetworkResolution networkResolution;
        private final ConnectionNameGenerator connectionNameGenerator;
        private final String bucket;
        private final boolean collectionsAware;
        private final OptionalLong scopeId;
        private final Optional<String> scopeName;
        private final Set<Long> collectionIds;
        private final Set<String> collectionNames;
        private final CredentialsProvider credentialsProvider;
        private final Duration bootstrapTimeout;
        private final Duration configRefreshInterval;
        private final long connectTimeout;
        private final DcpControl dcpControl;
        private final Set<OpenConnectionFlag> connectionFlags;
        private final EventLoopGroup eventLoopGroup;
        private final boolean eventLoopGroupIsPrivate;
        private final boolean poolBuffers;
        private final int bufferAckWatermark;
        private final long socketConnectTimeout;
        private final long persistencePollingIntervalMillis;
        private volatile DataEventHandler dataEventHandler;
        private volatile ControlEventHandler controlEventHandler;
        private final PersistedSeqnos persistedSeqnos;
        private final Delay dcpChannelsReconnectDelay;
        private final int dcpChannelsReconnectMaxAttempts;
        private final EventBus eventBus;
        private final Scheduler scheduler;
        private final ShutdownHook schedulerShutdownHook;
        private Subscription systemEventSubscription;
        private final boolean sslEnabled;
        private final String sslKeystoreFile;
        private final String sslKeystorePassword;
        private final KeyStore sslKeystore;

        private Environment(Builder builder) {
            this.persistedSeqnos = PersistedSeqnos.uninitialized();
            this.connectionNameGenerator = builder.connectionNameGenerator;
            this.bucket = builder.bucket;
            this.credentialsProvider = builder.credentialsProvider;
            this.bootstrapTimeout = builder.bootstrapTimeout;
            this.configRefreshInterval = builder.configRefreshInterval;
            this.connectTimeout = builder.connectTimeout;
            this.dcpControl = builder.dcpControl;
            this.connectionFlags = Collections.unmodifiableSet(EnumSet.copyOf(builder.connectionFlags));
            this.eventLoopGroup = (EventLoopGroup) Optional.ofNullable(builder.eventLoopGroup).orElseGet(() -> {
                return Client.access$3200();
            });
            this.eventLoopGroupIsPrivate = builder.eventLoopGroup == null;
            this.bufferAckWatermark = builder.bufferAckWatermark;
            this.poolBuffers = builder.poolBuffers;
            this.socketConnectTimeout = builder.socketConnectTimeout;
            this.dcpChannelsReconnectDelay = builder.dcpChannelsReconnectDelay;
            this.dcpChannelsReconnectMaxAttempts = builder.dcpChannelsReconnectMaxAttempts;
            this.collectionsAware = builder.collectionsAware;
            this.collectionIds = Collections.unmodifiableSet(builder.collectionIds);
            this.collectionNames = Collections.unmodifiableSet(builder.collectionNames);
            this.scopeId = builder.scopeId;
            this.scopeName = builder.scopeName;
            if (builder.eventBus != null) {
                this.eventBus = builder.eventBus;
                this.scheduler = null;
                this.schedulerShutdownHook = new NoOpShutdownHook();
            } else {
                CoreScheduler coreScheduler = new CoreScheduler(3);
                this.scheduler = coreScheduler;
                this.schedulerShutdownHook = coreScheduler;
                this.eventBus = new DefaultDcpEventBus(coreScheduler);
            }
            this.sslEnabled = builder.sslEnabled;
            this.sslKeystoreFile = builder.sslKeystoreFile;
            this.sslKeystorePassword = builder.sslKeystorePassword;
            this.sslKeystore = builder.sslKeystore;
            this.seedNodes = makeDefaultPortsExplicit(builder.seedNodes, builder.sslEnabled);
            this.networkResolution = builder.networkResolution;
            this.persistencePollingIntervalMillis = builder.persistencePollingIntervalMillis;
            if (this.persistencePollingIntervalMillis > 0) {
                if (this.bufferAckWatermark == 0) {
                    throw new IllegalArgumentException("Rollback mitigation requires flow control.");
                }
                StreamEventBuffer streamEventBuffer = new StreamEventBuffer(this.eventBus);
                this.dataEventHandler = streamEventBuffer;
                this.controlEventHandler = streamEventBuffer;
            }
        }

        public List<HostAndPort> clusterAt() {
            return this.seedNodes;
        }

        public NetworkResolution networkResolution() {
            return this.networkResolution;
        }

        public DataEventHandler dataEventHandler() {
            return this.dataEventHandler;
        }

        public StreamEventBuffer streamEventBuffer() {
            try {
                return (StreamEventBuffer) this.dataEventHandler;
            } catch (ClassCastException e) {
                throw new IllegalStateException("Stream event buffer not configured");
            }
        }

        public PersistedSeqnos persistedSeqnos() {
            return this.persistedSeqnos;
        }

        public long persistencePollingIntervalMillis() {
            return this.persistencePollingIntervalMillis;
        }

        public boolean persistencePollingEnabled() {
            return this.persistencePollingIntervalMillis > 0;
        }

        public ControlEventHandler controlEventHandler() {
            return this.controlEventHandler;
        }

        public ConnectionNameGenerator connectionNameGenerator() {
            return this.connectionNameGenerator;
        }

        public String bucket() {
            return this.bucket;
        }

        public boolean collectionsAware() {
            return this.collectionsAware;
        }

        public Set<Long> collectionIds() {
            return this.collectionIds;
        }

        public Set<String> collectionNames() {
            return this.collectionNames;
        }

        public OptionalLong scopeId() {
            return this.scopeId;
        }

        public Optional<String> scopeName() {
            return this.scopeName;
        }

        public CredentialsProvider credentialsProvider() {
            return this.credentialsProvider;
        }

        @Deprecated
        public String username() {
            return this.credentialsProvider.get(null).getUsername();
        }

        @Deprecated
        public String password() {
            return this.credentialsProvider.get(null).getPassword();
        }

        public DcpControl dcpControl() {
            return this.dcpControl;
        }

        public Set<OpenConnectionFlag> connectionFlags() {
            return this.connectionFlags;
        }

        public int bufferAckWatermark() {
            return this.bufferAckWatermark;
        }

        public EventLoopGroup eventLoopGroup() {
            return this.eventLoopGroup;
        }

        public Duration configRefreshInterval() {
            return this.configRefreshInterval;
        }

        public Duration bootstrapTimeout() {
            return this.bootstrapTimeout;
        }

        public long connectTimeout() {
            return this.connectTimeout;
        }

        public void setDataEventHandler(DataEventHandler dataEventHandler) {
            if (persistencePollingEnabled()) {
                streamEventBuffer().setDataEventHandler(dataEventHandler);
            } else {
                this.dataEventHandler = dataEventHandler;
            }
        }

        public void setControlEventHandler(ControlEventHandler controlEventHandler) {
            if (persistencePollingEnabled()) {
                streamEventBuffer().setControlEventHandler(controlEventHandler);
            } else {
                this.controlEventHandler = controlEventHandler;
            }
        }

        public void setSystemEventHandler(final SystemEventHandler systemEventHandler) {
            if (this.systemEventSubscription != null) {
                this.systemEventSubscription.unsubscribe();
            }
            if (systemEventHandler != null) {
                this.systemEventSubscription = eventBus().get().filter(couchbaseEvent -> {
                    return Boolean.valueOf(couchbaseEvent.type().equals(EventType.SYSTEM));
                }).subscribe(new Subscriber<CouchbaseEvent>() { // from class: com.couchbase.client.dcp.Client.Environment.1
                    public void onCompleted() {
                    }

                    public void onError(Throwable th) {
                    }

                    public void onNext(CouchbaseEvent couchbaseEvent2) {
                        systemEventHandler.onEvent(couchbaseEvent2);
                    }
                });
            }
        }

        public boolean poolBuffers() {
            return this.poolBuffers;
        }

        public long socketConnectTimeout() {
            return this.socketConnectTimeout;
        }

        public EventBus eventBus() {
            return this.eventBus;
        }

        @Override // com.couchbase.client.dcp.config.SecureEnvironment
        public boolean sslEnabled() {
            return this.sslEnabled;
        }

        @Override // com.couchbase.client.dcp.config.SecureEnvironment
        public String sslKeystoreFile() {
            return this.sslKeystoreFile;
        }

        @Override // com.couchbase.client.dcp.config.SecureEnvironment
        public String sslKeystorePassword() {
            return this.sslKeystorePassword;
        }

        @Override // com.couchbase.client.dcp.config.SecureEnvironment
        public KeyStore sslKeystore() {
            return this.sslKeystore;
        }

        private static List<HostAndPort> makeDefaultPortsExplicit(List<HostAndPort> list, boolean z) {
            int i = z ? DEFAULT_KV_TLS_PORT : DEFAULT_KV_PORT;
            ArrayList arrayList = new ArrayList();
            for (HostAndPort hostAndPort : list) {
                if (hostAndPort.port() == 8091 || hostAndPort.port() == 18091) {
                    log.warn("Seed node '{}' uses port '{}' which is likely incorrect. This should be the port of the KV service, not the Manager service. If the connection fails, omit the port so the client can supply the correct default.", hostAndPort.host(), Integer.valueOf(hostAndPort.port()));
                }
                arrayList.add(hostAndPort.port() == 0 ? hostAndPort.withPort(i) : hostAndPort);
            }
            return arrayList;
        }

        public Completable shutdown() {
            Observable empty = Observable.empty();
            if (this.eventLoopGroupIsPrivate) {
                empty = Completable.create(completableSubscriber -> {
                    this.eventLoopGroup.shutdownGracefully(0L, 10L, TimeUnit.MILLISECONDS).addListener2(future -> {
                        if (future.isSuccess()) {
                            completableSubscriber.onCompleted();
                        } else {
                            completableSubscriber.onError(future.cause());
                        }
                    });
                }).toObservable();
            }
            return Observable.merge(this.schedulerShutdownHook.shutdown(), empty).reduce(true, (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            }).toCompletable();
        }

        public String toString() {
            return "ClientEnvironment{seedNodes=" + this.seedNodes + ", connectionNameGenerator=" + this.connectionNameGenerator + ", bucket='" + this.bucket + "', collectionsAware=" + this.collectionsAware + ", collectionIds=" + this.collectionIds + ", collectionNames=" + this.collectionNames + ", scopeId=" + this.scopeId + ", scopeName=" + this.scopeName + ", dcpControl=" + this.dcpControl + ", eventLoopGroup=" + this.eventLoopGroup.getClass().getSimpleName() + ", eventLoopGroupIsPrivate=" + this.eventLoopGroupIsPrivate + ", poolBuffers=" + this.poolBuffers + ", bufferAckWatermark=" + this.bufferAckWatermark + ", connectTimeout=" + this.connectTimeout + ", bootstrapTimeout=" + this.bootstrapTimeout + ", configRefreshInterval=" + this.configRefreshInterval + ", sslEnabled=" + this.sslEnabled + ", sslKeystoreFile='" + this.sslKeystoreFile + "', sslKeystorePassword=" + ((this.sslKeystorePassword == null || this.sslKeystorePassword.isEmpty()) ? false : true) + ", sslKeystore=" + this.sslKeystore + '}';
        }

        public Delay dcpChannelsReconnectDelay() {
            return this.dcpChannelsReconnectDelay;
        }

        public int dcpChannelsReconnectMaxAttempts() {
            return this.dcpChannelsReconnectMaxAttempts;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/couchbase/client/dcp/Client$PartitionAndSeqno.class */
    public static class PartitionAndSeqno {
        private final int partition;
        private final long seqno;

        public PartitionAndSeqno(int i, long j) {
            this.partition = i;
            this.seqno = j;
        }

        public int partition() {
            return this.partition;
        }

        public long seqno() {
            return this.seqno;
        }
    }

    private Client(Builder builder) {
        this.hasHighLevelListener = new AtomicBoolean();
        this.env = new Environment(builder);
        this.bufferAckEnabled = this.env.dcpControl().bufferAckEnabled();
        if (this.bufferAckEnabled && this.env.bufferAckWatermark() == 0) {
            throw new IllegalArgumentException("The bufferAckWatermark needs to be set if bufferAck is enabled.");
        }
        controlEventHandler((channelFlowController, byteBuf) -> {
            try {
                if (DcpSnapshotMarkerRequest.is(byteBuf)) {
                    channelFlowController.ack(byteBuf);
                }
            } finally {
                byteBuf.release();
            }
        });
        dataEventHandler((channelFlowController2, byteBuf2) -> {
            try {
                channelFlowController2.ack(byteBuf2);
            } finally {
                byteBuf2.release();
            }
        });
        this.conductor = new Conductor(this.env, new DcpClientMetrics(new MetricsContext("dcp")));
        LOGGER.info("Environment Configuration Used: {}", RedactableArgument.system(this.env));
    }

    private static EventLoopGroup newEventLoopGroup() {
        if (Epoll.isAvailable()) {
            LOGGER.info("Using Netty epoll native transport.");
            return new EpollEventLoopGroup(threadFactory);
        }
        if (KQueue.isAvailable()) {
            LOGGER.info("Using Netty kqueue native transport.");
            return new KQueueEventLoopGroup(threadFactory);
        }
        LOGGER.info("Using Netty NIO transport.");
        return new NioEventLoopGroup(threadFactory);
    }

    @Deprecated
    public static Builder configure() {
        return builder();
    }

    public static Builder builder() {
        return new Builder();
    }

    private Observable<PartitionAndSeqno> getSeqnos() {
        return this.conductor.getSeqnos().flatMap(new Func1<ByteBuf, Observable<PartitionAndSeqno>>() { // from class: com.couchbase.client.dcp.Client.1
            public Observable<PartitionAndSeqno> call(ByteBuf byteBuf) {
                int readableBytes = byteBuf.readableBytes() / 10;
                ArrayList arrayList = new ArrayList(readableBytes);
                for (int i = 0; i < readableBytes; i++) {
                    arrayList.add(new PartitionAndSeqno(byteBuf.getShort(10 * i), byteBuf.getLong((10 * i) + 2)));
                }
                byteBuf.release();
                return Observable.from(arrayList);
            }
        });
    }

    public SessionState sessionState() {
        return this.conductor.sessionState();
    }

    public void listener(DatabaseChangeListener databaseChangeListener, FlowControlMode flowControlMode) {
        if (!this.hasHighLevelListener.compareAndSet(false, true)) {
            throw new IllegalStateException("Listener may only be set once.");
        }
        if (flowControlMode == FlowControlMode.AUTOMATIC && !this.bufferAckEnabled) {
            throw new IllegalStateException("Can't register listener in automatic flow control mode because the DCP client was not configured for flow control. Make sure to call flowControl(bufferSizeInBytes) when building the DCP client.");
        }
        this.listenerDispatcher = new AsyncEventDispatcher(flowControlMode, databaseChangeListener);
        EventHandlerAdapter.register(this, this.listenerDispatcher);
    }

    public void nonBlockingListener(DatabaseChangeListener databaseChangeListener) {
        if (!this.hasHighLevelListener.compareAndSet(false, true)) {
            throw new IllegalStateException("Listener may only be set once.");
        }
        EventHandlerAdapter.register(this, new ImmediateEventDispatcher(databaseChangeListener));
    }

    public void controlEventHandler(final ControlEventHandler controlEventHandler) {
        final boolean z = controlEventHandler instanceof EventHandlerAdapter;
        this.env.setControlEventHandler(new ControlEventHandler() { // from class: com.couchbase.client.dcp.Client.2
            @Override // com.couchbase.client.dcp.ControlEventHandler
            public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
                if (DcpSnapshotMarkerRequest.is(byteBuf)) {
                    Client.this.sessionState().get(DcpSnapshotMarkerRequest.partition(byteBuf)).setSnapshot(new SnapshotMarker(DcpSnapshotMarkerRequest.startSeqno(byteBuf), DcpSnapshotMarkerRequest.endSeqno(byteBuf)));
                } else if (DcpFailoverLogResponse.is(byteBuf)) {
                    Client.this.handleFailoverLogResponse(byteBuf);
                    if (!z) {
                        byteBuf.release();
                        return;
                    }
                } else if (RollbackMessage.is(byteBuf)) {
                    Client.LOGGER.warn("Received rollback for vbucket {} to seqno {}", Integer.valueOf(RollbackMessage.vbucket(byteBuf)), Long.valueOf(RollbackMessage.seqno(byteBuf)));
                } else if (DcpSeqnoAdvancedRequest.is(byteBuf)) {
                    handleSeqnoAdvanced(byteBuf);
                } else if (DcpSystemEventRequest.is(byteBuf)) {
                    handleDcpSystemEvent(byteBuf);
                }
                controlEventHandler.onEvent(channelFlowController, byteBuf);
            }

            private void handleSeqnoAdvanced(ByteBuf byteBuf) {
                int vbucket = MessageUtil.getVbucket(byteBuf);
                long seqno = DcpSeqnoAdvancedRequest.getSeqno(byteBuf);
                Client.LOGGER.debug("Seqno for vbucket {} advanced to {}", Integer.valueOf(vbucket), Long.valueOf(seqno));
                PartitionState partitionState = Client.this.sessionState().get(vbucket);
                partitionState.setStartSeqno(seqno);
                partitionState.setSnapshot(new SnapshotMarker(seqno, seqno));
            }

            private void handleDcpSystemEvent(ByteBuf byteBuf) {
                long seqno = DcpSystemEventRequest.getSeqno(byteBuf);
                PartitionState partitionState = Client.this.sessionState().get(MessageUtil.getVbucket(byteBuf));
                partitionState.setStartSeqno(seqno);
                Object parse = DcpSystemEvent.parse(byteBuf);
                if (!(parse instanceof DcpSystemEvent.CollectionsManifestEvent)) {
                    Client.LOGGER.warn("Ignoring unrecognized DCP system event!\n{}", RedactableArgument.meta(MessageUtil.humanize(byteBuf)));
                    return;
                }
                DcpSystemEvent.CollectionsManifestEvent collectionsManifestEvent = (DcpSystemEvent.CollectionsManifestEvent) parse;
                CollectionsManifest collectionsManifest = partitionState.getCollectionsManifest();
                partitionState.setCollectionsManifestUid(collectionsManifestEvent.getManifestId());
                if (MathUtils.lessThanUnsigned(collectionsManifestEvent.getManifestId(), collectionsManifest.getId())) {
                    Client.LOGGER.debug("Ignoring collection manifest event; UID {} is < current manifest UID {}", Long.valueOf(collectionsManifestEvent.getManifestId()), Long.valueOf(collectionsManifest.getId()));
                } else {
                    Client.LOGGER.debug("Applying collection manifest change; UID {} is >= current manifest UID {}", Long.valueOf(collectionsManifestEvent.getManifestId()), Long.valueOf(collectionsManifest.getId()));
                    partitionState.setCollectionsManifest(collectionsManifestEvent.apply(collectionsManifest));
                }
            }
        });
    }

    public void systemEventHandler(SystemEventHandler systemEventHandler) {
        this.env.setSystemEventHandler(systemEventHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleFailoverLogResponse(ByteBuf byteBuf) {
        sessionState().get(DcpFailoverLogResponse.vbucket(byteBuf)).setFailoverLog(DcpFailoverLogResponse.entries(byteBuf));
    }

    public void dataEventHandler(DataEventHandler dataEventHandler) {
        this.env.setDataEventHandler((channelFlowController, byteBuf) -> {
            if (DcpMutationMessage.is(byteBuf) || DcpDeletionMessage.is(byteBuf) || DcpExpirationMessage.is(byteBuf)) {
                int vbucket = MessageUtil.getVbucket(byteBuf);
                sessionState().get(vbucket).setStartSeqno(DcpMutationMessage.bySeqno(byteBuf));
            }
            dataEventHandler.onEvent(channelFlowController, byteBuf);
        });
    }

    public Completable connect() {
        if (!this.conductor.disconnected()) {
            LOGGER.debug("Ignoring duplicate connect attempt, already connecting/connected.");
            return Completable.complete();
        }
        if (this.env.dataEventHandler() == null) {
            throw new IllegalArgumentException("A DataEventHandler needs to be provided!");
        }
        if (this.env.controlEventHandler() == null) {
            throw new IllegalArgumentException("A ControlEventHandler needs to be provided!");
        }
        LOGGER.info("Connecting to seed nodes and bootstrapping bucket {}.", RedactableArgument.meta(this.env.bucket()));
        return this.conductor.connect().onErrorResumeNext(new Func1<Throwable, Completable>() { // from class: com.couchbase.client.dcp.Client.3
            public Completable call(Throwable th) {
                return Client.this.disconnect().andThen(Completable.error(new BootstrapException("Could not connect to Cluster/Bucket", th)));
            }
        });
    }

    public Completable disconnect() {
        return dispatcherGracefulShutdown().andThen(this.conductor.stop()).andThen(this.env.shutdown()).andThen(dispatcherAwaitShutdown());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        disconnect().await(60L, TimeUnit.SECONDS);
    }

    private Completable dispatcherGracefulShutdown() {
        return Completable.fromAction(() -> {
            if (this.listenerDispatcher != null) {
                LOGGER.info("Asking event dispatcher to shut down.");
                this.listenerDispatcher.gracefulShutdown();
            }
        });
    }

    private Completable dispatcherAwaitShutdown() {
        return Completable.fromCallable(() -> {
            long nanoTime = System.nanoTime();
            if (this.listenerDispatcher == null || this.listenerDispatcher.awaitTermination(Duration.ofSeconds(30L))) {
                return null;
            }
            LOGGER.info("Forcing event dispatcher to shut down.");
            this.listenerDispatcher.shutdownNow();
            if (this.listenerDispatcher.awaitTermination(Duration.ofSeconds(10L))) {
                return null;
            }
            LOGGER.warn("Event dispatcher still hasn't terminated after {} seconds.", Long.valueOf(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)));
            return null;
        }).subscribeOn(Schedulers.io());
    }

    public Completable resumeStreaming(Map<Integer, StreamOffset> map) {
        if (map.isEmpty()) {
            return Completable.complete();
        }
        map.forEach((num, streamOffset) -> {
            sessionState().set(num.intValue(), PartitionState.fromOffset(streamOffset));
        });
        return startStreaming(map.keySet());
    }

    private static List<Integer> toIntList(Short... shArr) {
        ArrayList arrayList = new ArrayList();
        for (Short sh : shArr) {
            arrayList.add(Integer.valueOf(sh.shortValue()));
        }
        return arrayList;
    }

    @Deprecated
    public Completable startStreaming(Short... shArr) {
        return startStreaming(toIntList(shArr));
    }

    public Completable startStreaming() {
        return startStreaming(Collections.emptyList());
    }

    public Completable startStreaming(Collection<Integer> collection) {
        int numPartitions = numPartitions();
        List<Integer> partitionsForVbids = partitionsForVbids(numPartitions, collection);
        List<Integer> selectInitializedPartitions = selectInitializedPartitions(numPartitions, partitionsForVbids);
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = partitionsForVbids.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (!selectInitializedPartitions.contains(Integer.valueOf(intValue))) {
                arrayList.add(Integer.valueOf(intValue));
            }
        }
        if (!arrayList.isEmpty()) {
            LOGGER.info("Immediately sending stream end events for {} partitions already at desired end.", Integer.valueOf(arrayList.size()));
            LOGGER.debug("Immediately sending stream end events for partitions already at desired end: {}", arrayList);
            arrayList.forEach(num -> {
                this.env.eventBus().publish(new StreamEndEvent(num.intValue(), StreamEndReason.OK));
            });
        }
        if (selectInitializedPartitions.isEmpty()) {
            LOGGER.info("The configured session state does not require any streams to be opened. Completing immediately.");
            return Completable.complete();
        }
        LOGGER.info("Starting to Stream for " + selectInitializedPartitions.size() + " partitions");
        LOGGER.debug("Stream start against partitions: {}", selectInitializedPartitions);
        return Observable.from(selectInitializedPartitions).flatMapCompletable(num2 -> {
            PartitionState partitionState = sessionState().get(num2.intValue());
            return this.conductor.startStreamForPartition(num2.intValue(), partitionState.getOffset(), partitionState.getEndSeqno()).onErrorResumeNext(th -> {
                return th instanceof RollbackException ? Completable.complete() : Completable.error(th);
            });
        }).toCompletable();
    }

    private List<Integer> selectInitializedPartitions(int i, List<Integer> list) {
        ArrayList arrayList = new ArrayList();
        SessionState sessionState = sessionState();
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            PartitionState partitionState = sessionState.get(intValue);
            if (partitionState == null) {
                LOGGER.debug("Skipping partition {}, because its state is null", Integer.valueOf(intValue));
            } else if (MathUtils.lessThanUnsigned(partitionState.getStartSeqno(), partitionState.getEndSeqno())) {
                arrayList.add(Integer.valueOf(intValue));
            } else {
                LOGGER.debug("Skipping partition {}, because startSeqno({}) >= endSeqno({})", new Object[]{Integer.valueOf(intValue), Long.valueOf(partitionState.getStartSeqno()), Long.valueOf(partitionState.getEndSeqno())});
            }
        }
        if (arrayList.size() > i) {
            throw new IllegalStateException("Session State has " + arrayList + " partitions while the cluster has " + i + "!");
        }
        return arrayList;
    }

    @Deprecated
    public Completable stopStreaming(Short... shArr) {
        return stopStreaming(toIntList(shArr));
    }

    public Completable stopStreaming(Collection<Integer> collection) {
        List<Integer> partitionsForVbids = partitionsForVbids(numPartitions(), collection);
        LOGGER.info("Stopping to Stream for " + partitionsForVbids.size() + " partitions");
        LOGGER.debug("Stream stop against partitions: {}", partitionsForVbids);
        return Observable.from(partitionsForVbids).flatMapCompletable(new Func1<Integer, Completable>() { // from class: com.couchbase.client.dcp.Client.4
            public Completable call(Integer num) {
                return Client.this.conductor.stopStreamForPartition(num.intValue());
            }
        }).toCompletable();
    }

    private static List<Integer> partitionsForVbids(int i, Collection<Integer> collection) {
        if (!collection.isEmpty()) {
            ArrayList arrayList = new ArrayList(collection);
            Collections.sort(arrayList);
            return arrayList;
        }
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList2.add(Integer.valueOf(i2));
        }
        return arrayList2;
    }

    @Deprecated
    public Observable<ByteBuf> failoverLogs(Short... shArr) {
        return failoverLogs(toIntList(shArr));
    }

    public Observable<ByteBuf> failoverLogs(Collection<Integer> collection) {
        List<Integer> partitionsForVbids = partitionsForVbids(numPartitions(), collection);
        LOGGER.debug("Asking for failover logs on partitions {}", partitionsForVbids);
        Observable from = Observable.from(partitionsForVbids);
        Conductor conductor = this.conductor;
        conductor.getClass();
        return from.flatMapSingle((v1) -> {
            return r1.getFailoverLog(v1);
        });
    }

    public Completable rollbackAndRestartStream(final int i, final long j) {
        return stopStreaming(Collections.singletonList(Integer.valueOf(i))).andThen(Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.Client.5
            public void call(CompletableSubscriber completableSubscriber) {
                Client.this.sessionState().rollbackToPosition(i, j);
                completableSubscriber.onCompleted();
            }
        })).andThen(startStreaming(Collections.singletonList(Integer.valueOf(i))));
    }

    public int numPartitions() {
        return this.conductor.numberOfPartitions();
    }

    public boolean streamIsOpen(int i) {
        return this.conductor.streamIsOpen(i);
    }

    public Completable initializeState(StreamFrom streamFrom, StreamTo streamTo) {
        if (streamFrom == StreamFrom.BEGINNING && streamTo == StreamTo.INFINITY) {
            buzzMe();
            return initFromBeginningToInfinity();
        }
        if (streamFrom == StreamFrom.BEGINNING && streamTo == StreamTo.NOW) {
            return initFromBeginningToNow();
        }
        if (streamFrom != StreamFrom.NOW || streamTo != StreamTo.INFINITY) {
            throw new IllegalStateException("Unsupported FROM/TO combination: " + streamFrom + " -> " + streamTo);
        }
        buzzMe();
        return initFromNowToInfinity();
    }

    public Completable recoverState(final StateFormat stateFormat, final byte[] bArr) {
        return Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.Client.6
            public void call(CompletableSubscriber completableSubscriber) {
                Client.LOGGER.info("Recovering state from format {}", stateFormat);
                Client.LOGGER.debug("PersistedState on recovery is: {}", new String(bArr, StandardCharsets.UTF_8));
                try {
                    if (stateFormat == StateFormat.JSON) {
                        Client.this.sessionState().setFromJson(bArr);
                        completableSubscriber.onCompleted();
                    } else {
                        completableSubscriber.onError(new IllegalStateException("Unsupported StateFormat " + stateFormat));
                    }
                } catch (Exception e) {
                    completableSubscriber.onError(e);
                }
            }
        });
    }

    public Completable recoverOrInitializeState(StateFormat stateFormat, byte[] bArr, StreamFrom streamFrom, StreamTo streamTo) {
        return (bArr == null || bArr.length == 0) ? initializeState(streamFrom, streamTo) : recoverState(stateFormat, bArr);
    }

    private Completable initFromBeginningToInfinity() {
        return Completable.create(new Completable.OnSubscribe() { // from class: com.couchbase.client.dcp.Client.7
            public void call(CompletableSubscriber completableSubscriber) {
                Client.LOGGER.info("Initializing state from beginning to no end.");
                try {
                    Client.this.sessionState().setToBeginningWithNoEnd(Client.this.numPartitions());
                    completableSubscriber.onCompleted();
                } catch (Exception e) {
                    Client.LOGGER.warn("Failed to initialize state from beginning to no end.", e);
                    completableSubscriber.onError(e);
                }
            }
        });
    }

    private Completable initFromNowToInfinity() {
        return initWithCallback(new Action1<PartitionAndSeqno>() { // from class: com.couchbase.client.dcp.Client.8
            public void call(PartitionAndSeqno partitionAndSeqno) {
                int partition = partitionAndSeqno.partition();
                long seqno = partitionAndSeqno.seqno();
                PartitionState partitionState = Client.this.sessionState().get(partition);
                partitionState.setStartSeqno(seqno);
                partitionState.setSnapshot(new SnapshotMarker(seqno, seqno));
                Client.this.sessionState().set(partition, partitionState);
            }
        });
    }

    private Completable initFromBeginningToNow() {
        return initWithCallback(new Action1<PartitionAndSeqno>() { // from class: com.couchbase.client.dcp.Client.9
            public void call(PartitionAndSeqno partitionAndSeqno) {
                int partition = partitionAndSeqno.partition();
                long seqno = partitionAndSeqno.seqno();
                PartitionState partitionState = Client.this.sessionState().get(partition);
                partitionState.setEndSeqno(seqno);
                Client.this.sessionState().set(partition, partitionState);
            }
        });
    }

    private Completable initWithCallback(Action1<PartitionAndSeqno> action1) {
        sessionState().setToBeginningWithNoEnd(numPartitions());
        return getSeqnos().doOnNext(action1).reduce(new ArrayList(), new Func2<List<Integer>, PartitionAndSeqno, List<Integer>>() { // from class: com.couchbase.client.dcp.Client.10
            public List<Integer> call(List<Integer> list, PartitionAndSeqno partitionAndSeqno) {
                list.add(Integer.valueOf(partitionAndSeqno.partition()));
                return list;
            }
        }).flatMap((v1) -> {
            return failoverLogs(v1);
        }).map(byteBuf -> {
            int vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
            handleFailoverLogResponse(byteBuf);
            byteBuf.release();
            return Integer.valueOf(vbucket);
        }).last().toCompletable();
    }

    private static void buzzMe() {
        LOGGER.debug("To Infinity... AND BEYOND!");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static /* synthetic */ EventLoopGroup access$3200() {
        return newEventLoopGroup();
    }
}
