package com.couchbase.kafka;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.message.CouchbaseMessage;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.deps.com.lmax.disruptor.EventTranslatorOneArg;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/kafka/CouchbaseReader.class */
public class CouchbaseReader {
    private final ClusterFacade core;
    private final RingBuffer<DCPEvent> dcpRingBuffer;
    private final List<String> nodes;
    private final String bucket;
    private final String streamName = "CouchbaseKafka(" + hashCode() + ")";
    private final String password;
    private static final EventTranslatorOneArg<DCPEvent, CouchbaseMessage> TRANSLATOR = new EventTranslatorOneArg<DCPEvent, CouchbaseMessage>() { // from class: com.couchbase.kafka.CouchbaseReader.1
        public void translateTo(DCPEvent dCPEvent, long j, CouchbaseMessage couchbaseMessage) {
            dCPEvent.setMessage(couchbaseMessage);
        }
    };

    public CouchbaseReader(ClusterFacade clusterFacade, RingBuffer<DCPEvent> ringBuffer, List<String> list, String str, String str2) {
        this.core = clusterFacade;
        this.dcpRingBuffer = ringBuffer;
        this.nodes = list;
        this.bucket = str;
        this.password = str2;
    }

    public void connect() {
        connect(2L, TimeUnit.SECONDS);
    }

    public void connect(long j, TimeUnit timeUnit) {
        this.core.send(new SeedNodesRequest(this.nodes)).timeout(j, timeUnit).toBlocking().single();
        this.core.send(new OpenBucketRequest(this.bucket, this.password)).timeout(j, timeUnit).toBlocking().single();
    }

    public void run() {
        this.core.send(new OpenConnectionRequest(this.streamName, this.bucket)).toList().flatMap(new Func1<List<CouchbaseResponse>, Observable<Integer>>() { // from class: com.couchbase.kafka.CouchbaseReader.4
            public Observable<Integer> call(List<CouchbaseResponse> list) {
                return CouchbaseReader.this.partitionSize();
            }
        }).flatMap(new Func1<Integer, Observable<DCPRequest>>() { // from class: com.couchbase.kafka.CouchbaseReader.3
            public Observable<DCPRequest> call(Integer num) {
                return CouchbaseReader.this.requestStreams(num.intValue());
            }
        }).toBlocking().forEach(new Action1<DCPRequest>() { // from class: com.couchbase.kafka.CouchbaseReader.2
            public void call(DCPRequest dCPRequest) {
                CouchbaseReader.this.dcpRingBuffer.tryPublishEvent(CouchbaseReader.TRANSLATOR, dCPRequest);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>() { // from class: com.couchbase.kafka.CouchbaseReader.5
            public Integer call(GetClusterConfigResponse getClusterConfigResponse) {
                return Integer.valueOf(getClusterConfigResponse.config().bucketConfig(CouchbaseReader.this.bucket).numberOfPartitions());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<DCPRequest> requestStreams(int i) {
        return Observable.merge(Observable.range(0, i).flatMap(new Func1<Integer, Observable<StreamRequestResponse>>() { // from class: com.couchbase.kafka.CouchbaseReader.7
            public Observable<StreamRequestResponse> call(Integer num) {
                return CouchbaseReader.this.core.send(new StreamRequestRequest(num.shortValue(), CouchbaseReader.this.bucket));
            }
        }).map(new Func1<StreamRequestResponse, Observable<DCPRequest>>() { // from class: com.couchbase.kafka.CouchbaseReader.6
            public Observable<DCPRequest> call(StreamRequestResponse streamRequestResponse) {
                return streamRequestResponse.stream();
            }
        }));
    }
}
