package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.class */
public class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
    private final PublisherOptions options;

    @GuardedBy("this")
    private transient PublisherOrError publisherOrError;

    @GuardedBy("this")
    private transient int outstanding;

    @GuardedBy("this")
    private transient Deque<CheckedApiException> errorsSinceLastFinish;
    private static final Executor executor = Executors.newCachedThreadPool();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PubsubLiteSink(PublisherOptions publisherOptions) {
        this.options = publisherOptions;
    }

    @DoFn.Setup
    public void setup() throws ApiException {
        Publisher<PublishMetadata> newPublisher = this.options.usesCache() ? PerServerPublisherCache.PUBLISHER_CACHE.get(this.options) : Publishers.newPublisher(this.options);
        synchronized (this) {
            this.outstanding = 0;
            this.errorsSinceLastFinish = new ArrayDeque();
            this.publisherOrError = PublisherOrError.ofPublisher(newPublisher);
        }
        final Consumer consumer = th -> {
            synchronized (this) {
                this.publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(th));
            }
        };
        newPublisher.addListener(new ApiService.Listener() { // from class: org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink.1
            public void failed(ApiService.State state, Throwable th2) {
                consumer.accept(th2);
            }
        }, MoreExecutors.directExecutor());
        if (this.options.usesCache()) {
            return;
        }
        newPublisher.startAsync();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void decrementOutstanding() {
        this.outstanding--;
        notify();
    }

    @DoFn.ProcessElement
    public synchronized void processElement(@DoFn.Element PubSubMessage pubSubMessage) throws CheckedApiException {
        this.outstanding++;
        if (this.publisherOrError.getKind() == PublisherOrError.Kind.ERROR) {
            throw this.publisherOrError.error();
        }
        ApiFuture publish = this.publisherOrError.publisher().publish(Message.fromProto(pubSubMessage));
        final Consumer consumer = th -> {
            synchronized (this) {
                decrementOutstanding();
                this.errorsSinceLastFinish.push(ExtractStatus.toCanonical(th));
            }
        };
        ApiFutures.addCallback(publish, new ApiFutureCallback<PublishMetadata>() { // from class: org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink.2
            public void onSuccess(PublishMetadata publishMetadata) {
                PubsubLiteSink.this.decrementOutstanding();
            }

            public void onFailure(Throwable th2) {
                consumer.accept(th2);
            }
        }, executor);
    }

    @DoFn.FinishBundle
    public synchronized void finishBundle() throws CheckedApiException, InterruptedException {
        while (this.outstanding > 0) {
            wait();
        }
        if (this.errorsSinceLastFinish.isEmpty()) {
            if (this.publisherOrError.getKind() == PublisherOrError.Kind.ERROR) {
                throw this.publisherOrError.error();
            }
        } else {
            CheckedApiException pop = this.errorsSinceLastFinish.pop();
            while (!this.errorsSinceLastFinish.isEmpty()) {
                pop.addSuppressed(this.errorsSinceLastFinish.pop());
            }
            throw pop;
        }
    }
}
