package com.couchbase.client.dcp;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.conductor.Conductor;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.deps.io.netty.channel.nio.NioEventLoopGroup;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Completable;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;

/* loaded from: input_file:com/couchbase/client/dcp/Client.class */
public class Client {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Client.class);
    private final Conductor conductor;
    private final ClientEnvironment env;
    private final boolean bufferAckEnabled;
    private final SessionState sessionState;

    /* loaded from: input_file:com/couchbase/client/dcp/Client$Builder.class */
    public static class Builder {
        private EventLoopGroup eventLoopGroup;
        private int bufferAckWatermark;
        private List<String> clusterAt = Arrays.asList("127.0.0.1");
        private String bucket = "default";
        private String password = "";
        private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
        private DcpControl dcpControl = new DcpControl();
        private ConfigProvider configProvider = null;

        public Builder bufferAckWatermark(int i) {
            if (i > 100 || i < 0) {
                throw new IllegalArgumentException("The bufferAckWatermark is percents, so it needs to be between 0 and 100");
            }
            this.bufferAckWatermark = i;
            return this;
        }

        public Builder hostnames(List<String> list) {
            this.clusterAt = list;
            return this;
        }

        public Builder hostnames(String... strArr) {
            return hostnames(Arrays.asList(strArr));
        }

        public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
            this.eventLoopGroup = eventLoopGroup;
            return this;
        }

        public Builder bucket(String str) {
            this.bucket = str;
            return this;
        }

        public Builder password(String str) {
            this.password = str;
            return this;
        }

        public Builder connectionNameGenerator(ConnectionNameGenerator connectionNameGenerator) {
            this.connectionNameGenerator = connectionNameGenerator;
            return this;
        }

        public Builder controlParam(DcpControl.Names names, Object obj) {
            this.dcpControl.put(names, obj.toString());
            return this;
        }

        public Builder configProvider(ConfigProvider configProvider) {
            this.configProvider = configProvider;
            return this;
        }

        public Client build() {
            return new Client(this);
        }
    }

    private Client(Builder builder) {
        this.env = ClientEnvironment.builder().setClusterAt(builder.clusterAt).setConnectionNameGenerator(builder.connectionNameGenerator).setBucket(builder.bucket).setPassword(builder.password).setDcpControl(builder.dcpControl).setEventLoopGroup(builder.eventLoopGroup == null ? new NioEventLoopGroup() : builder.eventLoopGroup).setBufferAckWatermark(builder.bufferAckWatermark).build();
        this.sessionState = new SessionState();
        this.bufferAckEnabled = this.env.dcpControl().bufferAckEnabled();
        if (this.bufferAckEnabled && this.env.bufferAckWatermark() == 0) {
            throw new IllegalArgumentException("BufferAckWatermark needs to be set if bufferAck is enabled.");
        }
        this.conductor = new Conductor(this.env, builder.configProvider);
    }

    public Observable<long[]> getSeqnos() {
        return this.conductor.getSeqnos().flatMap(new Func1<ByteBuf, Observable<long[]>>() { // from class: com.couchbase.client.dcp.Client.1
            public Observable<long[]> call(ByteBuf byteBuf) {
                int readableBytes = byteBuf.readableBytes() / 10;
                ArrayList arrayList = new ArrayList(readableBytes);
                for (int i = 0; i < readableBytes; i++) {
                    arrayList.add(new long[]{byteBuf.getShort(10 * i), byteBuf.getLong((10 * i) + 2)});
                }
                byteBuf.release();
                return Observable.from(arrayList);
            }
        });
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public void controlEventHandler(final ControlEventHandler controlEventHandler) {
        this.env.setControlEventHandler(new ControlEventHandler() { // from class: com.couchbase.client.dcp.Client.2
            @Override // com.couchbase.client.dcp.ControlEventHandler
            public void onEvent(ByteBuf byteBuf) {
                if (DcpSnapshotMarkerMessage.is(byteBuf)) {
                    short partition = DcpSnapshotMarkerMessage.partition(byteBuf);
                    PartitionState partitionState = Client.this.sessionState.get(partition);
                    partitionState.setSnapshotStartSeqno(DcpSnapshotMarkerMessage.startSeqno(byteBuf));
                    partitionState.setSnapshotEndSeqno(DcpSnapshotMarkerMessage.endSeqno(byteBuf));
                    Client.this.sessionState.set(partition, partitionState);
                    Client.this.acknowledgeBuffer(byteBuf);
                    byteBuf.release();
                    return;
                }
                if (!DcpFailoverLogResponse.is(byteBuf)) {
                    controlEventHandler.onEvent(byteBuf);
                    return;
                }
                short vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
                long vbuuidEntry = DcpFailoverLogResponse.vbuuidEntry(byteBuf, DcpFailoverLogResponse.numLogEntries(byteBuf) - 1);
                PartitionState partitionState2 = Client.this.sessionState.get(vbucket);
                partitionState2.setUuid(vbuuidEntry);
                Client.this.sessionState.set(vbucket, partitionState2);
                byteBuf.release();
            }
        });
    }

    public void dataEventHandler(final DataEventHandler dataEventHandler) {
        this.env.setDataEventHandler(new DataEventHandler() { // from class: com.couchbase.client.dcp.Client.3
            @Override // com.couchbase.client.dcp.DataEventHandler
            public void onEvent(ByteBuf byteBuf) {
                if (DcpMutationMessage.is(byteBuf)) {
                    short partition = DcpMutationMessage.partition(byteBuf);
                    PartitionState partitionState = Client.this.sessionState.get(partition);
                    partitionState.setStartSeqno(DcpMutationMessage.revisionSeqno(byteBuf));
                    Client.this.sessionState.set(partition, partitionState);
                } else if (DcpDeletionMessage.is(byteBuf)) {
                    short partition2 = DcpDeletionMessage.partition(byteBuf);
                    PartitionState partitionState2 = Client.this.sessionState.get(partition2);
                    partitionState2.setStartSeqno(DcpDeletionMessage.revisionSeqno(byteBuf));
                    Client.this.sessionState.set(partition2, partitionState2);
                } else if (DcpExpirationMessage.is(byteBuf)) {
                    short partition3 = DcpExpirationMessage.partition(byteBuf);
                    PartitionState partitionState3 = Client.this.sessionState.get(partition3);
                    partitionState3.setStartSeqno(DcpExpirationMessage.revisionSeqno(byteBuf));
                    Client.this.sessionState.set(partition3, partitionState3);
                }
                dataEventHandler.onEvent(byteBuf);
            }
        });
    }

    public static Builder configure() {
        return new Builder();
    }

    public Completable connect() {
        if (this.env.dataEventHandler() == null) {
            throw new IllegalArgumentException("A DataEventHandler needs to be provided!");
        }
        if (this.env.controlEventHandler() == null) {
            throw new IllegalArgumentException("A ControlEventHandler needs to be provided!");
        }
        return this.conductor.connect();
    }

    public Completable disconnect() {
        return this.conductor.stop();
    }

    public Completable startStreams(Integer... numArr) {
        ArrayList arrayList = new ArrayList();
        if (numArr.length > 0) {
            LOGGER.info("Starting Stream against partitions {} with no end.", arrayList);
            arrayList.addAll(Arrays.asList(numArr));
        } else {
            int numberOfPartitions = this.conductor.numberOfPartitions();
            LOGGER.info("Starting Stream against all {} partitions with no end.", Integer.valueOf(numberOfPartitions));
            for (int i = 0; i < numberOfPartitions; i++) {
                arrayList.add(Integer.valueOf(i));
            }
        }
        Collections.sort(arrayList);
        return Observable.from(arrayList).flatMap(new Func1<Integer, Observable<?>>() { // from class: com.couchbase.client.dcp.Client.4
            public Observable<?> call(Integer num) {
                PartitionState partitionState = Client.this.sessionState.get(num.intValue());
                return Client.this.conductor.startStreamForPartition(num.shortValue(), partitionState.getUuid(), partitionState.getStartSeqno(), partitionState.getEndSeqno(), partitionState.getSnapshotStartSeqno(), partitionState.getSnapshotEndSeqno()).toObservable();
            }
        }).toCompletable();
    }

    public Completable initializeFromBeginningToNoEnd() {
        this.sessionState.intializeToBeginningWithNoEnd();
        return Completable.complete();
    }

    public Completable initializeFromNowToNoEnd() {
        this.sessionState.intializeToBeginningWithNoEnd();
        return getSeqnos().doOnNext(new Action1<long[]>() { // from class: com.couchbase.client.dcp.Client.8
            public void call(long[] jArr) {
                short s = (short) jArr[0];
                long j = jArr[1];
                PartitionState partitionState = Client.this.sessionState.get(s);
                partitionState.setStartSeqno(j);
                partitionState.setSnapshotStartSeqno(j);
                Client.this.sessionState.set(s, partitionState);
            }
        }).reduce(new ArrayList(), new Func2<ArrayList<Integer>, long[], ArrayList<Integer>>() { // from class: com.couchbase.client.dcp.Client.7
            public ArrayList<Integer> call(ArrayList<Integer> arrayList, long[] jArr) {
                arrayList.add(Integer.valueOf((int) jArr[0]));
                return arrayList;
            }
        }).flatMap(new Func1<ArrayList<Integer>, Observable<ByteBuf>>() { // from class: com.couchbase.client.dcp.Client.6
            public Observable<ByteBuf> call(ArrayList<Integer> arrayList) {
                return Client.this.getFailoverLogs((Integer[]) arrayList.toArray(new Integer[0]));
            }
        }).map(new Func1<ByteBuf, Integer>() { // from class: com.couchbase.client.dcp.Client.5
            public Integer call(ByteBuf byteBuf) {
                short vbucket = DcpFailoverLogResponse.vbucket(byteBuf);
                long vbuuidEntry = DcpFailoverLogResponse.vbuuidEntry(byteBuf, DcpFailoverLogResponse.numLogEntries(byteBuf) - 1);
                PartitionState partitionState = Client.this.sessionState.get(vbucket);
                partitionState.setUuid(vbuuidEntry);
                Client.this.sessionState.set(vbucket, partitionState);
                byteBuf.release();
                return Integer.valueOf(vbucket);
            }
        }).last().toCompletable();
    }

    public Completable stopStreams(Integer... numArr) {
        return Observable.from(partitionsForVbids(this.conductor.numberOfPartitions(), numArr)).flatMap(new Func1<Integer, Observable<?>>() { // from class: com.couchbase.client.dcp.Client.9
            public Observable<?> call(Integer num) {
                return Client.this.conductor.stopStreamForPartition(num.shortValue()).toObservable();
            }
        }).toCompletable();
    }

    public Observable<ByteBuf> getFailoverLogs(Integer... numArr) {
        return Observable.from(partitionsForVbids(this.conductor.numberOfPartitions(), numArr)).flatMap(new Func1<Integer, Observable<ByteBuf>>() { // from class: com.couchbase.client.dcp.Client.10
            public Observable<ByteBuf> call(Integer num) {
                return Client.this.conductor.getFailoverLog(num.shortValue()).toObservable();
            }
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [java.util.List] */
    private static final List<Integer> partitionsForVbids(int i, Integer... numArr) {
        ArrayList arrayList = new ArrayList();
        if (numArr.length > 0) {
            arrayList = Arrays.asList(numArr);
        } else {
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(Integer.valueOf(i2));
            }
        }
        Collections.sort(arrayList);
        return arrayList;
    }

    public int numPartitions() {
        return this.conductor.numberOfPartitions();
    }

    public boolean streamIsOpen(int i) {
        return this.conductor.streamIsOpen((short) i);
    }

    public void acknowledgeBuffer(int i, int i2) {
        if (this.bufferAckEnabled) {
            this.conductor.acknowledgeBuffer((short) i, i2);
        }
    }

    public void acknowledgeBuffer(ByteBuf byteBuf) {
        acknowledgeBuffer(MessageUtil.getVbucket(byteBuf), byteBuf.readableBytes());
    }
}
