package org.apache.beam.sdk.io.kinesis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.base.Stopwatch;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

/**
 * Tests for {@link ShardReadersPool}.
 *
 * <p>The pool under test is a Mockito spy whose {@code createShardIterator} calls are stubbed to
 * hand back {@link #firstIterator} (for "shard1") and {@link #secondIterator} (for "shard2"), so
 * each test controls exactly which records the pool sees.
 *
 * <p>Every test that starts a pool also stops it in a {@code finally} block, so whatever the pool
 * starts (presumably background reader threads) does not outlive the test, even when an assertion
 * fails.
 */
@RunWith(MockitoJUnitRunner.class)
public class ShardReadersPoolTest {

    /**
     * Upper bound for tests that poll {@code nextRecord()} in a loop. Without it a regression that
     * stops records from arriving would hang the build instead of failing the test.
     */
    private static final long POLLING_TEST_TIMEOUT_MILLIS = 30_000L;

    @Mock
    private ShardRecordsIterator firstIterator;

    @Mock
    private ShardRecordsIterator secondIterator;

    @Mock
    private ShardCheckpoint firstCheckpoint;

    @Mock
    private ShardCheckpoint secondCheckpoint;

    @Mock
    private SimplifiedKinesisClient kinesis;

    @Mock
    private KinesisRecord a;

    @Mock
    private KinesisRecord b;

    @Mock
    private KinesisRecord c;

    @Mock
    private KinesisRecord d;

    private ShardReadersPool shardReadersPool;

    /**
     * Assigns records {@code a}/{@code b} to "shard1" and {@code c}/{@code d} to "shard2", and
     * builds the default pool wired to the mocked iterators.
     */
    @Before
    public void setUp() throws TransientKinesisException {
        Mockito.when(a.getShardId()).thenReturn("shard1");
        Mockito.when(b.getShardId()).thenReturn("shard1");
        Mockito.when(c.getShardId()).thenReturn("shard2");
        Mockito.when(d.getShardId()).thenReturn("shard2");
        Mockito.when(firstCheckpoint.getShardId()).thenReturn("shard1");
        Mockito.when(secondCheckpoint.getShardId()).thenReturn("shard2");

        shardReadersPool = spyWithMockedIterators(new ShardReadersPool(kinesis, newCheckpoint()));
    }

    /** All records produced by both shard iterators must eventually come out of the pool. */
    @Test(timeout = POLLING_TEST_TIMEOUT_MILLIS)
    public void shouldReturnAllRecords() throws TransientKinesisException {
        Mockito.when(firstIterator.readNextBatch())
                .thenReturn(Collections.emptyList())
                .thenReturn(Arrays.asList(a, b))
                .thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch())
                .thenReturn(Collections.singletonList(c))
                .thenReturn(Collections.singletonList(d))
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();
        try {
            List<KinesisRecord> fetched = new ArrayList<>();
            // nextRecord() may come back empty while records are still in flight, so keep polling
            // until all four have been seen (bounded by the test timeout).
            while (fetched.size() < 4) {
                CustomOptional<KinesisRecord> next = shardReadersPool.nextRecord();
                if (next.isPresent()) {
                    fetched.add(next.get());
                }
            }
            Assertions.assertThat(fetched).containsExactlyInAnyOrder(a, b, c, d);
        } finally {
            shardReadersPool.stop();
        }
    }

    /** With both iterators returning empty batches, the pool has nothing to hand out. */
    @Test
    public void shouldReturnAbsentOptionalWhenNoRecords() throws TransientKinesisException {
        Mockito.when(firstIterator.readNextBatch()).thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch()).thenReturn(Collections.emptyList());

        shardReadersPool.start();
        try {
            Assertions.assertThat(shardReadersPool.nextRecord().isPresent()).isFalse();
        } finally {
            shardReadersPool.stop();
        }
    }

    /** Each record returned by the pool must be acknowledged on the iterator of its own shard. */
    @Test(timeout = POLLING_TEST_TIMEOUT_MILLIS)
    public void shouldCheckpointReadRecords() throws TransientKinesisException {
        Mockito.when(firstIterator.readNextBatch())
                .thenReturn(Arrays.asList(a, b))
                .thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch())
                .thenReturn(Collections.singletonList(c))
                .thenReturn(Collections.singletonList(d))
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();
        try {
            int recordsRead = 0;
            while (recordsRead < 4) {
                CustomOptional<KinesisRecord> next = shardReadersPool.nextRecord();
                if (next.isPresent()) {
                    recordsRead++;
                    KinesisRecord kinesisRecord = next.get();
                    ShardRecordsIterator owningIterator =
                            "shard1".equals(kinesisRecord.getShardId()) ? firstIterator : secondIterator;
                    Mockito.verify(owningIterator).ackRecord(kinesisRecord);
                }
            }
        } finally {
            shardReadersPool.stop();
        }
    }

    /** stop() must return within a second even while an iterator is stuck in a one-minute read. */
    @Test
    public void shouldInterruptKinesisReadingAndStopShortly() throws TransientKinesisException {
        Mockito.when(firstIterator.readNextBatch()).thenAnswer(invocation -> {
            Thread.sleep(TimeUnit.MINUTES.toMillis(1L));
            return Collections.emptyList();
        });

        shardReadersPool.start();

        Stopwatch stopwatch = Stopwatch.createStarted();
        shardReadersPool.stop();
        Assertions.assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS))
                .isLessThan(TimeUnit.SECONDS.toMillis(1L));
    }

    /**
     * stop() must return within a second when the pool is built with a capacity argument of 2 and
     * an iterator keeps producing three-record batches.
     */
    @Test
    public void shouldInterruptPuttingRecordsToQueueAndStopShortly() throws TransientKinesisException {
        Mockito.when(firstIterator.readNextBatch()).thenReturn(Arrays.asList(a, b, c));
        // The pool must go through the same spy wiring as in setUp(): the mocked iterators are
        // only reachable via the stubbed createShardIterator, so a plain pool would never call
        // the readNextBatch() stub above and the scenario under test would not be exercised.
        ShardReadersPool smallQueuePool =
                spyWithMockedIterators(new ShardReadersPool(kinesis, newCheckpoint(), 2));

        smallQueuePool.start();

        Stopwatch stopwatch = Stopwatch.createStarted();
        smallQueuePool.stop();
        Assertions.assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS))
                .isLessThan(TimeUnit.SECONDS.toMillis(1L));
    }

    /** A single lagging shard is enough for the pool to report "not up to date". */
    @Test
    public void shouldDetectThatNotAllShardsAreUpToDate() throws TransientKinesisException {
        Mockito.when(firstIterator.isUpToDate()).thenReturn(true);
        Mockito.when(secondIterator.isUpToDate()).thenReturn(false);

        shardReadersPool.start();
        try {
            Assertions.assertThat(shardReadersPool.allShardsUpToDate()).isFalse();
        } finally {
            shardReadersPool.stop();
        }
    }

    /** When every shard iterator is up to date, so is the pool. */
    @Test
    public void shouldDetectThatAllShardsAreUpToDate() throws TransientKinesisException {
        Mockito.when(firstIterator.isUpToDate()).thenReturn(true);
        Mockito.when(secondIterator.isUpToDate()).thenReturn(true);

        shardReadersPool.start();
        try {
            Assertions.assertThat(shardReadersPool.allShardsUpToDate()).isTrue();
        } finally {
            shardReadersPool.stop();
        }
    }

    /** Builds a reader checkpoint covering both mocked shard checkpoints. */
    private KinesisReaderCheckpoint newCheckpoint() {
        return new KinesisReaderCheckpoint(Arrays.asList(firstCheckpoint, secondCheckpoint));
    }

    /**
     * Wraps {@code pool} in a Mockito spy whose {@code createShardIterator} returns the mocked
     * iterator matching each mocked checkpoint.
     *
     * @param pool the real pool instance to spy on
     * @return the spy; tests must use this instance rather than {@code pool}
     */
    private ShardReadersPool spyWithMockedIterators(ShardReadersPool pool)
            throws TransientKinesisException {
        ShardReadersPool spiedPool = Mockito.spy(pool);
        // doReturn(..).when(..) is used so the real createShardIterator is not invoked while stubbing.
        Mockito.doReturn(firstIterator).when(spiedPool).createShardIterator(kinesis, firstCheckpoint);
        Mockito.doReturn(secondIterator).when(spiedPool).createShardIterator(kinesis, secondCheckpoint);
        return spiedPool;
    }
}
