package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpBufferAckRequest;
import com.couchbase.client.dcp.message.DcpCloseStreamRequest;
import com.couchbase.client.dcp.message.DcpCloseStreamResponse;
import com.couchbase.client.dcp.message.DcpFailoverLogRequest;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosRequest;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosResponse;
import com.couchbase.client.dcp.message.DcpOpenStreamRequest;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.transport.netty.ChannelUtils;
import com.couchbase.client.dcp.transport.netty.DcpPipeline;
import com.couchbase.client.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.deps.io.netty.util.concurrent.DefaultPromise;
import com.couchbase.client.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.deps.io.netty.util.concurrent.Promise;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import rx.Completable;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/DcpChannel.class */
public class DcpChannel extends AbstractStateMachine<LifecycleState> {
    private static final AtomicInteger OPAQUE = new AtomicInteger(0);
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpChannel.class);
    private final ClientEnvironment env;
    private final InetAddress inetAddress;
    private final Subject<ByteBuf, ByteBuf> controlSubject;
    private final Map<Integer, Promise<?>> outstandingPromises;
    private final Map<Integer, Short> outstandingVbucketInfos;
    private volatile Channel channel;
    private final AtomicIntegerArray openStreams;
    private final boolean needsBufferAck;
    private final int bufferAckWatermark;
    private volatile int bufferAckCounter;

    public DcpChannel(InetAddress inetAddress, final ClientEnvironment clientEnvironment) {
        super(LifecycleState.DISCONNECTED);
        this.inetAddress = inetAddress;
        this.env = clientEnvironment;
        this.outstandingPromises = new ConcurrentHashMap();
        this.outstandingVbucketInfos = new ConcurrentHashMap();
        this.controlSubject = PublishSubject.create().toSerialized();
        this.openStreams = new AtomicIntegerArray(SessionState.NUM_PARTITIONS);
        this.needsBufferAck = clientEnvironment.dcpControl().bufferAckEnabled();
        this.bufferAckCounter = 0;
        if (this.needsBufferAck) {
            this.bufferAckWatermark = (int) Math.round((Integer.parseInt(clientEnvironment.dcpControl().get(DcpControl.Names.CONNECTION_BUFFER_SIZE)) / 100.0d) * clientEnvironment.bufferAckWatermark());
            LOGGER.debug("BufferAckWatermark absolute is {}", Integer.valueOf(this.bufferAckWatermark));
        } else {
            this.bufferAckWatermark = 0;
        }
        this.controlSubject.filter(new Func1<ByteBuf, Boolean>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.2
            public Boolean call(ByteBuf byteBuf) {
                if (DcpOpenStreamResponse.is(byteBuf)) {
                    return Boolean.valueOf(DcpChannel.this.filterOpenStreamResponse(byteBuf));
                }
                if (DcpFailoverLogResponse.is(byteBuf)) {
                    return Boolean.valueOf(DcpChannel.this.filterFailoverLogResponse(byteBuf));
                }
                if (DcpStreamEndMessage.is(byteBuf)) {
                    return Boolean.valueOf(DcpChannel.this.filterDcpStreamEndMessage(byteBuf));
                }
                if (DcpCloseStreamResponse.is(byteBuf)) {
                    return Boolean.valueOf(DcpChannel.this.filterDcpCloseStreamResponse(byteBuf));
                }
                if (DcpGetPartitionSeqnosResponse.is(byteBuf)) {
                    return Boolean.valueOf(DcpChannel.this.filterDcpGetPartitionSeqnosResponse(byteBuf));
                }
                return true;
            }
        }).subscribe(new Subscriber<ByteBuf>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.1
            public void onCompleted() {
            }

            public void onError(Throwable th) {
            }

            public void onNext(ByteBuf byteBuf) {
                clientEnvironment.controlEventHandler().onEvent(byteBuf);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean filterOpenStreamResponse(ByteBuf byteBuf) {
        try {
            Promise<?> remove = this.outstandingPromises.remove(Integer.valueOf(MessageUtil.getOpaque(byteBuf)));
            short shortValue = this.outstandingVbucketInfos.remove(Integer.valueOf(MessageUtil.getOpaque(byteBuf))).shortValue();
            short status = MessageUtil.getStatus(byteBuf);
            switch (status) {
                case 0:
                    remove.setSuccess((Object) null);
                    ByteBuf buffer = Unpooled.buffer();
                    DcpFailoverLogResponse.init(buffer);
                    DcpFailoverLogResponse.vbucket(buffer, DcpOpenStreamResponse.vbucket(byteBuf));
                    ByteBuf writeShort = MessageUtil.getContent(byteBuf).copy().writeShort(shortValue);
                    MessageUtil.setContent(writeShort, buffer);
                    writeShort.release();
                    this.env.controlEventHandler().onEvent(buffer);
                    break;
                case 35:
                    remove.setSuccess((Object) null);
                    ByteBuf buffer2 = Unpooled.buffer();
                    RollbackMessage.init(buffer2, shortValue, MessageUtil.getContent(byteBuf).getLong(0));
                    this.env.controlEventHandler().onEvent(buffer2);
                    break;
                default:
                    remove.setFailure(new IllegalStateException("Unhandled Status: " + ((int) status)));
                    break;
            }
            return false;
        } finally {
            byteBuf.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean filterDcpGetPartitionSeqnosResponse(ByteBuf byteBuf) {
        try {
            this.outstandingPromises.remove(Integer.valueOf(MessageUtil.getOpaque(byteBuf))).setSuccess(MessageUtil.getContent(byteBuf).copy());
            byteBuf.release();
            return false;
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean filterFailoverLogResponse(ByteBuf byteBuf) {
        try {
            Promise<?> remove = this.outstandingPromises.remove(Integer.valueOf(MessageUtil.getOpaque(byteBuf)));
            short shortValue = this.outstandingVbucketInfos.remove(Integer.valueOf(MessageUtil.getOpaque(byteBuf))).shortValue();
            ByteBuf buffer = Unpooled.buffer();
            DcpFailoverLogResponse.init(buffer);
            DcpFailoverLogResponse.vbucket(buffer, DcpFailoverLogResponse.vbucket(byteBuf));
            MessageUtil.setContent(MessageUtil.getContent(byteBuf).copy().writeShort(shortValue), buffer);
            remove.setSuccess(buffer);
            byteBuf.release();
            return false;
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean filterDcpStreamEndMessage(ByteBuf byteBuf) {
        try {
            int readInt = MessageUtil.getExtras(byteBuf).readInt();
            short vbucket = DcpStreamEndMessage.vbucket(byteBuf);
            LOGGER.debug("Server closed Stream on vbid {} with flag {}", Short.valueOf(vbucket), Integer.valueOf(readInt));
            this.openStreams.set(vbucket, 0);
            if (this.needsBufferAck) {
                acknowledgeBuffer(byteBuf.readableBytes());
            }
            return false;
        } finally {
            byteBuf.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean filterDcpCloseStreamResponse(ByteBuf byteBuf) {
        try {
            this.outstandingPromises.remove(Integer.valueOf(MessageUtil.getOpaque(byteBuf))).setSuccess((Object) null);
            if (this.needsBufferAck) {
                acknowledgeBuffer(byteBuf.readableBytes());
            }
            return false;
        } finally {
            byteBuf.release();
        }
    }

    public Completable connect() {
        return Completable.create(new Completable.CompletableOnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.3
            public void call(final Completable.CompletableSubscriber completableSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.DISCONNECTED) {
                    completableSubscriber.onCompleted();
                    return;
                }
                Bootstrap group = new Bootstrap().remoteAddress(DcpChannel.this.inetAddress, 11210).channel(ChannelUtils.channelForEventLoopGroup(DcpChannel.this.env.eventLoopGroup())).handler(new DcpPipeline(DcpChannel.this.env, DcpChannel.this.controlSubject)).group(DcpChannel.this.env.eventLoopGroup());
                DcpChannel.this.transitionState(LifecycleState.CONNECTING);
                group.connect().addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.3.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (!channelFuture.isSuccess()) {
                            DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                            DcpChannel.LOGGER.warn("IMPLEMENT ME!!! (retry on failure until removed)");
                            completableSubscriber.onError(channelFuture.cause());
                        } else {
                            DcpChannel.this.channel = channelFuture.channel();
                            DcpChannel.this.transitionState(LifecycleState.CONNECTED);
                            DcpChannel.LOGGER.info("Connected to Node {}", DcpChannel.this.inetAddress);
                            completableSubscriber.onCompleted();
                        }
                    }
                });
            }
        });
    }

    public Completable disconnect() {
        return Completable.create(new Completable.CompletableOnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.4
            public void call(final Completable.CompletableSubscriber completableSubscriber) {
                if (DcpChannel.this.channel == null) {
                    completableSubscriber.onCompleted();
                    return;
                }
                DcpChannel.this.transitionState(LifecycleState.DISCONNECTING);
                DcpChannel.this.bufferAckCounter = 0;
                DcpChannel.this.channel.close().addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.4.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        DcpChannel.this.transitionState(LifecycleState.DISCONNECTED);
                        DcpChannel.LOGGER.info("Disconnected from Node " + DcpChannel.this.hostname());
                        if (channelFuture.isSuccess()) {
                            completableSubscriber.onCompleted();
                        } else {
                            DcpChannel.LOGGER.debug("Error during channel close.", channelFuture.cause());
                            completableSubscriber.onError(channelFuture.cause());
                        }
                    }
                });
            }
        });
    }

    public InetAddress hostname() {
        return this.inetAddress;
    }

    public void acknowledgeBuffer(int i) {
        if (state() != LifecycleState.CONNECTED) {
            throw new NotConnectedException(new NotConnectedException());
        }
        LOGGER.trace("Acknowledging {} bytes against connection {}.", Integer.valueOf(i), this.channel.remoteAddress());
        this.bufferAckCounter += i;
        LOGGER.trace("BufferAckCounter is now {}", Integer.valueOf(this.bufferAckCounter));
        if (this.bufferAckCounter >= this.bufferAckWatermark) {
            LOGGER.trace("BufferAckWatermark reached on {}, acking now against the server.", this.channel.remoteAddress());
            ByteBuf buffer = Unpooled.buffer();
            DcpBufferAckRequest.init(buffer);
            DcpBufferAckRequest.ackBytes(buffer, this.bufferAckCounter);
            this.channel.writeAndFlush(buffer);
            this.bufferAckCounter = 0;
        }
    }

    public Completable openStream(final short s, final long j, final long j2, final long j3, final long j4, final long j5) {
        return Completable.create(new Completable.CompletableOnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5
            public void call(final Completable.CompletableSubscriber completableSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    completableSubscriber.onError(new NotConnectedException());
                    return;
                }
                DcpChannel.LOGGER.debug("Opening Stream against {} with vbid: {}, vbuuid: {}, startSeqno: {}, endSeqno: {},  snapshotStartSeqno: {}, snapshotEndSeqno: {}", new Object[]{DcpChannel.this.channel.remoteAddress(), Short.valueOf(s), Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3), Long.valueOf(j4), Long.valueOf(j5)});
                int incrementAndGet = DcpChannel.OPAQUE.incrementAndGet();
                ChannelPromise newPromise = DcpChannel.this.channel.newPromise();
                ByteBuf buffer = Unpooled.buffer();
                DcpOpenStreamRequest.init(buffer, s);
                DcpOpenStreamRequest.opaque(buffer, incrementAndGet);
                DcpOpenStreamRequest.vbuuid(buffer, j);
                DcpOpenStreamRequest.startSeqno(buffer, j2);
                DcpOpenStreamRequest.endSeqno(buffer, j3);
                DcpOpenStreamRequest.snapshotStartSeqno(buffer, j4);
                DcpOpenStreamRequest.snapshotEndSeqno(buffer, j5);
                DcpChannel.this.outstandingPromises.put(Integer.valueOf(incrementAndGet), newPromise);
                DcpChannel.this.outstandingVbucketInfos.put(Integer.valueOf(incrementAndGet), Short.valueOf(s));
                DcpChannel.this.channel.writeAndFlush(buffer);
                newPromise.addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.5.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            DcpChannel.LOGGER.debug("Opened Stream against {} with vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                            DcpChannel.this.openStreams.set(s, 1);
                            completableSubscriber.onCompleted();
                        } else {
                            DcpChannel.LOGGER.debug("Failed open Stream against {} with vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                            DcpChannel.this.openStreams.set(s, 0);
                            completableSubscriber.onError(channelFuture.cause());
                        }
                    }
                });
            }
        });
    }

    public Completable closeStream(final short s) {
        return Completable.create(new Completable.CompletableOnSubscribe() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.6
            public void call(final Completable.CompletableSubscriber completableSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    completableSubscriber.onError(new NotConnectedException());
                    return;
                }
                DcpChannel.LOGGER.debug("Closing Stream against {} with vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                int incrementAndGet = DcpChannel.OPAQUE.incrementAndGet();
                ChannelPromise newPromise = DcpChannel.this.channel.newPromise();
                ByteBuf buffer = Unpooled.buffer();
                DcpCloseStreamRequest.init(buffer);
                DcpCloseStreamRequest.vbucket(buffer, s);
                DcpCloseStreamRequest.opaque(buffer, incrementAndGet);
                DcpChannel.this.outstandingPromises.put(Integer.valueOf(incrementAndGet), newPromise);
                DcpChannel.this.channel.writeAndFlush(buffer);
                newPromise.addListener(new GenericFutureListener<ChannelFuture>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.6.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        DcpChannel.this.openStreams.set(s, 0);
                        if (channelFuture.isSuccess()) {
                            DcpChannel.LOGGER.debug("Closed Stream against {} with vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                            completableSubscriber.onCompleted();
                        } else {
                            DcpChannel.LOGGER.debug("Failed close Stream against {} with vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                            completableSubscriber.onError(channelFuture.cause());
                        }
                    }
                });
            }
        });
    }

    public Single<ByteBuf> getSeqnos() {
        return Single.create(new Single.OnSubscribe<ByteBuf>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.7
            public void call(final SingleSubscriber<? super ByteBuf> singleSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    singleSubscriber.onError(new NotConnectedException());
                    return;
                }
                int incrementAndGet = DcpChannel.OPAQUE.incrementAndGet();
                DefaultPromise defaultPromise = new DefaultPromise(DcpChannel.this.channel.eventLoop());
                ByteBuf buffer = Unpooled.buffer();
                DcpGetPartitionSeqnosRequest.init(buffer);
                DcpGetPartitionSeqnosRequest.opaque(buffer, incrementAndGet);
                DcpChannel.this.outstandingPromises.put(Integer.valueOf(incrementAndGet), defaultPromise);
                DcpChannel.this.channel.writeAndFlush(buffer);
                defaultPromise.addListener(new GenericFutureListener<Future<ByteBuf>>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.7.1
                    public void operationComplete(Future<ByteBuf> future) throws Exception {
                        if (future.isSuccess()) {
                            singleSubscriber.onSuccess(future.getNow());
                        } else {
                            singleSubscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public Single<ByteBuf> getFailoverLog(final short s) {
        return Single.create(new Single.OnSubscribe<ByteBuf>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.8
            public void call(final SingleSubscriber<? super ByteBuf> singleSubscriber) {
                if (DcpChannel.this.state() != LifecycleState.CONNECTED) {
                    singleSubscriber.onError(new NotConnectedException());
                    return;
                }
                int incrementAndGet = DcpChannel.OPAQUE.incrementAndGet();
                DefaultPromise defaultPromise = new DefaultPromise(DcpChannel.this.channel.eventLoop());
                ByteBuf buffer = Unpooled.buffer();
                DcpFailoverLogRequest.init(buffer);
                DcpFailoverLogRequest.opaque(buffer, incrementAndGet);
                DcpFailoverLogRequest.vbucket(buffer, s);
                DcpChannel.this.outstandingPromises.put(Integer.valueOf(incrementAndGet), defaultPromise);
                DcpChannel.this.outstandingVbucketInfos.put(Integer.valueOf(incrementAndGet), Short.valueOf(s));
                DcpChannel.this.channel.writeAndFlush(buffer);
                defaultPromise.addListener(new GenericFutureListener<Future<ByteBuf>>() { // from class: com.couchbase.client.dcp.conductor.DcpChannel.8.1
                    public void operationComplete(Future<ByteBuf> future) throws Exception {
                        if (future.isSuccess()) {
                            DcpChannel.LOGGER.debug("Asked for failover log on {} for vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                            singleSubscriber.onSuccess(future.getNow());
                        } else {
                            DcpChannel.LOGGER.debug("Failed to ask for failover log on {} for vbid: {}", DcpChannel.this.channel.remoteAddress(), Short.valueOf(s));
                            singleSubscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    public boolean streamIsOpen(short s) {
        return this.openStreams.get(s) == 1;
    }

    public boolean equals(Object obj) {
        return this.inetAddress.equals(obj);
    }

    public int hashCode() {
        return this.inetAddress.hashCode();
    }

    public String toString() {
        return "DcpChannel{inetAddress=" + this.inetAddress + '}';
    }
}
