package com.google.cloud.pubsub;

import com.google.cloud.pubsub.Publisher;
import com.google.cloud.pubsub.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsub/AbstractSubscriberConnection.class */
public abstract class AbstractSubscriberConnection extends AbstractService {
    // ---- Constants -------------------------------------------------------------------------

    /** Extension step, in seconds, given to the ExpirationInfo created for a newly received batch. */
    private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
    /** Ceiling, in seconds, applied to the extension step each time it is doubled. */
    private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 600;
    private static final Logger logger = LoggerFactory.getLogger(AbstractSubscriberConnection.class);

    /** Delay between queueing an ack/nack and the alarm that flushes the queued IDs. */
    @VisibleForTesting
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);

    // ---- Collaborators supplied through the constructor --------------------------------------

    protected final String subscription;
    protected final ScheduledExecutorService executor;
    private final Duration ackExpirationPadding;
    private final Subscriber.MessageReceiver receiver;
    private final FlowController flowController;
    private final Distribution ackLatencyDistribution;

    // ---- Mutable state -----------------------------------------------------------------------

    private int messageDeadlineSeconds;
    /** Handlers awaiting a receiver reply, bucketed by expiration; accessed under its own monitor. */
    private final Map<ExpirationInfo, List<AckHandler>> outstandingAckHandlers = new HashMap<>();
    /** Ack IDs waiting to be sent as acks; accessed under its own monitor. */
    private final Set<String> pendingAcks = new HashSet<>();
    /** Ack IDs waiting to be sent as nacks; accessed under its own monitor. */
    private final Set<String> pendingNacks = new HashSet<>();
    /** Guards the two alarm futures and {@code nextAckDeadlineExtensionAlarmTime}. */
    private final Lock alarmsLock = new ReentrantLock();
    private ScheduledFuture<?> ackDeadlineExtensionAlarm;
    private ScheduledFuture<?> pendingAcksAlarm;
    /** Time the deadline-extension alarm is due; {@code Long.MAX_VALUE} while none is scheduled. */
    private Instant nextAckDeadlineExtensionAlarmTime = new Instant(Long.MAX_VALUE);
    private final MessagesWaiter messagesWaiter = new MessagesWaiter();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.pubsub.AbstractSubscriberConnection$3, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/pubsub/AbstractSubscriberConnection$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        /** Maps {@code AckReply.ordinal()} to a switch case number (ACK = 1, NACK = 2, 0 = unmapped). */
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$pubsub$Subscriber$MessageReceiver$AckReply;
        /** Maps {@code Status.Code.ordinal()} to a switch case number (0 = unmapped). */
        static final /* synthetic */ int[] $SwitchMap$io$grpc$Status$Code;

        static {
            // Each table is filled in a local array and published once complete. Every lookup is
            // individually guarded so that an enum constant missing at run time leaves its slot at 0.
            // The case numbers are plain literals: the value 2 is a switch index, not an ack
            // deadline extension.
            int[] statusCases = new int[Status.Code.values().length];
            try {
                statusCases[Status.Code.DEADLINE_EXCEEDED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                statusCases[Status.Code.INTERNAL.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                statusCases[Status.Code.CANCELLED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                statusCases[Status.Code.RESOURCE_EXHAUSTED.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                statusCases[Status.Code.UNAVAILABLE.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$io$grpc$Status$Code = statusCases;

            int[] replyCases = new int[Subscriber.MessageReceiver.AckReply.values().length];
            try {
                replyCases[Subscriber.MessageReceiver.AckReply.ACK.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                replyCases[Subscriber.MessageReceiver.AckReply.NACK.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$com$google$cloud$pubsub$Subscriber$MessageReceiver$AckReply = replyCases;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/AbstractSubscriberConnection$AckDeadlineAlarm.class */
    /**
     * Scheduled task that extends the ack deadline of every outstanding message whose current
     * deadline falls at or before a cut-over time, then re-arms itself for the earliest remaining
     * expiration.
     */
    public class AckDeadlineAlarm implements Runnable {
        private AckDeadlineAlarm() {
        }

        /**
         * Runs one extension pass while holding {@code alarmsLock}.
         *
         * <p>The pass also flushes queued acks/nacks (through
         * {@code processOutstandingAckOperations}), so a scheduled pending-acks alarm is cancelled
         * first.
         */
        @Override // java.lang.Runnable
        public void run() {
            alarmsLock.lock();
            try {
                // This alarm has fired: mark "no alarm scheduled" so that re-arming below succeeds.
                nextAckDeadlineExtensionAlarmTime = new Instant(Long.MAX_VALUE);
                ackDeadlineExtensionAlarm = null;
                if (pendingAcksAlarm != null) {
                    pendingAcksAlarm.cancel(false);
                    pendingAcksAlarm = null;
                }
                Instant now = Instant.now();
                // now + padding + 500 ms, rounded up to a whole second.
                Instant cutOverTime = new Instant(((long) Math.ceil(now.plus(ackExpirationPadding).plus(500L).getMillis() / 1000.0d)) * 1000);
                logger.debug("Running alarm sent outstanding acks, at now time: {}, with cutover time: {}, padding: {}", new Object[]{now, cutOverTime, ackExpirationPadding});
                ExpirationInfo nextScheduleExpiration = null;
                List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<>();
                synchronized (outstandingAckHandlers) {
                    // Iterate over a snapshot of the keys: the loop removes and re-inserts buckets,
                    // and doing that while iterating the live keySet() of a HashMap throws
                    // ConcurrentModificationException.
                    List<ExpirationInfo> buckets = new ArrayList<>(outstandingAckHandlers.keySet());
                    for (ExpirationInfo bucket : buckets) {
                        if (bucket.expiration.compareTo(cutOverTime) <= 0) {
                            // Remove BEFORE extending: extendExpiration() changes the key's hash.
                            List<AckHandler> expiring = outstandingAckHandlers.remove(bucket);
                            if (expiring == null) {
                                continue;
                            }
                            bucket.extendExpiration();
                            PendingModifyAckDeadline extension = new PendingModifyAckDeadline(Ints.saturatedCast(new Interval(now, bucket.expiration).toDuration().getStandardSeconds()));
                            List<AckHandler> stillOutstanding = new ArrayList<>(expiring.size());
                            for (AckHandler handler : expiring) {
                                if (!handler.acked.get()) {
                                    extension.addAckId(handler.ackId);
                                    stillOutstanding.add(handler);
                                }
                            }
                            if (stillOutstanding.isEmpty()) {
                                // Every message in the bucket has been answered: nothing to extend,
                                // nothing to re-insert, and no reason to schedule an alarm for it.
                                // (The bucket was already removed above; removing again with the
                                // mutated key could evict an unrelated bucket that compares equal.)
                                continue;
                            }
                            modifyAckDeadlinesToSend.add(extension);
                            addOutstadingAckHandlers(bucket, stillOutstanding);
                        }
                        if (nextScheduleExpiration == null || nextScheduleExpiration.expiration.isAfter(bucket.expiration)) {
                            nextScheduleExpiration = bucket;
                        }
                    }
                }
                processOutstandingAckOperations(modifyAckDeadlinesToSend);
                if (nextScheduleExpiration != null) {
                    // Log the Instant itself; ExpirationInfo would print as an identity hash.
                    logger.debug("Scheduling based on outstanding, now time: {}, next schedule time: {}", now, nextScheduleExpiration.expiration);
                    setupNextAckDeadlineExtensionAlarm(nextScheduleExpiration);
                }
            } finally {
                alarmsLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/AbstractSubscriberConnection$AckHandler.class */
    /**
     * Callback attached to the future returned by the message receiver for one message. It queues
     * the resulting ack or nack, releases the message's flow-control reservation and decrements the
     * pending-message count.
     */
    public class AckHandler implements FutureCallback<Subscriber.MessageReceiver.AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        /** Set once the receiver has answered; the deadline alarm skips handlers with this set. */
        private final AtomicBoolean acked = new AtomicBoolean(false);
        private final Instant receivedTime = Instant.now();

        /**
         * @param str ack ID of the message this handler tracks
         * @param i serialized size of the message, released from flow control on completion
         */
        AckHandler(String str, int i) {
            this.ackId = str;
            this.outstandingBytes = i;
        }

        /**
         * The receiver failed: the message is nacked.
         *
         * @param th the failure reported by the receiver's future
         */
        public void onFailure(Throwable th) {
            logger.warn("MessageReceiver failed to processes ack ID: " + this.ackId + ", the message will be nacked.", th);
            // Mark the handler as answered. Without this the deadline alarm keeps extending the
            // deadline of a message that is being nacked and never drops the handler.
            this.acked.set(true);
            enqueueReply(pendingNacks);
            messagesWaiter.incrementPendingMessages(-1);
        }

        /**
         * The receiver answered: queue an ack or a nack accordingly.
         *
         * @param ackReply the receiver's decision
         * @throws IllegalArgumentException if the reply is neither ACK nor NACK
         */
        public void onSuccess(Subscriber.MessageReceiver.AckReply ackReply) {
            this.acked.getAndSet(true);
            switch (ackReply) {
                case ACK:
                    enqueueReply(pendingAcks);
                    // Latency is recorded in whole seconds, rounded up.
                    ackLatencyDistribution.record(Ints.saturatedCast((long) Math.ceil(new Duration(this.receivedTime, Instant.now()).getMillis() / 1000.0d)));
                    messagesWaiter.incrementPendingMessages(-1);
                    return;
                case NACK:
                    enqueueReply(pendingNacks);
                    messagesWaiter.incrementPendingMessages(-1);
                    return;
                default:
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", ackReply));
            }
        }

        /** Adds this ack ID to {@code queue}, arms the flush alarm and releases flow control. */
        private void enqueueReply(Set<String> queue) {
            synchronized (queue) {
                queue.add(this.ackId);
            }
            setupPendingAcksAlarm();
            flowController.release(1, this.outstandingBytes);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/AbstractSubscriberConnection$ExpirationInfo.class */
    /**
     * An expiration instant together with the number of seconds to use for the next extension.
     * Equality, hashing and ordering consider only {@code expiration}.
     *
     * <p>{@code expiration} is mutable and feeds {@code hashCode()}, so an instance must be taken
     * out of any hash-based collection before {@link #extendExpiration()} is called on it.
     */
    public static class ExpirationInfo implements Comparable<ExpirationInfo> {
        Instant expiration;
        int nextExtensionSeconds;

        /**
         * @param instant initial expiration
         * @param i seconds to add on the first call to {@link #extendExpiration()}
         */
        ExpirationInfo(Instant instant, int i) {
            this.nextExtensionSeconds = i;
            this.expiration = instant;
        }

        /**
         * Moves the expiration to the current time plus {@code nextExtensionSeconds}, then doubles
         * the step for the following call, capped at the maximum extension.
         */
        void extendExpiration() {
            final Duration step = Duration.standardSeconds(this.nextExtensionSeconds);
            this.expiration = Instant.now().plus(step);
            this.nextExtensionSeconds = Math.min(2 * this.nextExtensionSeconds, AbstractSubscriberConnection.MAX_ACK_DEADLINE_EXTENSION_SECS);
        }

        @Override
        public int hashCode() {
            return this.expiration.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof ExpirationInfo && this.expiration.equals(((ExpirationInfo) obj).expiration);
        }

        /** Orders instances by expiration, earliest first. */
        @Override // java.lang.Comparable
        public int compareTo(ExpirationInfo expirationInfo) {
            return this.expiration.compareTo(expirationInfo.expiration);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/pubsub/AbstractSubscriberConnection$PendingModifyAckDeadline.class */
    /** A group of ack IDs that share one deadline extension, expressed in seconds. */
    public static class PendingModifyAckDeadline {
        final List<String> ackIds = new ArrayList<>();
        final int deadlineExtensionSeconds;

        /**
         * Creates an empty group.
         *
         * @param i deadline extension, in seconds, for every ack ID added later
         */
        PendingModifyAckDeadline(int i) {
            this.deadlineExtensionSeconds = i;
        }

        /**
         * Creates a group that already contains one ack ID.
         *
         * @param str first ack ID of the group
         * @param i deadline extension, in seconds
         */
        PendingModifyAckDeadline(String str, int i) {
            this(i);
            addAckId(str);
        }

        /** Appends {@code str} to the group. */
        public void addAckId(String str) {
            this.ackIds.add(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators; no work is started here.
     *
     * @param str value kept in {@code subscription}
     * @param messageReceiver receiver that each incoming message is handed to
     * @param duration padding subtracted from an expiration to decide when the extension alarm fires
     * @param distribution sink for ack latencies
     * @param flowController flow controller used to reserve and release message capacity
     * @param scheduledExecutorService executor used for message dispatch and for both alarms
     */
    public AbstractSubscriberConnection(String str, Subscriber.MessageReceiver messageReceiver, Duration duration, Distribution distribution, FlowController flowController, ScheduledExecutorService scheduledExecutorService) {
        this.subscription = str;
        this.receiver = messageReceiver;
        this.ackExpirationPadding = duration;
        this.ackLatencyDistribution = distribution;
        this.flowController = flowController;
        this.executor = scheduledExecutorService;
    }

    /**
     * Start hook: logs, runs the subclass-provided {@code initialize()}, then reports the service
     * as started through {@code notifyStarted()}.
     */
    protected void doStart() {
        logger.debug("Starting subscriber.");
        initialize();
        notifyStarted();
    }

    /**
     * Subclass hook invoked by {@code doStart()} before the service is reported as started. What
     * gets set up is defined by the implementation.
     */
    abstract void initialize();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stop hook. Waits (outside the alarm lock) until the message waiter reports no pending
     * messages, then, under the alarm lock, cancels the deadline-extension alarm, flushes queued
     * acks/nacks and reports the service as stopped.
     */
    public void doStop() {
        this.messagesWaiter.waitNoMessages();
        this.alarmsLock.lock();
        try {
            final ScheduledFuture<?> extensionAlarm = this.ackDeadlineExtensionAlarm;
            if (extensionAlarm != null) {
                extensionAlarm.cancel(true);
                this.ackDeadlineExtensionAlarm = null;
            }
            processOutstandingAckOperations();
            notifyStopped();
        } finally {
            this.alarmsLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether a failed call may be retried.
     *
     * @param status gRPC status of the failed call
     * @return {@code true} for DEADLINE_EXCEEDED, INTERNAL, CANCELLED, RESOURCE_EXHAUSTED and
     *     UNAVAILABLE; {@code false} for every other code
     */
    public boolean isRetryable(Status status) {
        final Status.Code code = status.getCode();
        switch (code) {
            case DEADLINE_EXCEEDED:
            case INTERNAL:
            case CANCELLED:
            case RESOURCE_EXHAUSTED:
            case UNAVAILABLE:
                return true;
            default:
                return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sets the deadline, in seconds, that {@code processReceivedMessages} adds to the receive time
     * to compute the initial expiration of a batch.
     *
     * @param i deadline in seconds
     */
    public void setMessageDeadlineSeconds(int i) {
        this.messageDeadlineSeconds = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the deadline, in seconds, last stored by {@code setMessageDeadlineSeconds}
     *     (0 if it was never set)
     */
    public int getMessageDeadlineSeconds() {
        return this.messageDeadlineSeconds;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers a batch of received messages and hands each one to the receiver.
     *
     * <p>Steps, in order: create one {@code AckHandler} per message; file them all under a single
     * expiration (receive time + message deadline); make sure the deadline-extension alarm covers
     * that expiration; bump the pending-message count; submit one task per message that calls the
     * receiver and attaches the handler as callback; finally reserve flow-control capacity for the
     * whole batch.
     *
     * @param list messages to process; an empty list is a no-op
     * @throws IllegalStateException if the flow controller rejects the reservation
     */
    public void processReceivedMessages(List<ReceivedMessage> list) {
        final int messageCount = list.size();
        if (messageCount == 0) {
            return;
        }
        final Instant receivedAt = Instant.now();

        int totalBytes = 0;
        final List<AckHandler> handlers = new ArrayList<>(messageCount);
        for (ReceivedMessage received : list) {
            final int messageBytes = received.getMessage().getSerializedSize();
            totalBytes += messageBytes;
            handlers.add(new AckHandler(received.getAckId(), messageBytes));
        }

        final Instant firstExpiration = receivedAt.plus(this.messageDeadlineSeconds * 1000);
        final ExpirationInfo batchExpiration = new ExpirationInfo(firstExpiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
        synchronized (this.outstandingAckHandlers) {
            addOutstadingAckHandlers(batchExpiration, handlers);
        }
        logger.debug("Received {} messages at {}", Integer.valueOf(messageCount), receivedAt);
        setupNextAckDeadlineExtensionAlarm(batchExpiration);
        this.messagesWaiter.incrementPendingMessages(messageCount);

        // Handlers were created in list order, so walking both sequences in step pairs them up.
        final Iterator<AckHandler> handlerCursor = handlers.iterator();
        for (ReceivedMessage received : list) {
            final PubsubMessage message = received.getMessage();
            final AckHandler handler = handlerCursor.next();
            this.executor.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    Futures.addCallback(AbstractSubscriberConnection.this.receiver.receiveMessage(message), handler);
                }
            });
        }

        // NOTE(review): the reservation happens after the messages were dispatched, so a handler
        // may call flowController.release() before this reserve() runs — confirm that
        // FlowController tolerates that ordering.
        try {
            this.flowController.reserve(messageCount, totalBytes);
        } catch (Publisher.CloudPubsubFlowControlException e) {
            throw new IllegalStateException("Flow control unexpected exception", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends {@code list} to the bucket stored for {@code expirationInfo}, creating the bucket if
     * none exists. Callers in this class invoke it while synchronized on
     * {@code outstandingAckHandlers}.
     *
     * @param expirationInfo key of the bucket
     * @param list handlers to append
     */
    public void addOutstadingAckHandlers(ExpirationInfo expirationInfo, List<AckHandler> list) {
        List<AckHandler> bucket = this.outstandingAckHandlers.get(expirationInfo);
        if (bucket == null) {
            bucket = new ArrayList<>(list.size());
            this.outstandingAckHandlers.put(expirationInfo, bucket);
        }
        bucket.addAll(list);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a one-shot flush of the queued acks/nacks {@code PENDING_ACKS_SEND_DELAY} from now,
     * unless such a flush is already scheduled.
     */
    public void setupPendingAcksAlarm() {
        this.alarmsLock.lock();
        try {
            if (this.pendingAcksAlarm != null) {
                return; // a flush is already on its way
            }
            final Runnable flushTask = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    AbstractSubscriberConnection.this.alarmsLock.lock();
                    try {
                        // Clear the handle first so that a later reply can arm a new alarm.
                        AbstractSubscriberConnection.this.pendingAcksAlarm = null;
                        AbstractSubscriberConnection.this.processOutstandingAckOperations();
                    } finally {
                        AbstractSubscriberConnection.this.alarmsLock.unlock();
                    }
                }
            };
            final long delayMillis = PENDING_ACKS_SEND_DELAY.getMillis();
            this.pendingAcksAlarm = this.executor.schedule(flushTask, delayMillis, TimeUnit.MILLISECONDS);
        } finally {
            this.alarmsLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure the deadline-extension alarm fires no later than the given expiration minus the
     * configured padding. If the currently scheduled alarm is already that early, nothing changes;
     * otherwise it is cancelled and replaced.
     *
     * @param expirationInfo expiration that the alarm has to cover
     */
    public void setupNextAckDeadlineExtensionAlarm(ExpirationInfo expirationInfo) {
        final Instant wantedAlarmTime = expirationInfo.expiration.minus(this.ackExpirationPadding);
        this.alarmsLock.lock();
        try {
            if (!this.nextAckDeadlineExtensionAlarmTime.isAfter(wantedAlarmTime)) {
                return; // the existing alarm fires early enough
            }
            logger.debug("Scheduling next alarm time: {}, last alarm set to time: {}", wantedAlarmTime, this.nextAckDeadlineExtensionAlarmTime);
            final ScheduledFuture<?> previousAlarm = this.ackDeadlineExtensionAlarm;
            if (previousAlarm != null) {
                logger.debug("Canceling previous alarm");
                previousAlarm.cancel(false);
            }
            this.nextAckDeadlineExtensionAlarmTime = wantedAlarmTime;
            final long delayMillis = wantedAlarmTime.getMillis() - Instant.now().getMillis();
            this.ackDeadlineExtensionAlarm = this.executor.schedule(new AckDeadlineAlarm(), delayMillis, TimeUnit.MILLISECONDS);
        } finally {
            this.alarmsLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Flushes the queued acks and nacks without any additional deadline modifications; delegates
     * to the list-taking overload with an empty list.
     */
    public void processOutstandingAckOperations() {
        processOutstandingAckOperations(new ArrayList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains the queued acks and nacks and passes them, together with the given deadline
     * modifications, to {@code sendAckOperations}.
     *
     * <p>Nacks are sent as one extra {@code PendingModifyAckDeadline} with a deadline of 0 seconds.
     * Both queues are emptied even if copying them fails. {@code sendAckOperations} is always
     * called, possibly with two empty lists.
     *
     * @param list deadline modifications to send along; the list itself is not modified
     */
    public void processOutstandingAckOperations(List<PendingModifyAckDeadline> list) {
        List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = Lists.newArrayList(list);

        // pendingAcks is a plain HashSet, so it is only touched while holding its monitor. The
        // previous code read pendingAcks.size() outside the lock to pre-size a list that was then
        // thrown away.
        List<String> acksToSend = new ArrayList<>();
        synchronized (this.pendingAcks) {
            if (!this.pendingAcks.isEmpty()) {
                try {
                    acksToSend = new ArrayList<>(this.pendingAcks);
                    logger.debug("Sending {} acks", Integer.valueOf(acksToSend.size()));
                } finally {
                    this.pendingAcks.clear();
                }
            }
        }

        synchronized (this.pendingNacks) {
            if (!this.pendingNacks.isEmpty()) {
                PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
                try {
                    for (String ackId : this.pendingNacks) {
                        nacksToSend.addAckId(ackId);
                    }
                    logger.debug("Sending {} nacks", Integer.valueOf(this.pendingNacks.size()));
                } finally {
                    this.pendingNacks.clear();
                }
                modifyAckDeadlinesToSend.add(nacksToSend);
            }
        }

        sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
    }

    /**
     * Subclass hook that transmits the drained operations. It is called by
     * {@code processOutstandingAckOperations}, and either list may be empty.
     *
     * @param list ack IDs drained from the pending-acks queue
     * @param list2 deadline modifications to send; nacks arrive as an entry with a 0-second deadline
     */
    abstract void sendAckOperations(List<String> list, List<PendingModifyAckDeadline> list2);
}
