package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import repackaged.com.google.common.annotations.VisibleForTesting;
import repackaged.com.google.common.base.Preconditions;
import repackaged.com.google.common.flogger.GoogleLogger;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.class */
public class PslMicroBatchInputPartitionReader implements PartitionReader<InternalRow> {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private static final Duration SUBSCRIBER_PULL_TIMEOUT = Duration.ofSeconds(10);
    private final SubscriptionPath subscriptionPath;
    private final SparkPartitionOffset endOffset;
    private final BlockingPullSubscriber subscriber;

    @Nullable
    private SequencedMessage currentMsg = null;
    private boolean batchFulfilled = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public PslMicroBatchInputPartitionReader(SubscriptionPath subscriptionPath, SparkPartitionOffset sparkPartitionOffset, BlockingPullSubscriber blockingPullSubscriber) {
        this.subscriptionPath = subscriptionPath;
        this.subscriber = blockingPullSubscriber;
        this.endOffset = sparkPartitionOffset;
    }

    public boolean next() {
        Optional<SequencedMessage> messageIfAvailable;
        if (this.batchFulfilled) {
            return false;
        }
        while (true) {
            try {
                this.subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
                messageIfAvailable = this.subscriber.messageIfAvailable();
                break;
            } catch (TimeoutException e) {
                log.atWarning().log("Unable to get any messages in last %s. Partition: %d; Current message offset: %s; End message offset: %d.", SUBSCRIBER_PULL_TIMEOUT, Long.valueOf(this.endOffset.partition().value()), this.currentMsg == null ? "null" : Long.valueOf(this.currentMsg.offset().value()), Long.valueOf(this.endOffset.offset()));
            } catch (Throwable th) {
                throw new IllegalStateException("Failed to retrieve messages.", th);
            }
        }
        Preconditions.checkState(messageIfAvailable.isPresent());
        this.currentMsg = messageIfAvailable.get();
        if (this.currentMsg.offset().value() == this.endOffset.offset()) {
            this.batchFulfilled = true;
            return true;
        }
        if (this.currentMsg.offset().value() <= this.endOffset.offset()) {
            return true;
        }
        this.batchFulfilled = true;
        return false;
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public InternalRow m3137get() {
        Preconditions.checkState(this.currentMsg != null);
        return PslSparkUtils.toInternalRow(this.currentMsg, this.subscriptionPath, this.endOffset.partition());
    }

    public void close() {
        this.subscriber.close();
    }
}
