package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.transforms.DoFn;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.class */
/**
 * A {@link DoFn} that publishes every incoming {@link PubSubMessage} through a Pub/Sub Lite
 * {@link Publisher} and, when the bundle finishes, blocks until all publishes issued during that
 * bundle have completed.
 *
 * <p>Per-bundle state lives in {@link RunState}, which is created in {@link #startBundle()} and is
 * {@code transient}, so it is not part of the serialized form of this function. All lifecycle
 * methods are {@code synchronized} on this instance to guard that state.
 */
public class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
    /** Options used to look up (or build) the publisher for each bundle. */
    private final PublisherOptions options;

    /** State of the bundle currently being processed; replaced on every {@link #startBundle()}. */
    @GuardedBy("this")
    private transient RunState runState;

    /**
     * Holds the publisher and the outstanding publish futures for a single bundle.
     *
     * <p>This class does no locking of its own; the enclosing sink only touches it from
     * {@code synchronized} methods.
     */
    private static class RunState {
        /** One future per message published so far in this bundle, in publish order. */
        private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();

        private final Publisher<MessageMetadata> publisher;

        /**
         * Obtains the publisher for the given options from
         * {@code PerServerPublisherCache.PUBLISHER_CACHE}.
         *
         * @param publisherOptions options identifying the publisher; also passed to
         *     {@code PublisherAssembler} by the factory handed to the cache
         */
        RunState(PublisherOptions publisherOptions) {
            // NOTE(review): the factory is presumably invoked only when the cache has no entry for
            // these options; confirm against PerServerPublisherCache. The publisher is never
            // closed here, so its lifecycle appears to be owned by the cache.
            this.publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(
                    publisherOptions, () -> new PublisherAssembler(publisherOptions).newPublisher());
        }

        /**
         * Starts publishing a message and records the resulting future so that
         * {@link #waitForDone()} can wait on it.
         *
         * @param pubSubMessage the proto message to convert and publish
         */
        void publish(PubSubMessage pubSubMessage) {
            this.futures.add(this.publisher.publish(Message.fromProto(pubSubMessage)));
        }

        /**
         * Blocks for up to one minute until every recorded publish future has completed.
         *
         * @throws Exception whatever the combined future's timed {@code get} throws, e.g. on
         *     timeout, interruption, or a failed publish
         */
        void waitForDone() throws Exception {
            ApiFutures.allAsList(this.futures).get(1L, TimeUnit.MINUTES);
        }
    }

    /**
     * Creates a sink that publishes using the given options.
     *
     * @param publisherOptions options stored for use when each bundle starts
     */
    public PubsubLiteSink(PublisherOptions publisherOptions) {
        this.options = publisherOptions;
    }

    /** Creates fresh per-bundle state, discarding any futures tracked for a previous bundle. */
    @DoFn.StartBundle
    public synchronized void startBundle() throws ApiException {
        this.runState = new RunState(this.options);
    }

    /**
     * Hands one element to the publisher without waiting for the publish to complete.
     *
     * @param pubSubMessage the element to publish
     */
    @DoFn.ProcessElement
    public synchronized void processElement(@DoFn.Element PubSubMessage pubSubMessage) throws CheckedApiException {
        this.runState.publish(pubSubMessage);
    }

    /**
     * Waits for every publish issued during this bundle to finish.
     *
     * @throws Exception if waiting on the outstanding publishes throws; see
     *     {@code RunState.waitForDone()}
     */
    @DoFn.FinishBundle
    public synchronized void finishBundle() throws Exception {
        this.runState.waitForDone();
    }
}
