package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.class */
/**
 * Unit tests for {@link PslMicroBatchInputPartitionReader}, exercised through the Spark
 * {@link PartitionReader} interface against a mocked {@link BlockingPullSubscriberImpl}.
 *
 * <p>Each test builds a reader for {@link #END_OFFSET}, feeds it messages by stubbing the mocked
 * subscriber, and checks both the rows produced and the point at which {@code next()} reports
 * that no further rows are available.
 */
public class PslMicroBatchInputPartitionReaderTest {
    /**
     * Offset placed in the {@link SparkPartitionOffset} given to the reader. The tests expect a
     * message at this offset to be returned and any message past it to be rejected.
     */
    private static final long END_OFFSET = 14L;

    /** Offset of the first message delivered in every test; it is below {@link #END_OFFSET}. */
    private static final long FIRST_OFFSET = 10L;

    private final BlockingPullSubscriberImpl subscriber = Mockito.mock(BlockingPullSubscriberImpl.class);
    private PartitionReader<InternalRow> reader;

    /**
     * Builds a message with a fixed UTF-8 payload of {@code "text"}, an epoch timestamp and the
     * given offset.
     *
     * @param j the offset to assign to the message
     * @return the newly built message
     */
    private static SequencedMessage newMessage(long j) {
        Message payload = Message.builder().setData(ByteString.copyFromUtf8("text")).build();
        // NOTE(review): the trailing 10000L is presumably the message byte size - confirm against
        // SequencedMessage.of.
        return SequencedMessage.of(payload, Timestamps.EPOCH, Offset.of(j), 10000L);
    }

    /**
     * Asserts the first three columns of a row: the example subscription path as a string, the
     * example partition value, and the expected offset.
     *
     * @param internalRow the row produced by the reader
     * @param j the offset expected in column 2
     */
    private static void verifyInternalRow(InternalRow internalRow, long j) {
        Truth.assertThat(internalRow.getString(0)).isEqualTo(UnitTestExamples.exampleSubscriptionPath().toString());
        Truth.assertThat(internalRow.getLong(1)).isEqualTo(UnitTestExamples.examplePartition().value());
        Truth.assertThat(internalRow.getLong(2)).isEqualTo(j);
    }

    /**
     * Creates the reader under test for the example subscription and partition, backed by the
     * mocked subscriber.
     *
     * @param j the offset stored in the reader's {@link SparkPartitionOffset}
     */
    private void createReader(long j) {
        SparkPartitionOffset partitionOffset =
                SparkPartitionOffset.builder().partition(UnitTestExamples.examplePartition()).offset(j).build();
        this.reader = new PslMicroBatchInputPartitionReader(
                UnitTestExamples.exampleSubscriptionPath(), partitionOffset, this.subscriber);
    }

    /**
     * Stubs the mocked subscriber so that data is reported as immediately available and the given
     * message is the one handed out by {@code messageIfAvailable()}.
     *
     * @param message the message the subscriber should return
     * @throws Exception if the stubbed subscriber methods declare a checked exception
     */
    private void stubNextMessage(SequencedMessage message) throws Exception {
        // No cast on null: the future's type parameter is inferred from onData()'s return type.
        Mockito.when(this.subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
        Mockito.when(this.subscriber.messageIfAvailable()).thenReturn(Optional.of(message));
    }

    /**
     * Delivers a message at {@link #FIRST_OFFSET}, advances the reader once and verifies the
     * resulting row.
     *
     * @throws Exception if stubbing or advancing the reader fails
     */
    private void readAndVerifyFirstMessage() throws Exception {
        stubNextMessage(newMessage(FIRST_OFFSET));
        Truth.assertThat(this.reader.next()).isTrue();
        verifyInternalRow(this.reader.get(), FIRST_OFFSET);
        // get() is called a second time on purpose: it must return the same row without
        // advancing the reader.
        verifyInternalRow(this.reader.get(), FIRST_OFFSET);
    }

    /**
     * A message below the reader's offset and a message exactly at it are both returned; after
     * the latter, {@code next()} reports that nothing is left.
     */
    @Test
    public void testPartitionReader() throws Exception {
        createReader(END_OFFSET);
        readAndVerifyFirstMessage();

        stubNextMessage(newMessage(END_OFFSET));
        Truth.assertThat(this.reader.next()).isTrue();
        verifyInternalRow(this.reader.get(), END_OFFSET);

        Truth.assertThat(this.reader.next()).isFalse();
    }

    /**
     * A message one past the reader's offset is not returned: {@code next()} reports false as
     * soon as it is delivered.
     */
    @Test
    public void testPartitionReaderNewMessageExceedsRange() throws Exception {
        createReader(END_OFFSET);
        readAndVerifyFirstMessage();

        stubNextMessage(newMessage(END_OFFSET + 1));
        Truth.assertThat(this.reader.next()).isFalse();
    }
}
