package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.class */
class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
    private final SubscriptionPartition subscriptionPartition;
    private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
    private final DoFn.OutputReceiver<SequencedMessage> receiver;
    private final MemoryBufferedSubscriber subscriber;
    private Optional<Offset> lastClaimedOffset = Optional.empty();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubscriptionPartitionProcessorImpl(SubscriptionPartition subscriptionPartition, RestrictionTracker<OffsetByteRange, OffsetByteProgress> restrictionTracker, DoFn.OutputReceiver<SequencedMessage> outputReceiver, Supplier<MemoryBufferedSubscriber> supplier) {
        this.subscriptionPartition = subscriptionPartition;
        this.tracker = restrictionTracker;
        this.receiver = outputReceiver;
        this.subscriber = getReadySubscriber(supplier);
    }

    @Override // org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessor
    public DoFn.ProcessContinuation run() {
        Optional<SequencedMessage> peek = this.subscriber.peek();
        while (true) {
            Optional<SequencedMessage> optional = peek;
            if (!optional.isPresent()) {
                return DoFn.ProcessContinuation.resume();
            }
            SequencedMessage sequencedMessage = optional.get();
            Offset of = Offset.of(sequencedMessage.getCursor().getOffset());
            if (!this.tracker.tryClaim(OffsetByteProgress.of(of, sequencedMessage.getSizeBytes()))) {
                return DoFn.ProcessContinuation.stop();
            }
            this.subscriber.pop();
            this.lastClaimedOffset = Optional.of(of);
            this.receiver.outputWithTimestamp(sequencedMessage, new Instant(Timestamps.toMillis(sequencedMessage.getPublishTime())));
            peek = this.subscriber.peek();
        }
    }

    @Override // org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessor
    public Optional<Offset> lastClaimed() {
        return this.lastClaimedOffset;
    }

    private MemoryBufferedSubscriber getReadySubscriber(Supplier<MemoryBufferedSubscriber> supplier) {
        Offset of = Offset.of(((OffsetByteRange) this.tracker.currentRestriction()).getRange().getFrom());
        while (true) {
            MemoryBufferedSubscriber memoryBufferedSubscriber = supplier.get();
            Offset fetchOffset = memoryBufferedSubscriber.fetchOffset();
            if (of.equals(fetchOffset)) {
                memoryBufferedSubscriber.rebuffer();
                return memoryBufferedSubscriber;
            }
            LOG.info("Discarding subscriber due to mismatch, this should be rare. {}, start: {} fetch: {}", new Object[]{this.subscriptionPartition, of, fetchOffset});
            try {
                memoryBufferedSubscriber.stopAsync().awaitTerminated();
            } catch (Exception e) {
            }
        }
    }
}
