package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.common.collect.ImmutableMap;
import com.google.common.truth.Truth;
import java.util.Optional;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.class */
public class PslMicroBatchReaderTest {
    private static final PslDataSourceOptions OPTIONS = PslDataSourceOptions.builder().setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()).build();
    private static final long MAX_MESSAGES_PER_BATCH = 20000;
    private final CursorClient cursorClient = (CursorClient) Mockito.mock(CursorClient.class);
    private final MultiPartitionCommitter committer = (MultiPartitionCommitter) Mockito.mock(MultiPartitionCommitter.class);
    private final PartitionSubscriberFactory partitionSubscriberFactory = (PartitionSubscriberFactory) Mockito.mock(PartitionSubscriberFactory.class);
    private final PerTopicHeadOffsetReader headOffsetReader = (PerTopicHeadOffsetReader) Mockito.mock(PerTopicHeadOffsetReader.class);
    private final PslMicroBatchReader reader = new PslMicroBatchReader(this.cursorClient, this.committer, this.partitionSubscriberFactory, this.headOffsetReader, UnitTestExamples.exampleSubscriptionPath(), OPTIONS.flowControlSettings(), MAX_MESSAGES_PER_BATCH, 2);

    private PslSourceOffset createPslSourceOffsetTwoPartition(long j, long j2) {
        return PslSourceOffset.builder().partitionOffsetMap(ImmutableMap.of(Partition.of(0L), Offset.of(j), Partition.of(1L), Offset.of(j2))).build();
    }

    private SparkSourceOffset createSparkSourceOffsetTwoPartition(long j, long j2) {
        return new SparkSourceOffset(ImmutableMap.of(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), j), Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), j2)));
    }

    @Test
    public void testEmptyOffsets() {
        Mockito.when(this.cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())).thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(createPslSourceOffsetTwoPartition(301L, 0L));
        this.reader.setOffsetRange(Optional.empty(), Optional.empty());
        Truth.assertThat(this.reader.getStartOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 99L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
        Truth.assertThat(this.reader.getEndOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 300L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
    }

    @Test
    public void testValidOffsets() {
        SparkSourceOffset createSparkSourceOffsetTwoPartition = createSparkSourceOffsetTwoPartition(10L, 100L);
        SparkSourceOffset createSparkSourceOffsetTwoPartition2 = createSparkSourceOffsetTwoPartition(20L, 300L);
        this.reader.setOffsetRange(Optional.of(createSparkSourceOffsetTwoPartition), Optional.of(createSparkSourceOffsetTwoPartition2));
        Truth.assertThat(this.reader.getStartOffset()).isEqualTo(createSparkSourceOffsetTwoPartition);
        Truth.assertThat(this.reader.getEndOffset()).isEqualTo(createSparkSourceOffsetTwoPartition2);
    }

    @Test
    public void testDeserializeOffset() {
        SparkSourceOffset sparkSourceOffset = new SparkSourceOffset(ImmutableMap.of(Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), 10L)));
        Truth.assertThat(this.reader.deserializeOffset(sparkSourceOffset.json())).isEqualTo(sparkSourceOffset);
    }

    @Test
    public void testCommit() {
        SparkSourceOffset createSparkSourceOffsetTwoPartition = createSparkSourceOffsetTwoPartition(10L, 50L);
        PslSourceOffset createPslSourceOffsetTwoPartition = createPslSourceOffsetTwoPartition(11L, 51L);
        this.reader.commit(createSparkSourceOffsetTwoPartition);
        ((MultiPartitionCommitter) Mockito.verify(this.committer, Mockito.times(1))).commit((PslSourceOffset) ArgumentMatchers.eq(createPslSourceOffsetTwoPartition));
    }

    @Test
    public void testPlanInputPartitionNoMessage() {
        this.reader.setOffsetRange(Optional.of(createSparkSourceOffsetTwoPartition(10L, 100L)), Optional.of(createSparkSourceOffsetTwoPartition(20L, 100L)));
        Truth.assertThat(this.reader.planInputPartitions()).hasSize(1);
    }

    @Test
    public void testMaxMessagesPerBatch() {
        Mockito.when(this.cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())).thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(createPslSourceOffsetTwoPartition(10000000L, 0L));
        this.reader.setOffsetRange(Optional.empty(), Optional.empty());
        Truth.assertThat(this.reader.getEndOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 20099L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
    }
}
