package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.class */
public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubMessage>> {
    // NOTE(review): "DEAULT" looks like a typo for "DEFAULT"; left unchanged because uses outside this view cannot be ruled out.
    private static final int DEAULT_ACK_TIMEOUT_SEC = 60;
    // Batch size passed to PubsubClient.pull by PubsubReader.pull.
    private static final int PULL_BATCH_SIZE = 1000;
    // Largest number of ack ids sent in a single ack / nack / deadline-extension request.
    private static final int ACK_BATCH_SIZE = 2000;
    // PubsubReader.pull does nothing while at least this many messages are tracked as in flight.
    private static final int MAX_IN_FLIGHT = 20000;
    // Percentage of the ack timeout used as the new deadline when a message's ack deadline is extended.
    private static final int ACK_EXTENSION_PCT = 50;
    // A message becomes a candidate for extension once its deadline is within this percentage of the ack timeout.
    private static final int ACK_SAFETY_PCT = 20;
    // NOTE(review): presumably the minimum sample count handed to BucketingFunction / MovingFunction
    // (which receive the literal 10 in places) -- confirm.
    private static final int MIN_WATERMARK_MESSAGES = 10;
    // Passed to BucketingFunction / MovingFunction right after the sampling durations.
    private static final int MIN_WATERMARK_SPREAD = 2;
    // PubsubSource.split returns (desired splits * SCALE_OUT) copies of the source.
    private static final int SCALE_OUT = 4;
    // Optional time source; readers and checkpoints fall back to System.currentTimeMillis() when it is null.
    private Clock clock;
    // Factory used by PubsubReader to create its PubsubClient.
    private final PubsubClient.PubsubClientFactory pubsubFactory;
    private final ValueProvider<PubsubClient.ProjectPath> project;
    // Exactly one of topic / subscription is non-null at construction (checked by the primary constructor).
    private final ValueProvider<PubsubClient.TopicPath> topic;
    private ValueProvider<PubsubClient.SubscriptionPath> subscription;
    // Both attribute names are handed to the client factory when a reader creates its client.
    private final String timestampAttribute;
    private final String idAttribute;
    private final boolean needsAttributes;
    private final boolean needsMessageId;
    private final boolean needsOrderingKey;
    private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
    // Shared coder returned by PubsubSource.getCheckpointMarkCoder.
    private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
    // Messages in flight for longer than this are dropped from tracking instead of being extended again.
    private static final Duration PROCESSING_TIMEOUT = Duration.standardMinutes(2);
    // Messages whose deadline is closer than this (or already past) are assumed expired rather than extended.
    private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2);
    // Sample period of the reader's MovingFunction statistics; also the idle gap after which
    // getWatermark advances to the current time.
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
    // Update period of the reader's MovingFunction / BucketingFunction statistics.
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
    // Minimum interval between two statistics log lines emitted by PubsubReader.stats.
    private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
    /** Reducer that keeps the smaller of two longs; its identity is {@link Long#MAX_VALUE}. */
    private static final Combine.BinaryCombineLongFn MIN = new Combine.BinaryCombineLongFn() {
        @Override
        public long apply(long j, long j2) {
            return j <= j2 ? j : j2;
        }

        @Override
        public long identity() {
            return Long.MAX_VALUE;
        }
    };

    /** Reducer that keeps the larger of two longs; its identity is {@link Long#MIN_VALUE}. */
    private static final Combine.BinaryCombineLongFn MAX = new Combine.BinaryCombineLongFn() {
        @Override
        public long apply(long j, long j2) {
            return j >= j2 ? j : j2;
        }

        @Override
        public long identity() {
            return Long.MIN_VALUE;
        }
    };

    /** Reducer that adds longs, as provided by {@code Sum.ofLongs()}. */
    private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource$PubsubCheckpoint.class */
    public static class PubsubCheckpoint implements UnboundedSource.CheckpointMark {

        @VisibleForTesting
        String subscriptionPath;
        private PubsubReader reader;
        private List<String> safeToAckIds;

        @VisibleForTesting
        final List<String> notYetReadIds;

        public PubsubCheckpoint(String str, PubsubReader pubsubReader, List<String> list, List<String> list2) {
            this.subscriptionPath = str;
            this.reader = pubsubReader;
            this.safeToAckIds = list;
            this.notYetReadIds = list2;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public PubsubClient.SubscriptionPath getSubscription() {
            if (this.subscriptionPath == null) {
                return null;
            }
            return PubsubClient.subscriptionPathFromPath(this.subscriptionPath);
        }

        public void finalizeCheckpoint() throws IOException {
            Preconditions.checkState((this.reader == null || this.safeToAckIds == null) ? false : true, "Cannot finalize a restored checkpoint");
            try {
                int size = this.safeToAckIds.size();
                ArrayList arrayList = new ArrayList(Math.min(size, PubsubUnboundedSource.ACK_BATCH_SIZE));
                Iterator<String> it = this.safeToAckIds.iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next());
                    if (arrayList.size() >= PubsubUnboundedSource.ACK_BATCH_SIZE) {
                        this.reader.ackBatch(arrayList);
                        size -= arrayList.size();
                        arrayList = new ArrayList(Math.min(size, PubsubUnboundedSource.ACK_BATCH_SIZE));
                    }
                }
                if (!arrayList.isEmpty()) {
                    this.reader.ackBatch(arrayList);
                }
            } finally {
                Preconditions.checkState(this.reader.numInFlightCheckpoints.decrementAndGet() >= 0, "Miscounted in-flight checkpoints");
                this.reader.maybeCloseClient();
                this.reader = null;
                this.safeToAckIds = null;
            }
        }

        private static long now(PubsubReader pubsubReader) {
            return pubsubReader.outer.outer.clock == null ? System.currentTimeMillis() : pubsubReader.outer.outer.clock.currentTimeMillis();
        }

        public void nackAll(PubsubReader pubsubReader) throws IOException {
            Preconditions.checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
            ArrayList arrayList = new ArrayList(Math.min(this.notYetReadIds.size(), PubsubUnboundedSource.ACK_BATCH_SIZE));
            Iterator<String> it = this.notYetReadIds.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
                if (arrayList.size() >= PubsubUnboundedSource.ACK_BATCH_SIZE) {
                    pubsubReader.nackBatch(now(pubsubReader), arrayList);
                    arrayList.clear();
                }
            }
            if (arrayList.isEmpty()) {
                return;
            }
            pubsubReader.nackBatch(now(pubsubReader), arrayList);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource$PubsubCheckpointCoder.class */
    private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint> {
        private static final Coder<String> SUBSCRIPTION_PATH_CODER = NullableCoder.of(StringUtf8Coder.of());
        private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());

        public static <T> PubsubCheckpointCoder<T> of() {
            return new PubsubCheckpointCoder<>();
        }

        private PubsubCheckpointCoder() {
        }

        public void encode(PubsubCheckpoint pubsubCheckpoint, OutputStream outputStream) throws IOException {
            SUBSCRIPTION_PATH_CODER.encode(pubsubCheckpoint.subscriptionPath, outputStream);
            LIST_CODER.encode(pubsubCheckpoint.notYetReadIds, outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public PubsubCheckpoint m251decode(InputStream inputStream) throws IOException {
            return new PubsubCheckpoint((String) SUBSCRIPTION_PATH_CODER.decode(inputStream), null, null, (List) LIST_CODER.decode(inputStream));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource$PubsubReader.class */
    public static class PubsubReader extends UnboundedSource.UnboundedReader<byte[]> {
        // Source that created this reader.
        private final PubsubSource outer;

        // Subscription used for every pull / acknowledge / modifyAckDeadline call of this reader.
        @VisibleForTesting
        final PubsubClient.SubscriptionPath subscription;
        // Client used for all Pubsub calls; maybeCloseClient swaps it to null when it closes the client.
        private AtomicReference<PubsubClient> pubsubClient;
        // True until close() is called.
        private AtomicBoolean active = new AtomicBoolean(true);
        // Ack timeout in milliseconds; -1 until start() reads it from the client.
        private int ackTimeoutMs = -1;
        // Ack ids of messages already returned by advance() and not yet retired.
        private Set<String> safeToAckIds = new HashSet();
        // Messages pulled from Pubsub but not yet returned by advance().
        private final Queue<PubsubClient.IncomingMessage> notYetRead = new ArrayDeque();
        // Ack id -> request time and ack deadline. Iteration follows insertion order; extend()
        // re-inserts extended entries so they move to the end.
        private final LinkedHashMap<String, InFlightState> inFlight = new LinkedHashMap<>();
        // Batches passed to ackBatch, waiting for retire() to drop them from the tracking state.
        // NOTE(review): presumably concurrent because checkpoints may be finalized on another thread -- confirm.
        private final Queue<List<String>> ackedIds = new ConcurrentLinkedQueue();
        // Sum of the payload sizes of the messages in notYetRead.
        private long notYetReadBytes = 0;
        // Minimum timestamp over pulled messages that have not been consumed yet (feeds getWatermark).
        private BucketingFunction minUnreadTimestampMsSinceEpoch = new BucketingFunction(PubsubUnboundedSource.SAMPLE_UPDATE.getMillis(), PubsubUnboundedSource.MIN_WATERMARK_SPREAD, 10, PubsubUnboundedSource.MIN);
        // Minimum timestamp over recently read messages (feeds getWatermark).
        private MovingFunction minReadTimestampMsSinceEpoch = newFun(PubsubUnboundedSource.MIN);
        // Request time of the last non-empty pull; -1 until then.
        private long lastReceivedMsSinceEpoch = -1;
        // Watermark most recently computed by getWatermark.
        private long lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
        // Message currently exposed through the getCurrent* accessors; null when there is none.
        private PubsubClient.IncomingMessage current = null;
        // Time of the last statistics log line; -1 until stats() runs for the first time.
        private long lastLogTimestampMsSinceEpoch = -1;
        // Total number of messages pulled by this reader.
        private long numReceived = 0;
        // The remaining MovingFunction fields are recent-window statistics reported by stats().
        private MovingFunction numReceivedRecently = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction numExtendedDeadlines = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction numLateDeadlines = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction numAcked = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction numExpired = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction numNacked = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction numReadBytes = newFun(PubsubUnboundedSource.SUM);
        private MovingFunction minReceivedTimestampMsSinceEpoch = newFun(PubsubUnboundedSource.MIN);
        private MovingFunction maxReceivedTimestampMsSinceEpoch = newFun(PubsubUnboundedSource.MAX);
        private MovingFunction minWatermarkMsSinceEpoch = newFun(PubsubUnboundedSource.MIN);
        private MovingFunction maxWatermarkMsSinceEpoch = newFun(PubsubUnboundedSource.MAX);
        private MovingFunction numLateMessages = newFun(PubsubUnboundedSource.SUM);
        // Checkpoints handed out by getCheckpointMark and not yet finalized.
        private AtomicInteger numInFlightCheckpoints = new AtomicInteger();
        // Largest value numInFlightCheckpoints has reached.
        private int maxInFlightCheckpoints = 0;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource$PubsubReader$InFlightState.class */
        /** Per-message bookkeeping for a pulled message whose ack is still outstanding. */
        public static class InFlightState {
            // Time (ms since epoch) of the pull request that delivered the message.
            long requestTimeMsSinceEpoch;
            // Locally tracked ack deadline (ms since epoch).
            long ackDeadlineMsSinceEpoch;

            /**
             * @param j request time in ms since epoch
             * @param j2 ack deadline in ms since epoch
             */
            public InFlightState(long j, long j2) {
                this.requestTimeMsSinceEpoch = j;
                this.ackDeadlineMsSinceEpoch = j2;
            }
        }

        /**
         * Builds a {@link MovingFunction} over the standard sampling window of this source.
         *
         * <p>The function covers {@code SAMPLE_PERIOD}, is updated every {@code SAMPLE_UPDATE}, and uses
         * {@code MIN_WATERMARK_SPREAD} / {@code MIN_WATERMARK_MESSAGES} as its significance thresholds.
         * The named constant replaces a bare literal {@code 10}, so the value now has a single definition.
         *
         * @param binaryCombineLongFn reducer applied to the samples (for example MIN, MAX or SUM)
         * @return a new, empty moving function
         */
        private static MovingFunction newFun(Combine.BinaryCombineLongFn binaryCombineLongFn) {
            return new MovingFunction(
                    PubsubUnboundedSource.SAMPLE_PERIOD.getMillis(),
                    PubsubUnboundedSource.SAMPLE_UPDATE.getMillis(),
                    PubsubUnboundedSource.MIN_WATERMARK_SPREAD,
                    PubsubUnboundedSource.MIN_WATERMARK_MESSAGES,
                    binaryCombineLongFn);
        }

        public PubsubReader(PubsubOptions pubsubOptions, PubsubSource pubsubSource, PubsubClient.SubscriptionPath subscriptionPath) throws IOException, GeneralSecurityException {
            this.outer = pubsubSource;
            this.subscription = subscriptionPath;
            this.pubsubClient = new AtomicReference<>(pubsubSource.outer.pubsubFactory.newClient(pubsubSource.outer.timestampAttribute, pubsubSource.outer.idAttribute, pubsubOptions));
        }

        /** Returns the client currently held by this reader; null once the client has been closed. */
        @VisibleForTesting
        PubsubClient getPubsubClient() {
            PubsubClient client = this.pubsubClient.get();
            return client;
        }

        /**
         * Acknowledges the given ack ids and queues the list so that {@code retire()} can later drop
         * them from the tracking state. The list is retained, so callers must not reuse it.
         */
        void ackBatch(List<String> list) throws IOException {
            PubsubClient client = this.pubsubClient.get();
            client.acknowledge(this.subscription, list);
            this.ackedIds.add(list);
        }

        /**
         * Nacks the given ack ids by setting their ack deadline to zero, and records the count.
         *
         * @param j current time in ms since epoch, used as the sample time of the statistic
         * @param list ack ids to nack
         */
        public void nackBatch(long j, List<String> list) throws IOException {
            PubsubClient client = this.pubsubClient.get();
            client.modifyAckDeadline(this.subscription, list, 0);
            this.numNacked.add(j, list.size());
        }

        /**
         * Extends the ack deadline of the given ids by {@code ACK_EXTENSION_PCT} percent of the ack
         * timeout (converted from milliseconds to seconds), and records the count.
         *
         * @param j current time in ms since epoch, used as the sample time of the statistic
         * @param list ack ids to extend
         */
        private void extendBatch(long j, List<String> list) throws IOException {
            // Dividing by 100 applies the percentage, by 1000 converts ms to seconds.
            int extensionSec = (this.ackTimeoutMs * PubsubUnboundedSource.ACK_EXTENSION_PCT) / 100000;
            PubsubClient client = this.pubsubClient.get();
            client.modifyAckDeadline(this.subscription, list, extensionSec);
            this.numExtendedDeadlines.add(j, list.size());
        }

        /** Current time in ms: from the transform's clock when one is set, else from the system clock. */
        private long now() {
            Clock clock = this.outer.outer.clock;
            if (clock != null) {
                return clock.currentTimeMillis();
            }
            return System.currentTimeMillis();
        }

        /**
         * Drains the queue of acknowledged batches: counts each batch in the ack statistic and removes
         * its ids from both the in-flight map and the safe-to-ack set.
         */
        private void retire() throws IOException {
            long nowMs = now();
            for (List<String> batch = this.ackedIds.poll(); batch != null; batch = this.ackedIds.poll()) {
                this.numAcked.add(nowMs, batch.size());
                for (String ackId : batch) {
                    this.inFlight.remove(ackId);
                    this.safeToAckIds.remove(ackId);
                }
            }
        }

        /**
         * Walks the in-flight messages in map order and, for those whose deadline is within the safety
         * margin, either extends the ack deadline or stops tracking them. Repeats until a pass finds
         * nothing to do.
         *
         * <p>Each pass sorts the pressing messages into three groups: deadline already (almost) missed,
         * in flight longer than {@code PROCESSING_TIMEOUT}, and everything else (extended, at most
         * {@code ACK_BATCH_SIZE} per pass).
         */
        private void extend() throws IOException {
            while (true) {
                long nowMs = now();
                List<String> assumedExpired = new ArrayList<>();
                List<String> toExtend = new ArrayList<>();
                List<String> timedOut = new ArrayList<>();
                int safetyMarginMs = (this.ackTimeoutMs * PubsubUnboundedSource.ACK_SAFETY_PCT) / 100;

                for (Map.Entry<String, InFlightState> entry : this.inFlight.entrySet()) {
                    InFlightState state = entry.getValue();
                    if (state.ackDeadlineMsSinceEpoch - safetyMarginMs > nowMs) {
                        // The scan stops at the first entry that is not pressing yet.
                        break;
                    }
                    if (state.ackDeadlineMsSinceEpoch - PubsubUnboundedSource.ACK_TOO_LATE.getMillis() < nowMs) {
                        // Too close to (or past) the deadline to extend.
                        assumedExpired.add(entry.getKey());
                        continue;
                    }
                    if (state.requestTimeMsSinceEpoch + PubsubUnboundedSource.PROCESSING_TIMEOUT.getMillis() < nowMs) {
                        // In flight for too long: give up instead of extending again.
                        timedOut.add(entry.getKey());
                        continue;
                    }
                    toExtend.add(entry.getKey());
                    if (toExtend.size() >= PubsubUnboundedSource.ACK_BATCH_SIZE) {
                        // One request's worth collected; the next pass picks up the rest.
                        break;
                    }
                }

                if (assumedExpired.isEmpty() && toExtend.isEmpty() && timedOut.isEmpty()) {
                    return;
                }

                if (!assumedExpired.isEmpty()) {
                    this.numLateDeadlines.add(nowMs, assumedExpired.size());
                    for (String ackId : assumedExpired) {
                        this.inFlight.remove(ackId);
                    }
                }

                if (!timedOut.isEmpty()) {
                    this.numExpired.add(nowMs, timedOut.size());
                    for (String ackId : timedOut) {
                        this.inFlight.remove(ackId);
                    }
                }

                if (!toExtend.isEmpty()) {
                    long newDeadlineMs = nowMs + ((this.ackTimeoutMs * PubsubUnboundedSource.ACK_EXTENSION_PCT) / 100);
                    for (String ackId : toExtend) {
                        // Remove and re-insert so the entry moves to the end of the map.
                        InFlightState previous = this.inFlight.remove(ackId);
                        this.inFlight.put(ackId, new InFlightState(previous.requestTimeMsSinceEpoch, newDeadlineMs));
                    }
                    extendBatch(nowMs, toExtend);
                }
            }
        }

        /**
         * Pulls one batch of messages, unless {@code MAX_IN_FLIGHT} messages are already tracked, and
         * registers every received message as unread and in flight.
         */
        private void pull() throws IOException {
            boolean atCapacity = this.inFlight.size() >= PubsubUnboundedSource.MAX_IN_FLIGHT;
            if (atCapacity) {
                return;
            }

            long requestTimeMs = now();
            long deadlineMs = requestTimeMs + this.ackTimeoutMs;
            PubsubClient client = this.pubsubClient.get();
            List<PubsubClient.IncomingMessage> received = client.pull(requestTimeMs, this.subscription, PubsubUnboundedSource.PULL_BATCH_SIZE, true);
            if (received.isEmpty()) {
                return;
            }

            this.lastReceivedMsSinceEpoch = requestTimeMs;
            for (PubsubClient.IncomingMessage message : received) {
                long timestampMs = message.timestampMsSinceEpoch();
                this.notYetRead.add(message);
                this.notYetReadBytes += message.message().getData().size();
                this.inFlight.put(message.ackId(), new InFlightState(requestTimeMs, deadlineMs));
                this.numReceived++;
                this.numReceivedRecently.add(requestTimeMs, 1L);
                this.minReceivedTimestampMsSinceEpoch.add(requestTimeMs, timestampMs);
                this.maxReceivedTimestampMsSinceEpoch.add(requestTimeMs, timestampMs);
                this.minUnreadTimestampMsSinceEpoch.add(requestTimeMs, timestampMs);
            }
        }

        /**
         * Emits one debug line with the reader's counters, at most once per {@code LOG_PERIOD}. The
         * first call only records the time and logs nothing.
         */
        private void stats() {
            long nowMs = now();
            if (this.lastLogTimestampMsSinceEpoch < 0) {
                this.lastLogTimestampMsSinceEpoch = nowMs;
                return;
            }
            long sinceLastLogMs = nowMs - this.lastLogTimestampMsSinceEpoch;
            if (sinceLastLogMs < PubsubUnboundedSource.LOG_PERIOD.getMillis()) {
                return;
            }

            // A skew is only reported when both extremes moved away from their identity values.
            long minReceived = this.minReceivedTimestampMsSinceEpoch.get(nowMs);
            long maxReceived = this.maxReceivedTimestampMsSinceEpoch.get(nowMs);
            String messageSkew = "unknown";
            if (minReceived < Long.MAX_VALUE && maxReceived > Long.MIN_VALUE) {
                messageSkew = (maxReceived - minReceived) + "ms";
            }

            long minWatermark = this.minWatermarkMsSinceEpoch.get(nowMs);
            long maxWatermark = this.maxWatermarkMsSinceEpoch.get(nowMs);
            String watermarkSkew = "unknown";
            if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) {
                watermarkSkew = (maxWatermark - minWatermark) + "ms";
            }

            // The first key of the in-flight map is reported as the oldest in-flight message.
            String oldestAckId = Iterables.getFirst(this.inFlight.keySet(), null);
            String oldestInFlight = "no";
            if (oldestAckId != null) {
                oldestInFlight = (nowMs - this.inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms";
            }

            PubsubUnboundedSource.LOG.debug(
                    "Pubsub {} has {} received messages, {} current unread messages, {} current unread bytes, {} current in-flight msgs, {} oldest in-flight, {} current in-flight checkpoints, {} max in-flight checkpoints, {}B/s recent read, {} recent received, {} recent extended, {} recent late extended, {} recent ACKed, {} recent NACKed, {} recent expired, {} recent message timestamp skew, {} recent watermark skew, {} recent late messages, {} last reported watermark",
                    this.subscription,
                    this.numReceived,
                    this.notYetRead.size(),
                    this.notYetReadBytes,
                    this.inFlight.size(),
                    oldestInFlight,
                    this.numInFlightCheckpoints.get(),
                    this.maxInFlightCheckpoints,
                    this.numReadBytes.get(nowMs) / (PubsubUnboundedSource.SAMPLE_PERIOD.getMillis() / 1000),
                    this.numReceivedRecently.get(nowMs),
                    this.numExtendedDeadlines.get(nowMs),
                    this.numLateDeadlines.get(nowMs),
                    this.numAcked.get(nowMs),
                    this.numNacked.get(nowMs),
                    this.numExpired.get(nowMs),
                    messageSkew,
                    watermarkSkew,
                    this.numLateMessages.get(nowMs),
                    new Instant(this.lastWatermarkMsSinceEpoch));
            this.lastLogTimestampMsSinceEpoch = nowMs;
        }

        /**
         * Reads the subscription's ack deadline from the client, stores it in milliseconds, and then
         * attempts to advance to the first message.
         *
         * @return the result of {@link #advance()}
         * @throws IOException if querying the ack deadline or advancing fails
         */
        @Override
        public boolean start() throws IOException {
            // ackDeadlineSeconds reports seconds while ackTimeoutMs is kept in milliseconds. The
            // conversion factor is spelled out here instead of borrowing PULL_BATCH_SIZE, which only
            // happens to be 1000 and is unrelated to time units.
            final int millisPerSecond = 1000;
            int ackDeadlineSec = this.pubsubClient.get().ackDeadlineSeconds(this.subscription);
            this.ackTimeoutMs = ackDeadlineSec * millisPerSecond;
            return advance();
        }

        /**
         * Moves to the next message. Before taking a message it logs statistics, releases the previous
         * message's hold on the unread-timestamp tracker, retires acknowledged ids, extends pressing
         * ack deadlines, and pulls a new batch if nothing is buffered.
         *
         * @return true if a message is now current, false if none is available
         * @throws IOException if a Pubsub call fails
         */
        @Override
        public boolean advance() throws IOException {
            stats();

            PubsubClient.IncomingMessage consumed = this.current;
            if (consumed != null) {
                this.minUnreadTimestampMsSinceEpoch.remove(consumed.requestTimeMsSinceEpoch());
                this.current = null;
            }

            retire();
            extend();
            if (this.notYetRead.isEmpty()) {
                pull();
            }

            PubsubClient.IncomingMessage next = this.notYetRead.poll();
            this.current = next;
            if (next == null) {
                return false;
            }

            int payloadBytes = next.message().getData().size();
            this.notYetReadBytes -= payloadBytes;
            Preconditions.checkState(this.notYetReadBytes >= 0);

            long nowMs = now();
            long timestampMs = next.timestampMsSinceEpoch();
            this.numReadBytes.add(nowMs, payloadBytes);
            this.minReadTimestampMsSinceEpoch.add(nowMs, timestampMs);
            if (timestampMs < this.lastWatermarkMsSinceEpoch) {
                // The message is older than the last reported watermark.
                this.numLateMessages.add(nowMs, 1L);
            }
            // Once returned to the caller the message may be acked by the next finalized checkpoint.
            this.safeToAckIds.add(next.ackId());
            return true;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        /**
         * Returns the current message as bytes: the serialized message (with its message id set to the
         * record id) when the transform needs the message id or attributes, otherwise only the payload.
         *
         * @throws NoSuchElementException if there is no current message
         */
        public byte[] m255getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            PubsubUnboundedSource transform = this.outer.outer;
            boolean wholeMessage = transform.getNeedsMessageId() || transform.getNeedsAttributes();
            if (!wholeMessage) {
                return this.current.message().getData().toByteArray();
            }
            return this.current.message().toBuilder().setMessageId(this.current.recordId()).build().toByteArray();
        }

        /**
         * Returns the timestamp of the current message.
         *
         * @throws NoSuchElementException if there is no current message
         */
        @Override
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            long timestampMs = message.timestampMsSinceEpoch();
            return new Instant(timestampMs);
        }

        /**
         * Returns the record id of the current message encoded as UTF-8 bytes.
         *
         * @throws NoSuchElementException if there is no current message
         */
        @Override
        public byte[] getCurrentRecordId() throws NoSuchElementException {
            PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            String recordId = message.recordId();
            return recordId.getBytes(StandardCharsets.UTF_8);
        }

        /** Marks the reader inactive and closes the client if no checkpoint is outstanding. */
        @Override
        public void close() throws IOException {
            this.active.set(false);
            maybeCloseClient();
        }

        /**
         * Closes the Pubsub client once the reader is inactive and has no in-flight checkpoints.
         * The client reference is swapped to null first, so the client is closed at most once.
         */
        public void maybeCloseClient() throws IOException {
            boolean stillNeeded = this.active.get() || this.numInFlightCheckpoints.get() != 0;
            if (stillNeeded) {
                return;
            }
            PubsubClient client = this.pubsubClient.getAndSet(null);
            if (client != null) {
                client.close();
            }
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        /** Returns the source this reader was created from. */
        public PubsubSource m254getCurrentSource() {
            return this.outer;
        }

        /**
         * Estimates the watermark for this reader and records it in the watermark statistics.
         *
         * <ul>
         *   <li>Client reports EOF and nothing is buffered: returns the maximum timestamp.
         *   <li>No read or unread timestamps are tracked and nothing has been received for longer than
         *       {@code SAMPLE_PERIOD}: the watermark becomes the current time.
         *   <li>Either tracker is significant: the watermark becomes the smaller of the two minima.
         *   <li>Otherwise the previous watermark is kept.
         * </ul>
         */
        @Override
        public Instant getWatermark() {
            if (this.pubsubClient.get().isEOF() && this.notYetRead.isEmpty()) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }

            long nowMs = now();
            long readMin = this.minReadTimestampMsSinceEpoch.get(nowMs);
            long unreadMin = this.minUnreadTimestampMsSinceEpoch.get();

            boolean nothingTracked = readMin == Long.MAX_VALUE && unreadMin == Long.MAX_VALUE;
            boolean quietForSamplePeriod = this.lastReceivedMsSinceEpoch >= 0
                    && nowMs > this.lastReceivedMsSinceEpoch + PubsubUnboundedSource.SAMPLE_PERIOD.getMillis();

            if (nothingTracked && quietForSamplePeriod) {
                this.lastWatermarkMsSinceEpoch = nowMs;
            } else if (this.minReadTimestampMsSinceEpoch.isSignificant() || this.minUnreadTimestampMsSinceEpoch.isSignificant()) {
                this.lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
            }

            long watermarkMs = this.lastWatermarkMsSinceEpoch;
            this.minWatermarkMsSinceEpoch.add(nowMs, watermarkMs);
            this.maxWatermarkMsSinceEpoch.add(nowMs, watermarkMs);
            return new Instant(watermarkMs);
        }

        /* renamed from: getCheckpointMark, reason: merged with bridge method [inline-methods] */
        /**
         * Takes a checkpoint: snapshots the safe-to-ack ids and the ack ids of all buffered messages,
         * and counts the checkpoint as in flight. The subscription path is stored in the checkpoint
         * only when the source itself has no subscription provider.
         */
        public PubsubCheckpoint m253getCheckpointMark() {
            int inFlightNow = this.numInFlightCheckpoints.incrementAndGet();
            this.maxInFlightCheckpoints = Math.max(this.maxInFlightCheckpoints, inFlightNow);

            ArrayList<String> safeToAckSnapshot = new ArrayList<>(this.safeToAckIds);
            ArrayList<String> notYetReadSnapshot = new ArrayList<>(this.notYetRead.size());
            for (PubsubClient.IncomingMessage pending : this.notYetRead) {
                notYetReadSnapshot.add(pending.ackId());
            }

            String pathForCheckpoint = null;
            if (this.outer.subscriptionPath == null) {
                pathForCheckpoint = this.subscription.getPath();
            }
            return new PubsubCheckpoint(pathForCheckpoint, this, safeToAckSnapshot, notYetReadSnapshot);
        }

        /** Returns the number of payload bytes pulled by this reader but not yet read. */
        @Override
        public long getSplitBacklogBytes() {
            long backlogBytes = this.notYetReadBytes;
            return backlogBytes;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource$PubsubSource.class */
    public static class PubsubSource extends UnboundedSource<byte[], PubsubCheckpoint> {
        // The enclosing transform whose configuration (client factory, attributes, clock) this source uses.
        public final PubsubUnboundedSource outer;

        // Subscription to read from. May be null, in which case split / createReader fall back to a
        // random subscription or to the one stored in a checkpoint.
        @VisibleForTesting
        final ValueProvider<PubsubClient.SubscriptionPath> subscriptionPath;

        /** Creates a source that uses the transform's own subscription provider. */
        public PubsubSource(PubsubUnboundedSource pubsubUnboundedSource) {
            this(pubsubUnboundedSource, pubsubUnboundedSource.getSubscriptionProvider());
        }

        /** Creates a source bound to an explicit subscription provider (used by split). */
        private PubsubSource(PubsubUnboundedSource pubsubUnboundedSource, ValueProvider<PubsubClient.SubscriptionPath> valueProvider) {
            this.outer = pubsubUnboundedSource;
            this.subscriptionPath = valueProvider;
        }

        /**
         * Returns {@code i * SCALE_OUT} references to one source that all share a subscription.
         *
         * <p>When this source has no subscription provider, a random subscription is created, the
         * returned source is bound to it, and lineage is reported for the topic (if any). Otherwise
         * this source is returned as is, and lineage is reported for the subscription when it is the
         * transform's own provider and resolves to a non-null path.
         *
         * @param i desired number of splits
         * @param pipelineOptions options used when a random subscription has to be created
         * @return the list of (identical) split sources
         * @throws Exception if creating the random subscription fails
         */
        @Override
        public List<PubsubSource> split(int i, PipelineOptions pipelineOptions) throws Exception {
            List<PubsubSource> result = new ArrayList<>(i);
            PubsubSource splitSource = this;
            if (this.subscriptionPath == null) {
                PubsubClient.SubscriptionPath randomSubscription = this.outer.createRandomSubscription(pipelineOptions);
                splitSource = new PubsubSource(this.outer, ValueProvider.StaticValueProvider.of(randomSubscription));
                PubsubClient.TopicPath topic = this.outer.getTopic();
                if (topic != null) {
                    // The lineage subtype names the kind of resource, mirroring "subscription" below.
                    Lineage.getSources().add("pubsub", "topic", topic.getDataCatalogSegments());
                }
            } else if (this.subscriptionPath.equals(this.outer.getSubscriptionProvider())) {
                PubsubClient.SubscriptionPath subscription = this.subscriptionPath.get();
                if (subscription != null) {
                    Lineage.getSources().add("pubsub", "subscription", subscription.getDataCatalogSegments());
                }
            }
            int copies = i * PubsubUnboundedSource.SCALE_OUT;
            for (int n = 0; n < copies; n++) {
                result.add(splitSource);
            }
            return result;
        }

        /**
         * Creates a reader for this source.
         *
         * <p>The subscription comes from this source's provider when it yields a non-null value,
         * otherwise from the checkpoint when one is given, otherwise a random subscription is created.
         * When resuming from a checkpoint, its not-yet-read messages are nacked; a failure to do so
         * is logged and ignored.
         *
         * @param pipelineOptions options used to build the reader's client
         * @param pubsubCheckpoint checkpoint to resume from, or null
         * @throws RuntimeException if the reader cannot be constructed
         */
        @Override
        public PubsubReader createReader(PipelineOptions pipelineOptions, PubsubCheckpoint pubsubCheckpoint) {
            try {
                PubsubOptions pubsubOptions = pipelineOptions.as(PubsubOptions.class);
                PubsubClient.SubscriptionPath target;
                if (this.subscriptionPath != null && this.subscriptionPath.get() != null) {
                    target = this.subscriptionPath.get();
                } else if (pubsubCheckpoint != null) {
                    target = pubsubCheckpoint.getSubscription();
                } else {
                    target = this.outer.createRandomSubscription(pipelineOptions);
                }
                PubsubReader created = new PubsubReader(pubsubOptions, this, target);
                if (pubsubCheckpoint == null) {
                    return created;
                }
                try {
                    pubsubCheckpoint.nackAll(created);
                } catch (IOException nackFailure) {
                    PubsubUnboundedSource.LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring exception.", this.subscriptionPath, pubsubCheckpoint.notYetReadIds.size(), nackFailure);
                }
                return created;
            } catch (IOException | GeneralSecurityException failure) {
                throw new RuntimeException("Unable to subscribe to " + this.subscriptionPath + ": ", failure);
            }
        }

        /** Returns the shared checkpoint coder. */
        @Override
        public Coder<PubsubCheckpoint> getCheckpointMarkCoder() {
            Coder<PubsubCheckpoint> coder = PubsubUnboundedSource.CHECKPOINT_CODER;
            return coder;
        }

        /** Elements are emitted as raw byte arrays. */
        @Override
        public Coder<byte[]> getOutputCoder() {
            Coder<byte[]> coder = ByteArrayCoder.of();
            return coder;
        }

        /** Intentionally a no-op: this source performs no validation. */
        @Override
        public void validate() {
        }

        /** Always true: this source asks for its output to be deduplicated. */
        @Override
        public boolean requiresDeduping() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource$StatsFn.class */
    public static class StatsFn extends DoFn<PubsubMessage, PubsubMessage> {
        private final Counter elementCounter = SourceMetrics.elementsRead();
        private final PubsubClient.PubsubClientFactory pubsubFactory;
        private final ValueProvider<PubsubClient.SubscriptionPath> subscription;
        private final ValueProvider<PubsubClient.TopicPath> topic;
        private final String timestampAttribute;
        private final String idAttribute;

        public StatsFn(PubsubClient.PubsubClientFactory pubsubClientFactory, ValueProvider<PubsubClient.SubscriptionPath> valueProvider, ValueProvider<PubsubClient.TopicPath> valueProvider2, String str, String str2) {
            Preconditions.checkArgument(pubsubClientFactory != null, "pubsubFactory should not be null");
            this.pubsubFactory = pubsubClientFactory;
            this.subscription = valueProvider;
            this.topic = valueProvider2;
            this.timestampAttribute = str;
            this.idAttribute = str2;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<PubsubMessage, PubsubMessage>.ProcessContext processContext) throws Exception {
            this.elementCounter.inc();
            processContext.output((PubsubMessage) processContext.element());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("subscription", this.subscription)).addIfNotNull(DisplayData.item("topic", this.topic)).add(DisplayData.item("transport", this.pubsubFactory.getKind())).addIfNotNull(DisplayData.item("timestampAttribute", this.timestampAttribute)).addIfNotNull(DisplayData.item("idAttribute", this.idAttribute));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public PubsubUnboundedSource(Clock clock, PubsubClient.PubsubClientFactory pubsubClientFactory, ValueProvider<PubsubClient.ProjectPath> valueProvider, ValueProvider<PubsubClient.TopicPath> valueProvider2, ValueProvider<PubsubClient.SubscriptionPath> valueProvider3, String str, String str2, boolean z, boolean z2, boolean z3) {
        Preconditions.checkArgument((valueProvider2 == null) != (valueProvider3 == null), "Exactly one of topic and subscription must be given");
        this.clock = clock;
        this.pubsubFactory = (PubsubClient.PubsubClientFactory) Preconditions.checkNotNull(pubsubClientFactory);
        this.project = valueProvider;
        this.topic = valueProvider2;
        this.subscription = valueProvider3;
        this.timestampAttribute = str;
        this.idAttribute = str2;
        this.needsAttributes = z;
        this.needsMessageId = z2;
        this.needsOrderingKey = z3;
    }

    /**
     * Convenience constructor: no clock (null), and neither message ids nor ordering keys are
     * requested.
     */
    public PubsubUnboundedSource(
            PubsubClient.PubsubClientFactory pubsubFactory,
            ValueProvider<PubsubClient.ProjectPath> project,
            ValueProvider<PubsubClient.TopicPath> topic,
            ValueProvider<PubsubClient.SubscriptionPath> subscription,
            String timestampAttribute,
            String idAttribute,
            boolean needsAttributes) {
        this(null, pubsubFactory, project, topic, subscription, timestampAttribute, idAttribute, needsAttributes, false, false);
    }

    /**
     * Convenience constructor taking an explicit clock; neither message ids nor ordering keys are
     * requested.
     */
    public PubsubUnboundedSource(
            Clock clock,
            PubsubClient.PubsubClientFactory pubsubFactory,
            ValueProvider<PubsubClient.ProjectPath> project,
            ValueProvider<PubsubClient.TopicPath> topic,
            ValueProvider<PubsubClient.SubscriptionPath> subscription,
            String timestampAttribute,
            String idAttribute,
            boolean needsAttributes) {
        this(clock, pubsubFactory, project, topic, subscription, timestampAttribute, idAttribute, needsAttributes, false, false);
    }

    /**
     * Convenience constructor: no clock (null) and no ordering key; attributes and message ids
     * are requested according to the two flags.
     */
    public PubsubUnboundedSource(
            PubsubClient.PubsubClientFactory pubsubFactory,
            ValueProvider<PubsubClient.ProjectPath> project,
            ValueProvider<PubsubClient.TopicPath> topic,
            ValueProvider<PubsubClient.SubscriptionPath> subscription,
            String timestampAttribute,
            String idAttribute,
            boolean needsAttributes,
            boolean needsMessageId) {
        this(null, pubsubFactory, project, topic, subscription, timestampAttribute, idAttribute, needsAttributes, needsMessageId, false);
    }

    /** Returns the resolved project path, or null when no project provider was configured. */
    public PubsubClient.ProjectPath getProject() {
        return this.project == null ? null : (PubsubClient.ProjectPath) this.project.get();
    }

    /** Returns the resolved topic path, or null when no topic provider was configured. */
    public PubsubClient.TopicPath getTopic() {
        return this.topic == null ? null : (PubsubClient.TopicPath) this.topic.get();
    }

    /** Returns the topic provider itself (unresolved); may be null. */
    public ValueProvider<PubsubClient.TopicPath> getTopicProvider() {
        return topic;
    }

    /** Returns the resolved subscription path, or null when no subscription provider was configured. */
    public PubsubClient.SubscriptionPath getSubscription() {
        return this.subscription == null ? null : (PubsubClient.SubscriptionPath) this.subscription.get();
    }

    /** Returns the subscription provider itself (unresolved); may be null. */
    public ValueProvider<PubsubClient.SubscriptionPath> getSubscriptionProvider() {
        return subscription;
    }

    /** Returns the configured timestamp attribute name, as passed to the constructor. */
    public String getTimestampAttribute() {
        return timestampAttribute;
    }

    /** Returns the configured id attribute name, as passed to the constructor. */
    public String getIdAttribute() {
        return idAttribute;
    }

    /** Returns the {@code needsAttributes} flag this source was constructed with. */
    public boolean getNeedsAttributes() {
        return needsAttributes;
    }

    /** Returns the {@code needsMessageId} flag this source was constructed with. */
    public boolean getNeedsMessageId() {
        return needsMessageId;
    }

    /** Returns the {@code needsOrderingKey} flag this source was constructed with. */
    public boolean getNeedsOrderingKey() {
        return needsOrderingKey;
    }

    /**
     * Builds the read pipeline fragment: reads from a {@code PubsubSource} wrapping this
     * transform, maps each element to a {@link PubsubMessage}, sets a coder chosen from the
     * needs-attributes / needs-message-id / needs-ordering-key flags, and, when
     * {@code usesStatsFn} says so, appends the {@link StatsFn} pass-through.
     *
     * @param pBegin the pipeline start this transform is applied to
     * @return the resulting collection of messages
     */
    public PCollection<PubsubMessage> expand(PBegin pBegin) {
        // Pick the mapping function: the proto-parsing variant is used whenever attributes or the
        // message id are requested, otherwise the payload-only variant.
        SerializableFunction parseFn;
        if (getNeedsAttributes() || getNeedsMessageId()) {
            parseFn = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
        } else {
            parseFn = new PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly();
        }

        // Pick the coder. The ordering-key flag takes precedence over the other two.
        Coder messageCoder;
        if (getNeedsOrderingKey()) {
            messageCoder = PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of();
        } else if (getNeedsMessageId()) {
            if (getNeedsAttributes()) {
                messageCoder = PubsubMessageWithAttributesAndMessageIdCoder.of();
            } else {
                messageCoder = PubsubMessageWithMessageIdCoder.of();
            }
        } else if (getNeedsAttributes()) {
            messageCoder = PubsubMessageWithAttributesCoder.of();
        } else {
            messageCoder = PubsubMessagePayloadOnlyCoder.of();
        }

        PCollection<PubsubMessage> messages =
                pBegin.getPipeline()
                        .begin()
                        .apply(Read.from(new PubsubSource(this)))
                        .apply("MapBytesToPubsubMessages", MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(parseFn))
                        .setCoder(messageCoder);

        if (usesStatsFn(pBegin.getPipeline().getOptions())) {
            StatsFn statsFn = new StatsFn(this.pubsubFactory, this.subscription, this.topic, this.timestampAttribute, this.idAttribute);
            messages = (PCollection) messages.apply("PubsubUnboundedSource.Stats", ParDo.of(statsFn));
        }
        return messages;
    }

    /**
     * Decides whether {@link StatsFn} is appended: true when the
     * {@code PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE} experiment is set, or when the runner's class
     * name does not start with {@code org.apache.beam.runners.dataflow.}.
     */
    private boolean usesStatsFn(PipelineOptions pipelineOptions) {
        if (ExperimentalOptions.hasExperiment(pipelineOptions, PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE)) {
            return true;
        }
        boolean isDataflowRunner = pipelineOptions.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.");
        return !isDataflowRunner;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a randomly named subscription on this source's topic and returns its path.
     *
     * <p>The project is taken from this source's project provider when one is set; otherwise it
     * is read from the {@code project} pipeline option ({@link GcpOptions}), which must then be
     * non-null. A client is created for the duration of the call and is always closed, including
     * when subscription creation fails.
     *
     * @param pipelineOptions options used to resolve the project and to create the client
     * @return the path of the newly created subscription
     * @throws RuntimeException wrapping any exception raised while creating the client, creating
     *     the subscription, or closing the client
     */
    public PubsubClient.SubscriptionPath createRandomSubscription(PipelineOptions pipelineOptions) {
        PubsubClient.TopicPath topicPath = (PubsubClient.TopicPath) this.topic.get();

        PubsubClient.ProjectPath projectPath;
        if (this.project != null) {
            projectPath = (PubsubClient.ProjectPath) this.project.get();
        } else {
            Preconditions.checkState(pipelineOptions.as(GcpOptions.class).getProject() != null, "Cannot create subscription to topic %s because pipeline option 'project' not specified", topicPath);
            projectPath = PubsubClient.projectPathFromId(pipelineOptions.as(GcpOptions.class).getProject());
        }

        // try-with-resources guarantees the client is closed on both the success and the failure
        // path; an exception from close() is handled by the same catch clause below.
        try (PubsubClient client = this.pubsubFactory.newClient(this.timestampAttribute, this.idAttribute, pipelineOptions.as(PubsubOptions.class))) {
            PubsubClient.SubscriptionPath subscriptionPath = client.createRandomSubscription(projectPath, topicPath, DEAULT_ACK_TIMEOUT_SEC);
            LOG.warn("Created subscription {} to topic {}. Note this subscription WILL NOT be deleted when the pipeline terminates", subscriptionPath, this.topic);
            return subscriptionPath;
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to create subscription to topic %s on project %s: %s", topicPath, projectPath, e.getMessage()), e);
        }
    }
}
