package com.couchbase.spark.streaming;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.dcp.DCPMessage;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.spark.streaming.state.ConnectorState;
import com.couchbase.spark.streaming.state.StateSerializer;
import com.couchbase.spark.streaming.state.StreamState;
import com.couchbase.spark.streaming.state.StreamStateUpdatedEvent;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/spark/streaming/CouchbaseReader.class */
public class CouchbaseReader {
    private static final int BUFFER_SIZE = 16384;
    private final ClusterFacade core;
    private final String bucket;
    private final StateSerializer stateSerializer;
    private final String connectionName = "CouchbaseSpark(" + hashCode() + ")";
    private final CouchbaseEnvironment environment;
    private DCPConnection connection;

    public CouchbaseReader(String str, ClusterFacade clusterFacade, CouchbaseEnvironment couchbaseEnvironment, StateSerializer stateSerializer) {
        this.core = clusterFacade;
        this.bucket = str;
        this.stateSerializer = stateSerializer;
        this.environment = couchbaseEnvironment;
    }

    public void connect() {
        connect(this.environment.connectTimeout(), TimeUnit.SECONDS);
    }

    private void connect(long j, TimeUnit timeUnit) {
        this.connection = ((OpenConnectionResponse) this.core.send(new OpenConnectionRequest(this.connectionName, this.bucket)).timeout(j, timeUnit).toBlocking().single()).connection();
    }

    private ConnectorState _currentState() {
        return (ConnectorState) this.connection.getCurrentState().collect(new Func0<ConnectorState>() { // from class: com.couchbase.spark.streaming.CouchbaseReader.1
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public ConnectorState m88call() {
                return new ConnectorState();
            }
        }, new Action2<ConnectorState, MutationToken>() { // from class: com.couchbase.spark.streaming.CouchbaseReader.2
            public void call(ConnectorState connectorState, MutationToken mutationToken) {
                connectorState.put(new StreamState(mutationToken));
            }
        }).toBlocking().single();
    }

    public ConnectorState currentState(int... iArr) {
        ConnectorState _currentState = _currentState();
        if (iArr.length == 0) {
            return _currentState;
        }
        ConnectorState connectorState = new ConnectorState();
        for (int i : iArr) {
            connectorState.put(_currentState.get((short) i));
        }
        return connectorState;
    }

    public ConnectorState startState(int... iArr) {
        return overrideSequenceNumber(currentState(iArr), 0L);
    }

    public ConnectorState endState(int... iArr) {
        return overrideSequenceNumber(currentState(iArr), -1L);
    }

    private ConnectorState overrideSequenceNumber(ConnectorState connectorState, long j) {
        ConnectorState connectorState2 = new ConnectorState();
        Iterator<StreamState> it = connectorState.iterator();
        while (it.hasNext()) {
            StreamState next = it.next();
            connectorState2.put(new StreamState(next.partition(), next.vbucketUUID(), j));
        }
        return connectorState2;
    }

    public Observable<DCPRequest> run(ConnectorState connectorState, final ConnectorState connectorState2) {
        if (!Arrays.equals(connectorState.partitions(), connectorState2.partitions())) {
            throw new IllegalArgumentException("partitions in FROM state do not match partitions in TO state");
        }
        connectorState.m99clone().updates().subscribe(new Action1<StreamStateUpdatedEvent>() { // from class: com.couchbase.spark.streaming.CouchbaseReader.3
            public void call(StreamStateUpdatedEvent streamStateUpdatedEvent) {
                CouchbaseReader.this.stateSerializer.dump(streamStateUpdatedEvent.connectorState(), streamStateUpdatedEvent.partition());
            }
        });
        return Observable.from(connectorState).flatMap(new Func1<StreamState, Observable<ResponseStatus>>() { // from class: com.couchbase.spark.streaming.CouchbaseReader.5
            public Observable<ResponseStatus> call(StreamState streamState) {
                StreamState streamState2 = connectorState2.get(streamState.partition());
                return CouchbaseReader.this.connection.addStream(streamState.partition(), streamState.vbucketUUID(), streamState.sequenceNumber(), streamState2.sequenceNumber(), streamState.sequenceNumber(), streamState2.sequenceNumber());
            }
        }).toList().flatMap(new Func1<List<ResponseStatus>, Observable<DCPRequest>>() { // from class: com.couchbase.spark.streaming.CouchbaseReader.4
            public Observable<DCPRequest> call(List<ResponseStatus> list) {
                return CouchbaseReader.this.connection.subject();
            }
        }).onBackpressureBuffer(16384L);
    }

    public <T extends DCPMessage> void consumed(T t) {
        this.connection.consumed(t);
    }
}
