package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.spark.internal.MultiPartitionCommitter;
import com.google.cloud.pubsublite.spark.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.spark.internal.PerTopicHeadOffsetReader;
import com.google.common.collect.ImmutableMap;
import com.google.common.truth.Truth;
import java.util.Optional;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslMicroBatchReaderTest.class */
public class PslMicroBatchReaderTest {
    private static final PslReadDataSourceOptions OPTIONS = PslReadDataSourceOptions.builder().setSubscriptionPath(UnitTestExamples.exampleSubscriptionPath()).build();
    private static final long MAX_MESSAGES_PER_BATCH = 20000;
    private final CursorClient cursorClient = (CursorClient) Mockito.mock(CursorClient.class);
    private final MultiPartitionCommitter committer = (MultiPartitionCommitter) Mockito.mock(MultiPartitionCommitter.class);
    private final PartitionSubscriberFactory partitionSubscriberFactory = (PartitionSubscriberFactory) Mockito.mock(PartitionSubscriberFactory.class);
    private final PerTopicHeadOffsetReader headOffsetReader = (PerTopicHeadOffsetReader) Mockito.mock(PerTopicHeadOffsetReader.class);
    private final PslMicroBatchReader reader = new PslMicroBatchReader(this.cursorClient, this.committer, this.partitionSubscriberFactory, this.headOffsetReader, UnitTestExamples.exampleSubscriptionPath(), OPTIONS.flowControlSettings(), MAX_MESSAGES_PER_BATCH);

    @Test
    public void testNoCommitCursors() {
        Mockito.when(this.cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())).thenReturn(ApiFutures.immediateFuture(ImmutableMap.of()));
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(TestingUtils.createPslSourceOffset(301, 200));
        this.reader.setOffsetRange(Optional.empty(), Optional.empty());
        Truth.assertThat(this.reader.getStartOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), -1L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
        Truth.assertThat(this.reader.getEndOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 300L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), 199L)});
    }

    @Test
    public void testEmptyOffsets() {
        Mockito.when(this.cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())).thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(TestingUtils.createPslSourceOffset(301, 0));
        this.reader.setOffsetRange(Optional.empty(), Optional.empty());
        Truth.assertThat(this.reader.getStartOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 99L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
        Truth.assertThat(this.reader.getEndOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 300L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
    }

    @Test
    public void testValidOffsets() {
        SparkSourceOffset createSparkSourceOffset = TestingUtils.createSparkSourceOffset(10, 100);
        SparkSourceOffset createSparkSourceOffset2 = TestingUtils.createSparkSourceOffset(20, 300);
        this.reader.setOffsetRange(Optional.of(createSparkSourceOffset), Optional.of(createSparkSourceOffset2));
        Truth.assertThat(this.reader.getStartOffset()).isEqualTo(createSparkSourceOffset);
        Truth.assertThat(this.reader.getEndOffset()).isEqualTo(createSparkSourceOffset2);
    }

    @Test
    public void testDeserializeOffset() {
        SparkSourceOffset sparkSourceOffset = new SparkSourceOffset(ImmutableMap.of(Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), 10L)));
        Truth.assertThat(this.reader.deserializeOffset(sparkSourceOffset.json())).isEqualTo(sparkSourceOffset);
    }

    @Test
    public void testCommit() {
        SparkSourceOffset createSparkSourceOffset = TestingUtils.createSparkSourceOffset(10, 50);
        PslSourceOffset createPslSourceOffset = TestingUtils.createPslSourceOffset(11, 51);
        this.reader.commit(createSparkSourceOffset);
        ((MultiPartitionCommitter) Mockito.verify(this.committer, Mockito.times(1))).commit((PslSourceOffset) ArgumentMatchers.eq(createPslSourceOffset));
    }

    @Test
    public void testPlanInputPartitionNoMessage() {
        this.reader.setOffsetRange(Optional.of(TestingUtils.createSparkSourceOffset(10, 100)), Optional.of(TestingUtils.createSparkSourceOffset(20, 100)));
        Truth.assertThat(this.reader.planInputPartitions()).hasSize(1);
    }

    @Test
    public void testMaxMessagesPerBatch() {
        Mockito.when(this.cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())).thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(TestingUtils.createPslSourceOffset(10000000, 0));
        this.reader.setOffsetRange(Optional.empty(), Optional.empty());
        Truth.assertThat(this.reader.getEndOffset().getPartitionOffsetMap()).containsExactly(Partition.of(0L), SparkPartitionOffset.create(Partition.of(0L), 20099L), new Object[]{Partition.of(1L), SparkPartitionOffset.create(Partition.of(1L), -1L)});
    }

    @Test
    public void testPartitionIncreasedRetry() {
        SparkSourceOffset createSparkSourceOffset = TestingUtils.createSparkSourceOffset(10, 100);
        SparkSourceOffset createSparkSourceOffset2 = TestingUtils.createSparkSourceOffset(20, 300, 100);
        this.reader.setOffsetRange(Optional.of(createSparkSourceOffset), Optional.of(createSparkSourceOffset2));
        Truth.assertThat(this.reader.getStartOffset()).isEqualTo(createSparkSourceOffset);
        Truth.assertThat(this.reader.getEndOffset()).isEqualTo(createSparkSourceOffset2);
        Truth.assertThat(this.reader.planInputPartitions()).hasSize(3);
    }

    @Test
    public void testPartitionIncreasedNewQuery() {
        Mockito.when(this.cursorClient.listPartitionCursors(UnitTestExamples.exampleSubscriptionPath())).thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(0L), Offset.of(100L))));
        SparkSourceOffset createSparkSourceOffset = TestingUtils.createSparkSourceOffset(301, 200);
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(PslSparkUtils.toPslSourceOffset(createSparkSourceOffset));
        this.reader.setOffsetRange(Optional.empty(), Optional.empty());
        Truth.assertThat(this.reader.getStartOffset()).isEqualTo(TestingUtils.createSparkSourceOffset(99, -1));
        Truth.assertThat(this.reader.getEndOffset()).isEqualTo(createSparkSourceOffset);
        Truth.assertThat(this.reader.planInputPartitions()).hasSize(2);
    }

    @Test
    public void testPartitionIncreasedBeforeSetOffsets() {
        SparkSourceOffset createSparkSourceOffset = TestingUtils.createSparkSourceOffset(301, 200);
        SparkSourceOffset createSparkSourceOffset2 = TestingUtils.createSparkSourceOffset(100);
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(PslSparkUtils.toPslSourceOffset(createSparkSourceOffset));
        this.reader.setOffsetRange(Optional.of(createSparkSourceOffset2), Optional.empty());
        Truth.assertThat(this.reader.getStartOffset()).isEqualTo(createSparkSourceOffset2);
        Truth.assertThat(this.reader.getEndOffset()).isEqualTo(createSparkSourceOffset);
        Truth.assertThat(this.reader.planInputPartitions()).hasSize(2);
    }

    @Test
    public void testPartitionIncreasedBetweenSetOffsetsAndPlan() {
        SparkSourceOffset createSparkSourceOffset = TestingUtils.createSparkSourceOffset(100);
        SparkSourceOffset createSparkSourceOffset2 = TestingUtils.createSparkSourceOffset(301);
        SparkSourceOffset createSparkSourceOffset3 = TestingUtils.createSparkSourceOffset(600, 300);
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(PslSparkUtils.toPslSourceOffset(createSparkSourceOffset2));
        this.reader.setOffsetRange(Optional.of(createSparkSourceOffset), Optional.empty());
        Truth.assertThat(this.reader.getStartOffset()).isEqualTo(createSparkSourceOffset);
        Truth.assertThat(this.reader.getEndOffset()).isEqualTo(createSparkSourceOffset2);
        Mockito.when(this.headOffsetReader.getHeadOffset()).thenReturn(PslSparkUtils.toPslSourceOffset(createSparkSourceOffset3));
        Truth.assertThat(this.reader.planInputPartitions()).hasSize(1);
    }
}
