package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.core.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.core.deps.io.netty.buffer.UnpooledByteBufAllocator;
import com.couchbase.client.core.deps.io.netty.channel.Channel;
import com.couchbase.client.core.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.core.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.core.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.core.deps.io.netty.util.AttributeKey;
import com.couchbase.client.core.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.core.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.core.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.core.deps.io.netty.util.concurrent.ImmediateEventExecutor;
import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpOps;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.core.state.AbstractStateMachine;
import com.couchbase.client.dcp.core.state.LifecycleState;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.core.time.Delay;
import com.couchbase.client.dcp.core.utils.DefaultObjectMapper;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.message.DcpCloseStreamRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosRequest;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosResponse;
import com.couchbase.client.dcp.message.DcpOpenStreamRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.HelloFeature;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.PartitionAndSeqno;
import com.couchbase.client.dcp.message.ResponseStatus;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamFlag;
import com.couchbase.client.dcp.message.VbucketState;
import com.couchbase.client.dcp.metrics.DcpChannelMetrics;
import com.couchbase.client.dcp.metrics.DcpClientMetrics;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.DcpMessageHandler;
import com.couchbase.client.dcp.transport.netty.DcpPipeline;
import com.couchbase.client.dcp.transport.netty.DcpResponse;
import com.couchbase.client.dcp.transport.netty.DcpResponseListener;
import com.couchbase.client.dcp.util.AdaptiveDelay;
import com.couchbase.client.dcp.util.AtomicBooleanArray;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/DcpChannel.class */
public class DcpChannel extends AbstractStateMachine<LifecycleState> {
    private final DcpChannelControlHandler controlHandler;
    private volatile boolean isShutdown;
    private volatile Channel channel;
    private volatile ChannelFuture connectFuture;
    private final DcpChannelMetrics metrics;
    private final DcpClientMetrics clientMetrics;
    private final AdaptiveDelay reconnectDelay;
    final Client.Environment env;
    final HostAndPort address;
    final AtomicBooleanArray streamIsOpen;
    final Conductor conductor;
    private static final Logger LOGGER = LoggerFactory.getLogger(DcpChannel.class);
    private static final AttributeKey<HostAndPort> HOST_AND_PORT = AttributeKey.valueOf("hostAndPort");

    public DcpChannel(HostAndPort hostAndPort, Client.Environment environment, Conductor conductor, DcpClientMetrics dcpClientMetrics) {
        super(LifecycleState.DISCONNECTED);
        this.reconnectDelay = new AdaptiveDelay(Delay.exponential(TimeUnit.MILLISECONDS, 4096L, 32L), Duration.ofSeconds(10L));
        this.streamIsOpen = new AtomicBooleanArray(1024);
        this.address = hostAndPort;
        this.env = environment;
        this.conductor = conductor;
        this.controlHandler = new DcpChannelControlHandler(this);
        this.isShutdown = false;
        this.clientMetrics = (DcpClientMetrics) Objects.requireNonNull(dcpClientMetrics);
        this.metrics = dcpClientMetrics.channelMetrics(hostAndPort);
    }

    public static HostAndPort getHostAndPort(Channel channel) {
        return (HostAndPort) channel.attr(HOST_AND_PORT).get();
    }

    public Future<DcpResponse> sendRequest(ByteBuf byteBuf) {
        if (this.channel != null) {
            return this.channel.pipeline().get(DcpMessageHandler.class).sendRequest(byteBuf);
        }
        ReferenceCountUtil.safeRelease(byteBuf);
        return ImmediateEventExecutor.INSTANCE.newFailedFuture(new NotConnectedException("Failed to issue request; channel is not active."));
    }

    private Bootstrap newBootstrap() {
        return new Bootstrap().option(ChannelOption.ALLOCATOR, this.env.poolBuffers() ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) this.env.socketConnectTimeout())).remoteAddress(this.address.host(), this.address.port()).attr(HOST_AND_PORT, this.address).channel(ChannelUtils.channelForEventLoopGroup(this.env.eventLoopGroup())).handler(new DcpPipeline(this.env, this.controlHandler, this.conductor.bucketConfigArbiter(), this.metrics, this.clientMetrics)).group(this.env.eventLoopGroup());
    }

    public Mono<Void> connect() {
        return Mono.create(monoSink -> {
            if (this.isShutdown || state() != LifecycleState.DISCONNECTED) {
                monoSink.success();
                return;
            }
            Bootstrap newBootstrap = newBootstrap();
            transitionState(LifecycleState.CONNECTING);
            this.connectFuture = this.metrics.trackConnect(newBootstrap.connect());
            this.connectFuture.addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (!channelFuture.isSuccess()) {
                        DcpChannel.LOGGER.info("Connect attempt to {} failed.", RedactableArgument.system(DcpChannel.this.address), channelFuture.cause());
                        DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                        monoSink.error(channelFuture.cause());
                        return;
                    }
                    DcpChannel.this.channel = channelFuture.channel();
                    DcpChannel.this.metrics.trackDisconnect(DcpChannel.this.channel.closeFuture());
                    if (!DcpChannel.this.isShutdown) {
                        DcpChannel.this.transitionState(LifecycleState.CONNECTED);
                        DcpChannel.LOGGER.info("Connected to Node {}", RedactableArgument.system(DcpChannel.this.channel.remoteAddress()));
                        DcpChannel.this.channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.1.1
                            public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                                DcpChannel.LOGGER.debug("Got notified of channel close on Node {}", DcpChannel.this.address);
                                if (DcpChannel.this.env.persistencePollingEnabled()) {
                                    for (int i = 0; i < DcpChannel.this.streamIsOpen.length(); i++) {
                                        if (DcpChannel.this.streamIsOpen.get(i)) {
                                            DcpChannel.this.env.streamEventBuffer().clear(i);
                                        }
                                    }
                                }
                                DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                                if (!DcpChannel.this.isShutdown) {
                                    DcpChannel.this.dispatchReconnect();
                                }
                                DcpChannel.this.channel = null;
                            }
                        });
                        monoSink.success();
                        return;
                    }
                    DcpChannel.LOGGER.info("Connected Node {}, but got instructed to disconnect in the meantime.", RedactableArgument.system(DcpChannel.this.address));
                    Mono<Void> disconnect = DcpChannel.this.disconnect();
                    MonoSink monoSink = monoSink;
                    Mono doOnSuccess = disconnect.doOnSuccess(r3 -> {
                        monoSink.success();
                    });
                    MonoSink monoSink2 = monoSink;
                    doOnSuccess.onErrorResume(th -> {
                        DcpChannel.LOGGER.warn("Got error during disconnect.", th);
                        monoSink2.error(th);
                        return Mono.empty();
                    }).subscribe();
                }
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispatchReconnect() {
        if (this.isShutdown) {
            LOGGER.debug("Ignoring reconnect on {} because already shutdown.", this.address);
            return;
        }
        LOGGER.info("Node {} socket closed, initiating reconnect.", RedactableArgument.system(this.address));
        Duration calculate = this.reconnectDelay.calculate();
        if (calculate.compareTo(Duration.ZERO) > 0) {
            LOGGER.info("Delaying reconnection attempt by {}ms", Long.valueOf(calculate.toMillis()));
        }
        Mono.delay(calculate).then(connect().retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(32L)).maxBackoff(Duration.ofSeconds(4L)).doAfterRetry(retrySignal -> {
            LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", this.address);
        }))).doOnError(th -> {
            LOGGER.warn("Got error during connect (maybe retried) for node {}", RedactableArgument.system(this.address), th);
        }).doOnSuccess(r5 -> {
            LOGGER.debug("Completed Node connect for DCP channel {}", this.address);
            for (int i = 0; i < this.streamIsOpen.length(); i++) {
                if (this.streamIsOpen.get(i)) {
                    this.conductor.maybeMovePartition(i);
                }
            }
        }).subscribe();
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    public Mono<Void> disconnect() {
        return Mono.create(monoSink -> {
            this.isShutdown = true;
            if (this.channel != null) {
                transitionState(LifecycleState.DISCONNECTING);
                this.metrics.trackDisconnect(this.channel.close()).addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.2
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                        DcpChannel.LOGGER.info("Disconnected from Node {}", RedactableArgument.system(DcpChannel.this.address()));
                        if (channelFuture.isSuccess()) {
                            monoSink.success();
                        } else {
                            DcpChannel.LOGGER.debug("Error during channel close.", channelFuture.cause());
                            monoSink.error(channelFuture.cause());
                        }
                    }
                });
            } else if (this.connectFuture != null) {
                this.connectFuture.addListener(new ChannelFutureListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.3
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.3.1
                                public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                                    if (channelFuture2.isSuccess()) {
                                        monoSink.success();
                                    } else {
                                        monoSink.error(channelFuture2.cause());
                                    }
                                }
                            });
                        } else {
                            monoSink.success();
                        }
                    }
                });
            } else {
                monoSink.success();
            }
        });
    }

    public HostAndPort address() {
        return this.address;
    }

    public Mono<Optional<CollectionsManifest>> getCollectionsManifest() {
        return Mono.create(monoSink -> {
            if (state() != LifecycleState.CONNECTED) {
                monoSink.error(new NotConnectedException());
            } else {
                if (!HelloFeature.COLLECTIONS.isEnabled(this.channel)) {
                    monoSink.success(Optional.empty());
                    return;
                }
                ByteBuf buffer = Unpooled.buffer();
                MessageUtil.initRequest((byte) -70, buffer);
                sendRequest(buffer).addListener(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.4
                    public void operationComplete(Future<DcpResponse> future) throws Exception {
                        if (!future.isSuccess()) {
                            if (future.cause() instanceof NotConnectedException) {
                                DcpChannel.LOGGER.debug("Failed to get collections manifest from {}; {}", DcpChannel.this.address, future.cause().toString());
                            } else {
                                DcpChannel.LOGGER.warn("Failed to get collections manifest from {}; {}", DcpChannel.this.address, future.cause().toString());
                            }
                            monoSink.error(future.cause());
                            return;
                        }
                        DcpResponse dcpResponse = (DcpResponse) future.getNow();
                        ByteBuf buffer2 = dcpResponse.buffer();
                        try {
                            ResponseStatus status = dcpResponse.status();
                            if (!status.isSuccess()) {
                                DcpChannel.LOGGER.warn("Failed to get collections manifest from {}, response status: {}", DcpChannel.this.address, status);
                                monoSink.error(new DcpOps.BadResponseStatusException(status));
                                buffer2.release();
                            } else {
                                byte[] contentAsByteArray = MessageUtil.getContentAsByteArray(buffer2);
                                DcpChannel.LOGGER.debug("Got collections manifest from {} ; {}", DcpChannel.this.address, new String(contentAsByteArray, StandardCharsets.UTF_8));
                                try {
                                    monoSink.success(Optional.of(CollectionsManifest.fromJson(contentAsByteArray)));
                                } catch (Exception e) {
                                    DcpChannel.LOGGER.error("Unparsable collections manifest from {} ; {}", new Object[]{RedactableArgument.system(DcpChannel.this.address), RedactableArgument.user(new String(contentAsByteArray, StandardCharsets.UTF_8)), e});
                                    monoSink.error(new RuntimeException("Failed to parse collections manifest", e));
                                }
                            }
                        } finally {
                            buffer2.release();
                        }
                    }
                });
            }
        });
    }

    public Mono<Void> openStream(int i, StreamOffset streamOffset, long j, CollectionsManifest collectionsManifest, Set<StreamFlag> set) {
        return Mono.create(monoSink -> {
            long j2;
            long j3;
            OptionalLong scopeId;
            if (state() != LifecycleState.CONNECTED) {
                monoSink.error(new NotConnectedException());
                return;
            }
            long seqno = streamOffset.getSeqno();
            long startSeqno = streamOffset.getSnapshot().getStartSeqno();
            long endSeqno = streamOffset.getSnapshot().getEndSeqno();
            long vbuuid = streamOffset.getVbuuid();
            long collectionsManifestUid = streamOffset.getCollectionsManifestUid();
            if (startSeqno == seqno + 1) {
                LOGGER.debug("Disregarding snapshot marker from the future.");
                j3 = seqno;
                j2 = seqno;
            } else {
                j2 = endSeqno;
                j3 = startSeqno;
            }
            Set set2 = (Set) set.stream().filter(streamFlag -> {
                return this.conductor.hasCapabilities(streamFlag.requiredCapabilities());
            }).collect(Collectors.toSet());
            HashSet hashSet = new HashSet(set);
            hashSet.removeAll(set2);
            if (!hashSet.isEmpty()) {
                LOGGER.debug("Ignoring unsupported optional stream flags: {}", hashSet);
            }
            LOGGER.debug("Opening Stream against {} with vbid: {}, vbuuid: {}, startSeqno: {}, endSeqno: {},  snapshotStartSeqno: {}, snapshotEndSeqno: {}, manifest: {}, flags: {}", new Object[]{this.address, Integer.valueOf(i), Long.valueOf(vbuuid), Long.valueOf(seqno), Long.valueOf(j), Long.valueOf(j3), Long.valueOf(j2), collectionsManifest, set2});
            ByteBuf buffer = Unpooled.buffer();
            DcpOpenStreamRequest.init(buffer, set2, i);
            DcpOpenStreamRequest.vbuuid(buffer, vbuuid);
            DcpOpenStreamRequest.startSeqno(buffer, seqno);
            DcpOpenStreamRequest.endSeqno(buffer, j);
            DcpOpenStreamRequest.snapshotStartSeqno(buffer, j3);
            DcpOpenStreamRequest.snapshotEndSeqno(buffer, j2);
            HashMap hashMap = new HashMap();
            if (HelloFeature.COLLECTIONS.isEnabled(this.channel)) {
                HashSet hashSet2 = new HashSet(this.env.collectionIds());
                this.env.collectionNames().forEach(str -> {
                    CollectionsManifest.CollectionInfo collection = collectionsManifest.getCollection(str);
                    if (collection == null) {
                        monoSink.error(new RuntimeException("Can't stream from collection '" + str + "' because it does not exist (not present in the collections manifest)."));
                    } else {
                        LOGGER.debug("resolved collection name '{}' to UID {}", str, Long.valueOf(collection.id()));
                        hashSet2.add(Long.valueOf(collection.id()));
                    }
                });
                if (this.env.scopeName().isPresent()) {
                    String str2 = this.env.scopeName().get();
                    CollectionsManifest.ScopeInfo scope = collectionsManifest.getScope(str2);
                    if (scope == null) {
                        monoSink.error(new RuntimeException("Can't stream from scope '" + str2 + "' because it does not exist (not present in the collections manifest)."));
                        return;
                    } else {
                        LOGGER.debug("resolved scope name '{}' to UID {}", str2, Long.valueOf(scope.id()));
                        scopeId = OptionalLong.of(scope.id());
                    }
                } else {
                    scopeId = this.env.scopeId();
                }
                hashMap.put("uid", formatUid(collectionsManifestUid));
                if (!hashSet2.isEmpty()) {
                    hashMap.put("collections", formatUids(hashSet2));
                } else if (scopeId.isPresent()) {
                    hashMap.put("scope", formatUid(scopeId.getAsLong()));
                }
                try {
                    byte[] writeValueAsBytes = DefaultObjectMapper.writeValueAsBytes(hashMap);
                    LOGGER.debug("opening stream for partition {} with value: {}", Integer.valueOf(i), new String(writeValueAsBytes, StandardCharsets.UTF_8));
                    MessageUtil.setContent(writeValueAsBytes, buffer);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            this.env.tracer().onStreamStart(this.address, i, vbuuid, seqno, j, j3, j2, hashMap);
            sendRequest(buffer).addListener(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5
                public void operationComplete(Future<DcpResponse> future) throws Exception {
                    if (!future.isSuccess()) {
                        DcpChannel.this.env.tracer().onStreamStartFailed(DcpChannel.this.address, i, future.cause().toString());
                        DcpChannel.LOGGER.debug("Failed open Stream against {} with vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                        DcpChannel.this.streamIsOpen.set(i, false);
                        monoSink.error(future.cause());
                        return;
                    }
                    DcpResponse dcpResponse = (DcpResponse) future.getNow();
                    ByteBuf buffer2 = dcpResponse.buffer();
                    try {
                        ResponseStatus status = dcpResponse.status();
                        if (!status.isSuccess()) {
                            DcpChannel.this.env.tracer().onStreamStartFailed(DcpChannel.this.address, i, status.formatted());
                        }
                        if (status == ResponseStatus.KEY_EXISTS) {
                            DcpChannel.LOGGER.debug("Stream already open against {} with vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                            monoSink.success();
                            buffer2.release();
                            return;
                        }
                        if (!status.isSuccess()) {
                            DcpChannel.LOGGER.debug("Failed open Stream against {} with vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                            DcpChannel.this.streamIsOpen.set(i, false);
                        }
                        if (status.isSuccess()) {
                            DcpChannel.LOGGER.debug("Opened Stream against {} with vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                            DcpChannel.this.streamIsOpen.set(i, true);
                            monoSink.success();
                            ByteBuf buffer3 = Unpooled.buffer();
                            DcpFailoverLogResponse.init(buffer3);
                            DcpFailoverLogResponse.vbucket(buffer3, DcpOpenStreamResponse.vbucket(buffer2));
                            ByteBuf writeShort = MessageUtil.getContent(buffer2).copy().writeShort(i);
                            MessageUtil.setContent(writeShort, buffer3);
                            writeShort.release();
                            DcpChannel.this.env.controlEventHandler().onEvent(ChannelFlowController.dummy, buffer3);
                        } else if (status == ResponseStatus.ROLLBACK_REQUIRED) {
                            monoSink.error(new RollbackException());
                            ByteBuf buffer4 = Unpooled.buffer();
                            RollbackMessage.init(buffer4, i, DcpOpenStreamResponse.rollbackSeqno(buffer2));
                            DcpChannel.this.env.controlEventHandler().onEvent(ChannelFlowController.dummy, buffer4);
                        } else if (status == ResponseStatus.NOT_MY_VBUCKET) {
                            monoSink.error(new NotMyVbucketException());
                        } else {
                            monoSink.error(new DcpOps.BadResponseStatusException(status));
                        }
                    } finally {
                        buffer2.release();
                    }
                }
            });
        });
    }

    private static String formatUid(long j) {
        return Long.toHexString(j);
    }

    private static List<String> formatUids(Collection<Long> collection) {
        return (List) collection.stream().map((v0) -> {
            return formatUid(v0);
        }).collect(Collectors.toList());
    }

    public Mono<Void> closeStream(int i) {
        return Mono.create(monoSink -> {
            if (state() != LifecycleState.CONNECTED) {
                monoSink.error(new NotConnectedException());
                return;
            }
            LOGGER.debug("Closing Stream against {} with vbid: {}", this.address, Integer.valueOf(i));
            ByteBuf buffer = Unpooled.buffer();
            DcpCloseStreamRequest.init(buffer);
            DcpCloseStreamRequest.vbucket(buffer, i);
            sendRequest(buffer).addListener(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.6
                public void operationComplete(Future<DcpResponse> future) throws Exception {
                    DcpChannel.this.streamIsOpen.set(i, false);
                    if (!future.isSuccess()) {
                        DcpChannel.LOGGER.debug("Failed close Stream against {} with vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                        monoSink.error(future.cause());
                    } else {
                        ((DcpResponse) future.getNow()).buffer().release();
                        DcpChannel.LOGGER.debug("Closed Stream against {} with vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                        monoSink.success();
                    }
                }
            });
        });
    }

    public Mono<List<PartitionAndSeqno>> getSeqnos() {
        return Mono.create(monoSink -> {
            if (state() != LifecycleState.CONNECTED) {
                monoSink.error(new NotConnectedException());
                return;
            }
            ByteBuf buffer = Unpooled.buffer();
            DcpGetPartitionSeqnosRequest.init(buffer);
            DcpGetPartitionSeqnosRequest.vbucketState(buffer, VbucketState.ACTIVE);
            sendRequest(buffer).addListener(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.7
                public void operationComplete(Future<DcpResponse> future) throws Exception {
                    if (!future.isSuccess()) {
                        monoSink.error(future.cause());
                        return;
                    }
                    DcpResponse dcpResponse = (DcpResponse) future.getNow();
                    ByteBuf buffer2 = dcpResponse.buffer();
                    try {
                        if (dcpResponse.status().isSuccess()) {
                            monoSink.success(DcpGetPartitionSeqnosResponse.parse(buffer2));
                            buffer2.release();
                        } else {
                            monoSink.error(new DcpOps.BadResponseStatusException(dcpResponse.status(), "Failed to get seqnos. Details: " + MessageUtil.getContentAsString(dcpResponse.buffer())));
                            buffer2.release();
                        }
                    } catch (Throwable th) {
                        buffer2.release();
                        throw th;
                    }
                }
            });
        });
    }

    public Mono<ByteBuf> getFailoverLog(int i) {
        return Mono.create(monoSink -> {
            if (state() != LifecycleState.CONNECTED) {
                monoSink.error(new NotConnectedException());
                return;
            }
            ByteBuf buffer = Unpooled.buffer();
            DcpFailoverLogRequest.init(buffer);
            DcpFailoverLogRequest.vbucket(buffer, i);
            LOGGER.debug("Asked for failover log on {} for vbid: {}", this.address, Integer.valueOf(i));
            sendRequest(buffer).addListener(new DcpResponseListener() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.8
                public void operationComplete(Future<DcpResponse> future) throws Exception {
                    if (!future.isSuccess()) {
                        DcpChannel.LOGGER.debug("Failed to ask for failover log on {} for vbid: {}", DcpChannel.this.address, Integer.valueOf(i));
                        monoSink.error(future.cause());
                        return;
                    }
                    ByteBuf buffer2 = ((DcpResponse) future.getNow()).buffer();
                    try {
                        ByteBuf buffer3 = Unpooled.buffer();
                        DcpFailoverLogResponse.init(buffer3);
                        DcpFailoverLogResponse.vbucket(buffer3, DcpFailoverLogResponse.vbucket(buffer2));
                        ByteBuf writeShort = MessageUtil.getContent(buffer2).copy().writeShort(i);
                        MessageUtil.setContent(writeShort, buffer3);
                        writeShort.release();
                        DcpChannel.LOGGER.debug("Failover log for vbid {} is {}", Integer.valueOf(i), DcpFailoverLogResponse.toString(buffer3));
                        monoSink.success(buffer3);
                        buffer2.release();
                    } catch (Throwable th) {
                        buffer2.release();
                        throw th;
                    }
                }
            });
        });
    }

    public boolean streamIsOpen(int i) {
        return this.streamIsOpen.get(i);
    }

    public boolean equals(Object obj) {
        if (obj instanceof HostAndPort) {
            return this.address.equals(obj);
        }
        if (obj instanceof DcpChannel) {
            return this.address.equals(((DcpChannel) obj).address);
        }
        return false;
    }

    public int hashCode() {
        return this.address.hashCode();
    }

    public String toString() {
        return "DcpChannel{address=" + this.address + '}';
    }
}
