package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import rx.Observable;
import rx.functions.Func1;

@InterfaceAudience.Public
@InterfaceStability.Experimental
/* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator.class */
public class BucketStreamAggregator {
    private final ClusterFacade core;
    private final String bucket;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.core.dcp.BucketStreamAggregator$2, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator$2.class */
    public class AnonymousClass2 implements Func1<OpenConnectionResponse, Observable<StreamRequestResponse>> {
        final /* synthetic */ BucketStreamAggregatorState val$aggregatorState;

        AnonymousClass2(BucketStreamAggregatorState bucketStreamAggregatorState) {
            this.val$aggregatorState = bucketStreamAggregatorState;
        }

        public Observable<StreamRequestResponse> call(OpenConnectionResponse openConnectionResponse) {
            return Observable.from(this.val$aggregatorState).flatMap(new Func1<BucketStreamState, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2.1
                public Observable<StreamRequestResponse> call(final BucketStreamState bucketStreamState) {
                    return BucketStreamAggregator.this.core.send(new StreamRequestRequest(bucketStreamState.partition(), bucketStreamState.vbucketUUID(), bucketStreamState.startSequenceNumber(), bucketStreamState.endSequenceNumber(), bucketStreamState.snapshotStartSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket)).flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2.1.1
                        public Observable<StreamRequestResponse> call(StreamRequestResponse streamRequestResponse) {
                            long rollbackToSequenceNumber;
                            switch (AnonymousClass4.$SwitchMap$com$couchbase$client$core$message$ResponseStatus[streamRequestResponse.status().ordinal()]) {
                                case 1:
                                    rollbackToSequenceNumber = 0;
                                    break;
                                case 2:
                                    rollbackToSequenceNumber = streamRequestResponse.getRollbackToSequenceNumber();
                                    break;
                                default:
                                    return Observable.just(streamRequestResponse);
                            }
                            return BucketStreamAggregator.this.core.send(new StreamRequestRequest(bucketStreamState.partition(), bucketStreamState.vbucketUUID(), rollbackToSequenceNumber, bucketStreamState.endSequenceNumber(), bucketStreamState.snapshotStartSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.core.dcp.BucketStreamAggregator$4, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$client$core$message$ResponseStatus = new int[ResponseStatus.values().length];

        static {
            try {
                $SwitchMap$com$couchbase$client$core$message$ResponseStatus[ResponseStatus.RANGE_ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$message$ResponseStatus[ResponseStatus.ROLLBACK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public BucketStreamAggregator(ClusterFacade clusterFacade, String str) {
        this.core = clusterFacade;
        this.bucket = str;
    }

    public Observable<DCPRequest> feed() {
        return feed(new BucketStreamAggregatorState("jvmCore"));
    }

    public Observable<DCPRequest> feed(BucketStreamAggregatorState bucketStreamAggregatorState) {
        return open(bucketStreamAggregatorState).flatMap(new Func1<StreamRequestResponse, Observable<DCPRequest>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1
            public Observable<DCPRequest> call(StreamRequestResponse streamRequestResponse) {
                return streamRequestResponse.stream();
            }
        });
    }

    public Observable<StreamRequestResponse> open(BucketStreamAggregatorState bucketStreamAggregatorState) {
        return this.core.send(new OpenConnectionRequest(bucketStreamAggregatorState.name(), this.bucket)).flatMap(new AnonymousClass2(bucketStreamAggregatorState));
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.3
            public Integer call(GetClusterConfigResponse getClusterConfigResponse) {
                return Integer.valueOf(((CouchbaseBucketConfig) getClusterConfigResponse.config().bucketConfig(BucketStreamAggregator.this.bucket)).numberOfPartitions());
            }
        });
    }
}
