package com.google.cloud.pubsublite.spark.internal;

import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.spark.PslSourceOffset;
import com.google.cloud.pubsublite.spark.TestingUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/internal/MultiPartitionCommitterImplTest.class */
public class MultiPartitionCommitterImplTest {
    private Runnable task;
    private List<Committer> committerList;

    private MultiPartitionCommitterImpl createCommitter(int i, int i2) {
        this.committerList = new ArrayList();
        for (int i3 = 0; i3 < i2; i3++) {
            Committer committer = (Committer) Mockito.mock(Committer.class);
            Mockito.when(committer.startAsync()).thenReturn(committer).thenThrow(new Throwable[]{new IllegalStateException("should only init once")});
            Mockito.when(committer.commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)))).thenReturn(SettableApiFuture.create());
            this.committerList.add(committer);
        }
        ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Runnable.class);
        Mockito.when(scheduledExecutorService.scheduleWithFixedDelay((Runnable) forClass.capture(), Mockito.anyLong(), Mockito.anyLong(), (TimeUnit) Mockito.any(TimeUnit.class))).thenReturn((Object) null);
        MultiPartitionCommitterImpl multiPartitionCommitterImpl = new MultiPartitionCommitterImpl(i, partition -> {
            return this.committerList.get((int) partition.value());
        }, scheduledExecutorService);
        this.task = (Runnable) forClass.getValue();
        return multiPartitionCommitterImpl;
    }

    private MultiPartitionCommitterImpl createCommitter(int i) {
        return createCommitter(i, i);
    }

    @Test
    public void testCommit() {
        MultiPartitionCommitterImpl createCommitter = createCommitter(2);
        ((Committer) Mockito.verify(this.committerList.get(0))).startAsync();
        ((Committer) Mockito.verify(this.committerList.get(1))).startAsync();
        PslSourceOffset createPslSourceOffset = TestingUtils.createPslSourceOffset(10, 8);
        SettableApiFuture create = SettableApiFuture.create();
        SettableApiFuture create2 = SettableApiFuture.create();
        Mockito.when(this.committerList.get(0).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)))).thenReturn(create);
        Mockito.when(this.committerList.get(1).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(8L)))).thenReturn(create2);
        createCommitter.commit(createPslSourceOffset);
        ((Committer) Mockito.verify(this.committerList.get(0))).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)));
        ((Committer) Mockito.verify(this.committerList.get(1))).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(8L)));
    }

    @Test
    public void testClose() {
        MultiPartitionCommitterImpl createCommitter = createCommitter(1);
        PslSourceOffset createPslSourceOffset = TestingUtils.createPslSourceOffset(10);
        Mockito.when(this.committerList.get(0).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)))).thenReturn(SettableApiFuture.create());
        createCommitter.commit(createPslSourceOffset);
        Mockito.when(this.committerList.get(0).stopAsync()).thenReturn(this.committerList.get(0));
        createCommitter.close();
        ((Committer) Mockito.verify(this.committerList.get(0))).stopAsync();
    }

    @Test
    public void testPartitionChange() {
        MultiPartitionCommitterImpl createCommitter = createCommitter(2, 4);
        for (int i = 0; i < 2; i++) {
            ((Committer) Mockito.verify(this.committerList.get(i))).startAsync();
        }
        for (int i2 = 2; i2 < 4; i2++) {
            ((Committer) Mockito.verify(this.committerList.get(i2), Mockito.times(0))).startAsync();
        }
        createCommitter.commit(TestingUtils.createPslSourceOffset(10, 10, 10, 10));
        for (int i3 = 0; i3 < 2; i3++) {
            ((Committer) Mockito.verify(this.committerList.get(i3))).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)));
        }
        for (int i4 = 2; i4 < 4; i4++) {
            ((Committer) Mockito.verify(this.committerList.get(i4))).startAsync();
            ((Committer) Mockito.verify(this.committerList.get(i4))).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)));
        }
        createCommitter.commit(TestingUtils.createPslSourceOffset(10, 10));
        for (int i5 = 0; i5 < 2; i5++) {
            ((Committer) Mockito.verify(this.committerList.get(i5), Mockito.times(2))).commitOffset((Offset) ArgumentMatchers.eq(Offset.of(10L)));
        }
        this.task.run();
        for (int i6 = 2; i6 < 4; i6++) {
            ((Committer) Mockito.verify(this.committerList.get(i6))).stopAsync();
        }
    }

    @Test
    public void testDelayedPartitionRemoval() {
        MultiPartitionCommitterImpl createCommitter = createCommitter(4);
        createCommitter.commit(TestingUtils.createPslSourceOffset(10, 10));
        createCommitter.commit(TestingUtils.createPslSourceOffset(10, 10, 10));
        this.task.run();
        ((Committer) Mockito.verify(this.committerList.get(2))).startAsync();
        ((Committer) Mockito.verify(this.committerList.get(2), Mockito.times(0))).stopAsync();
        ((Committer) Mockito.verify(this.committerList.get(3))).startAsync();
        ((Committer) Mockito.verify(this.committerList.get(3))).stopAsync();
    }
}
