package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImpl.class */
/**
 * An {@link UnboundedSource.UnboundedReader} of {@link SequencedMessage}s that pulls messages
 * from a {@link MemoryBufferedSubscriber} and tracks the next offset to fetch.
 *
 * <p>This file is decompiler output. The decompiler renamed three accessors
 * ({@code getCurrent}, {@code getCheckpointMark}, {@code getCurrentSource}) to {@code mNNN...}
 * names; the canonical names are restored here and the {@code mNNN...} names are kept as
 * delegating aliases so that any existing callers keep working.
 */
public class UnboundedReaderImpl extends UnboundedSource.UnboundedReader<SequencedMessage> {
    /** The source handed back from {@link #getCurrentSource()}. */
    private final UnboundedSource<SequencedMessage, CheckpointMarkImpl> source;
    /** Buffering subscriber that messages are peeked/popped from. */
    private final MemoryBufferedSubscriber subscriber;
    /** Used to compute backlog statistics; closed together with this reader. */
    private final TopicBacklogReader backlogReader;
    /** Committer supplier passed through to every checkpoint mark. */
    private final Supplier<BlockingCommitter> committer;
    /** One past the offset of the most recently surfaced message (initially the start offset). */
    private Offset fetchOffset;
    /** Publish time of the most recently surfaced message, if any message has been seen. */
    private Optional<Instant> lastMessageTimestamp = Optional.empty();
    /** True while the head of the subscriber buffer is the reader's current element. */
    private boolean advanced = false;

    /**
     * Creates a reader.
     *
     * @param unboundedSource the source reported by {@link #getCurrentSource()}
     * @param memoryBufferedSubscriber the subscriber to read from
     * @param topicBacklogReader the backlog reader used by {@link #getSplitBacklogBytes()}
     * @param supplier supplier of the committer handed to checkpoint marks
     * @param offset the initial fetch offset; must equal the subscriber's own fetch offset
     * @throws IllegalArgumentException if {@code offset} differs from the subscriber's fetch offset
     */
    public UnboundedReaderImpl(UnboundedSource<SequencedMessage, CheckpointMarkImpl> unboundedSource, MemoryBufferedSubscriber memoryBufferedSubscriber, TopicBacklogReader topicBacklogReader, Supplier<BlockingCommitter> supplier, Offset offset) {
        Preconditions.checkArgument(offset.equals(memoryBufferedSubscriber.fetchOffset()));
        this.source = unboundedSource;
        this.subscriber = memoryBufferedSubscriber;
        this.backlogReader = topicBacklogReader;
        this.committer = supplier;
        this.fetchOffset = offset;
    }

    /**
     * Returns the message at the head of the subscriber buffer.
     *
     * @return the current message
     * @throws NoSuchElementException if the last {@link #advance()} did not yield a message
     */
    public SequencedMessage getCurrent() throws NoSuchElementException {
        if (!this.advanced) {
            throw new NoSuchElementException();
        }
        return this.subscriber.peek().get();
    }

    /**
     * Decompiler-assigned alias of {@link #getCurrent()}, retained for compatibility.
     *
     * @return the current message
     * @throws NoSuchElementException if there is no current message
     */
    public SequencedMessage m281getCurrent() throws NoSuchElementException {
        return getCurrent();
    }

    /**
     * Returns the publish time of the current message.
     *
     * @return the current message's publish time
     * @throws NoSuchElementException if there is no current message
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        return getTimestamp(getCurrent());
    }

    /** Converts a message's publish time to a millisecond-precision {@link Instant}. */
    private static Instant getTimestamp(SequencedMessage sequencedMessage) {
        return Instant.ofEpochMilli(Timestamps.toMillis(sequencedMessage.getPublishTime()));
    }

    /**
     * Closes the subscriber and then the backlog reader.
     *
     * <p>Both resources are always closed, even if closing the first one throws; a failure from
     * the second close is attached as a suppressed exception.
     *
     * @throws IOException wrapping any exception raised while closing either resource
     */
    public void close() throws IOException {
        // Resources are closed in reverse declaration order: subscriber first, then backlog reader.
        try (TopicBacklogReader readerToClose = this.backlogReader;
                AutoCloseable subscriberToClose = ApiServices.asCloseable(this.subscriber)) {
            // Nothing to do: the try-with-resources exists only to close both resources safely.
        } catch (Exception e) {
            throw new IOException("Failed when closing reader.", e);
        }
    }

    /**
     * Starts the subscriber, waiting up to one minute for it to be running, then advances once.
     *
     * @return whether a message is available after the initial advance
     * @throws IOException wrapping any exception raised while starting or advancing
     */
    public boolean start() throws IOException {
        try {
            this.subscriber.startAsync().awaitRunning(1L, TimeUnit.MINUTES);
            return advance();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Moves to the next buffered message, if there is one.
     *
     * <p>If a message was current, it is popped from the subscriber before the next one is
     * peeked. When a message is available, the fetch offset becomes that message's offset plus
     * one and its publish time is recorded for {@link #getWatermark()}.
     *
     * @return {@code true} if a message is now current, {@code false} if the buffer is empty
     * @throws IOException if the subscriber is not in the RUNNING state
     * @throws IllegalStateException if the new fetch offset does not exceed the previous one
     */
    public boolean advance() throws IOException {
        if (!this.subscriber.state().equals(ApiService.State.RUNNING)) {
            Throwable failureCause = this.subscriber.failureCause();
            if ("DUPLICATE_SUBSCRIBER_CONNECTIONS".equals(ExtractStatus.getErrorInfoReason(ExtractStatus.toCanonical(failureCause)))) {
                throw new IOException("Partition reassigned to a different worker- this is expected and can be ignored.", failureCause);
            }
            throw new IOException("Subscriber failed when trying to advance.", failureCause);
        }
        if (this.advanced) {
            // Discard the message that was previously exposed as the current element.
            this.subscriber.pop();
        }
        Optional<SequencedMessage> next = this.subscriber.peek();
        this.advanced = next.isPresent();
        if (!this.advanced) {
            return false;
        }
        SequencedMessage message = next.get();
        Offset nextFetchOffset = Offset.of(message.getCursor().getOffset() + 1);
        // Offsets must be strictly increasing across surfaced messages.
        Preconditions.checkState(nextFetchOffset.value() > this.fetchOffset.value());
        this.fetchOffset = nextFetchOffset;
        this.lastMessageTimestamp = Optional.of(getTimestamp(message));
        return true;
    }

    /**
     * Returns the publish time of the last surfaced message, or
     * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} if no message has been surfaced yet.
     */
    public Instant getWatermark() {
        return this.lastMessageTimestamp.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    /**
     * Calls {@code rebuffer()} on the subscriber and returns a checkpoint mark for the current
     * fetch offset.
     *
     * @return a checkpoint mark carrying the current fetch offset and the committer supplier
     */
    public CheckpointMarkImpl getCheckpointMark() {
        this.subscriber.rebuffer();
        return new CheckpointMarkImpl(this.fetchOffset, this.committer);
    }

    /**
     * Decompiler-assigned alias of {@link #getCheckpointMark()}, retained for compatibility.
     *
     * @return a checkpoint mark for the current fetch offset
     */
    public CheckpointMarkImpl m279getCheckpointMark() {
        return getCheckpointMark();
    }

    /**
     * Returns the source this reader was constructed with.
     *
     * @return the originating source
     */
    public UnboundedSource<SequencedMessage, CheckpointMarkImpl> getCurrentSource() {
        return this.source;
    }

    /**
     * Decompiler-assigned alias of {@link #getCurrentSource()}, retained for compatibility.
     *
     * @return the originating source
     */
    public UnboundedSource<SequencedMessage, CheckpointMarkImpl> m280getCurrentSource() {
        return getCurrentSource();
    }

    /**
     * Returns the message bytes reported by the backlog reader's statistics computed from the
     * current fetch offset.
     *
     * @return backlog size in bytes as reported by the backlog reader
     */
    public long getSplitBacklogBytes() {
        return this.backlogReader.computeMessageStats(this.fetchOffset).getMessageBytes();
    }
}
