package com.google.pubsub.kafka.source;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Future;

/* loaded from: input_file:com/google/pubsub/kafka/source/AckBatchingSubscriber.class */
/**
 * A {@link CloudPubSubSubscriber} decorator that batches acknowledgements.
 *
 * <p>{@link #ackMessages(Collection)} does not contact the underlying subscriber; it only queues
 * the ack ids together with a future. When the alarm created by the {@link AlarmFactory} runs
 * {@code flush()}, every queued id is sent in a single {@code underlying.ackMessages} call and the
 * outcome of that call is propagated to each queued future. {@link #pull()} is passed straight
 * through.
 *
 * <p>The pending queue is guarded by this instance's monitor.
 */
public class AckBatchingSubscriber implements CloudPubSubSubscriber {
    private final CloudPubSubSubscriber underlying;

    @GuardedBy("this")
    private final Deque<IdsAndFuture> toSend = new ArrayDeque<>();
    private final Future<?> alarm;

    /**
     * Creates the alarm that triggers flushing. When and how often the supplied runnable is
     * invoked is decided by the implementation; the returned future is cancelled on
     * {@link AckBatchingSubscriber#close()}.
     */
    @FunctionalInterface
    interface AlarmFactory {
        Future<?> newAlarm(Runnable runnable);
    }

    /** One queued {@code ackMessages} request: the ack ids and the future handed to its caller. */
    public static class IdsAndFuture {
        final Collection<String> ids;
        final SettableApiFuture<Empty> future;

        private IdsAndFuture(Collection<String> ids, SettableApiFuture<Empty> future) {
            this.ids = ids;
            this.future = future;
        }
    }

    /**
     * Wraps {@code underlying} and registers {@code flush()} with the alarm factory.
     *
     * @param underlying subscriber that performs the real pull and ack calls
     * @param alarmFactory factory given the flush runnable; its returned future is kept so it can
     *     be cancelled in {@link #close()}
     */
    public AckBatchingSubscriber(CloudPubSubSubscriber underlying, AlarmFactory alarmFactory) {
        this.underlying = underlying;
        // Must stay the last statement: the alarm may invoke flush() as soon as it is created,
        // and flush() reads both `underlying` and `toSend`.
        this.alarm = alarmFactory.newAlarm(this::flush);
    }

    /** Delegates directly to the underlying subscriber; pulls are not batched. */
    @Override // com.google.pubsub.kafka.source.CloudPubSubSubscriber
    public ApiFuture<List<ReceivedMessage>> pull() {
        return this.underlying.pull();
    }

    /**
     * Queues the given ack ids for the next flush.
     *
     * @param ackIds ack ids to acknowledge; the collection is stored by reference until the next
     *     flush, so it should not be modified by the caller in the meantime
     * @return a future completed with the result (or failure) of the batched ack call that
     *     eventually carries these ids
     */
    @Override // com.google.pubsub.kafka.source.CloudPubSubSubscriber
    public synchronized ApiFuture<Empty> ackMessages(Collection<String> ackIds) {
        IdsAndFuture pending = new IdsAndFuture(ackIds, SettableApiFuture.create());
        this.toSend.add(pending);
        return pending.future;
    }

    /**
     * Sends every queued ack id in one call to the underlying subscriber and forwards the outcome
     * to all queued futures. Does nothing when the queue is empty.
     */
    private void flush() {
        List<String> ackIds = new ArrayList<>();
        final List<SettableApiFuture<Empty>> futures = new ArrayList<>();
        synchronized (this) {
            if (this.toSend.isEmpty()) {
                return;
            }
            for (IdsAndFuture pending : this.toSend) {
                ackIds.addAll(pending.ids);
                futures.add(pending.future);
            }
            this.toSend.clear();
        }
        // The remote call and callback registration happen outside the monitor: with a direct
        // executor the callback (and any listeners attached to the callers' futures) can run on
        // this thread immediately, and must not do so while the lock is held.
        ApiFuture<Empty> response = this.underlying.ackMessages(ackIds);
        ApiFutures.addCallback(response, new ApiFutureCallback<Empty>() {
            @Override // com.google.api.core.ApiFutureCallback
            public void onFailure(Throwable error) {
                futures.forEach(future -> future.setException(error));
            }

            @Override // com.google.api.core.ApiFutureCallback
            public void onSuccess(Empty result) {
                futures.forEach(future -> future.set(result));
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Cancels the alarm, flushes any acks still queued, and closes the underlying subscriber.
     * The underlying subscriber is closed even if the final flush throws.
     */
    @Override // com.google.pubsub.kafka.source.CloudPubSubSubscriber, java.lang.AutoCloseable
    public void close() {
        this.alarm.cancel(false);
        boolean interrupted = false;
        try {
            this.alarm.get();
        } catch (InterruptedException e) {
            // Remember the interrupt but finish shutting down first.
            interrupted = true;
        } catch (Throwable ignored) {
            // Best effort: a cancelled future is expected to throw here, and any failure of the
            // alarm task must not prevent the final flush and close below.
        }
        try {
            try {
                flush();
            } finally {
                this.underlying.close();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
