package com.google.cloud.pubsub;

import com.google.auth.Credentials;
import com.google.cloud.pubsub.AbstractSubscriberConnection;
import com.google.cloud.pubsub.Subscriber;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsub/StreamingSubscriberConnection.class */
public final class StreamingSubscriberConnection extends AbstractSubscriberConnection {
    private static final Logger logger = LoggerFactory.getLogger(StreamingSubscriberConnection.class);
    private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = new Duration(100);
    private static final int MAX_PER_REQUEST_CHANGES = 10000;
    private Duration channelReconnectBackoff;
    private final Channel channel;
    private final Credentials credentials;
    private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

    public StreamingSubscriberConnection(String str, Credentials credentials, Subscriber.MessageReceiver messageReceiver, Duration duration, int i, Distribution distribution, Channel channel, FlowController flowController, ScheduledExecutorService scheduledExecutorService) {
        super(str, messageReceiver, duration, distribution, flowController, scheduledExecutorService);
        this.channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
        this.credentials = credentials;
        this.channel = channel;
        setMessageDeadlineSeconds(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.google.cloud.pubsub.AbstractSubscriberConnection
    public void doStop() {
        super.doStop();
        this.requestObserver.onError(Status.CANCELLED.asException());
    }

    @Override // com.google.cloud.pubsub.AbstractSubscriberConnection
    void initialize() {
        final SettableFuture create = SettableFuture.create();
        ClientCallStreamObserver asyncBidiStreamingCall = ClientCalls.asyncBidiStreamingCall(this.channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(this.credentials))), new ClientResponseObserver<StreamingPullRequest, StreamingPullResponse>() { // from class: com.google.cloud.pubsub.StreamingSubscriberConnection.1
            public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> clientCallStreamObserver) {
                StreamingSubscriberConnection.this.requestObserver = clientCallStreamObserver;
                clientCallStreamObserver.disableAutoInboundFlowControl();
            }

            public void onNext(StreamingPullResponse streamingPullResponse) {
                StreamingSubscriberConnection.this.processReceivedMessages(streamingPullResponse.getReceivedMessagesList());
                if (StreamingSubscriberConnection.this.isAlive()) {
                    StreamingSubscriberConnection.this.requestObserver.request(1);
                }
            }

            public void onError(Throwable th) {
                StreamingSubscriberConnection.logger.debug("Terminated streaming with exception", th);
                create.setException(th);
            }

            public void onCompleted() {
                StreamingSubscriberConnection.logger.debug("Streaming pull terminated successfully!");
                create.set((Object) null);
            }
        });
        logger.debug("Initializing stream to subscription {} with deadline {}", this.subscription, Integer.valueOf(getMessageDeadlineSeconds()));
        asyncBidiStreamingCall.onNext(StreamingPullRequest.newBuilder().setSubscription(this.subscription).setStreamAckDeadlineSeconds(getMessageDeadlineSeconds()).build());
        asyncBidiStreamingCall.request(1);
        Futures.addCallback(create, new FutureCallback<Void>() { // from class: com.google.cloud.pubsub.StreamingSubscriberConnection.2
            public void onSuccess(@Nullable Void r4) {
                StreamingSubscriberConnection.this.channelReconnectBackoff = StreamingSubscriberConnection.INITIAL_CHANNEL_RECONNECT_BACKOFF;
                StreamingSubscriberConnection.this.initialize();
            }

            public void onFailure(Throwable th) {
                if (!StreamingSubscriberConnection.this.isRetryable(Status.fromThrowable(th)) || !StreamingSubscriberConnection.this.isAlive()) {
                    if (StreamingSubscriberConnection.this.isAlive()) {
                        StreamingSubscriberConnection.this.notifyFailed(th);
                    }
                } else {
                    long millis = StreamingSubscriberConnection.this.channelReconnectBackoff.getMillis();
                    StreamingSubscriberConnection.this.channelReconnectBackoff = StreamingSubscriberConnection.this.channelReconnectBackoff.plus(millis);
                    StreamingSubscriberConnection.this.executor.schedule(new Runnable() { // from class: com.google.cloud.pubsub.StreamingSubscriberConnection.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            StreamingSubscriberConnection.this.initialize();
                        }
                    }, millis, TimeUnit.MILLISECONDS);
                }
            }
        }, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isAlive() {
        return state() == Service.State.RUNNING || state() == Service.State.STARTING;
    }

    @Override // com.google.cloud.pubsub.AbstractSubscriberConnection
    void sendAckOperations(List<String> list, List<AbstractSubscriberConnection.PendingModifyAckDeadline> list2) {
        List partition = Lists.partition(list, MAX_PER_REQUEST_CHANGES);
        List partition2 = Lists.partition(list2, MAX_PER_REQUEST_CHANGES);
        Iterator it = partition.iterator();
        Iterator it2 = partition2.iterator();
        while (true) {
            if (!it.hasNext() && !it2.hasNext()) {
                return;
            }
            StreamingPullRequest.Builder newBuilder = StreamingPullRequest.newBuilder();
            if (it2.hasNext()) {
                for (AbstractSubscriberConnection.PendingModifyAckDeadline pendingModifyAckDeadline : (List) it2.next()) {
                    Iterator<String> it3 = pendingModifyAckDeadline.ackIds.iterator();
                    while (it3.hasNext()) {
                        newBuilder.addModifyDeadlineSeconds(pendingModifyAckDeadline.deadlineExtensionSeconds).addModifyDeadlineAckIds(it3.next());
                    }
                }
            }
            if (it.hasNext()) {
                newBuilder.addAllAckIds((List) it.next());
            }
            this.requestObserver.onNext(newBuilder.build());
        }
    }

    public void updateStreamAckDeadline(int i) {
        setMessageDeadlineSeconds(i);
        this.requestObserver.onNext(StreamingPullRequest.newBuilder().setStreamAckDeadlineSeconds(i).build());
    }
}
