/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.aws2.kinesis.CustomOptional;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisClientThrottledException;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisShardClosedException;
import org.apache.beam.sdk.io.aws2.kinesis.RateLimitPolicy;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool;
import org.apache.beam.sdk.io.aws2.kinesis.ShardRecordsIterator;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link ShardReadersPool}, driven through mocked {@link ShardRecordsIterator}s.
 *
 * <p>Unless a test builds its own pool, the pool under test is the Mockito spy created in
 * {@link #setUp()} from a checkpoint holding two shards ("shard1" and "shard2"). The spy's
 * {@code createShardIterator} is stubbed to return {@code firstIterator} for the first checkpoint
 * and {@code secondIterator} for the second one.
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class ShardReadersPoolTest {
    /** Upper bound, in milliseconds, used for timed verifications and for bounded polling. */
    private static final int TIMEOUT_IN_MILLIS = (int) TimeUnit.SECONDS.toMillis(10L);

    @Mock
    private ShardRecordsIterator firstIterator;
    @Mock
    private ShardRecordsIterator secondIterator;
    @Mock
    private ShardRecordsIterator thirdIterator;
    @Mock
    private ShardRecordsIterator fourthIterator;
    @Mock
    private ShardCheckpoint firstCheckpoint;
    @Mock
    private ShardCheckpoint secondCheckpoint;
    @Mock
    private SimplifiedKinesisClient kinesis;
    // Records a and b are stubbed to belong to "shard1", c and d to "shard2" (see setUp).
    @Mock
    private KinesisRecord a;
    @Mock
    private KinesisRecord b;
    @Mock
    private KinesisRecord c;
    @Mock
    private KinesisRecord d;
    // Deep stubs let tests reach read.getRateLimitPolicyFactory().getRateLimitPolicy() directly.
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private KinesisIO.Read read;
    private ShardReadersPool shardReadersPool;
    private final Instant now = Instant.now();

    /**
     * Stubs shard ids / stream names on the mocks and builds the spied pool over a two-shard
     * checkpoint with a per-shard capacity of 100.
     *
     * @throws TransientKinesisException declared by the stubbed {@code createShardIterator} call
     */
    @Before
    public void setUp() throws TransientKinesisException {
        Mockito.when(a.getShardId()).thenReturn("shard1");
        Mockito.when(b.getShardId()).thenReturn("shard1");
        Mockito.when(c.getShardId()).thenReturn("shard2");
        Mockito.when(d.getShardId()).thenReturn("shard2");

        Mockito.when(firstCheckpoint.getShardId()).thenReturn("shard1");
        Mockito.when(firstCheckpoint.getStreamName()).thenReturn("testStream");
        Mockito.when(secondCheckpoint.getShardId()).thenReturn("shard2");

        Mockito.when(firstIterator.getStreamName()).thenReturn("testStream");
        Mockito.when(firstIterator.getShardId()).thenReturn("shard1");
        Mockito.when(firstIterator.getCheckpoint()).thenReturn(firstCheckpoint);
        Mockito.when(secondIterator.getShardId()).thenReturn("shard2");
        Mockito.when(secondIterator.getCheckpoint()).thenReturn(secondCheckpoint);
        Mockito.when(thirdIterator.getShardId()).thenReturn("shard3");
        Mockito.when(fourthIterator.getShardId()).thenReturn("shard4");

        Mockito.when(read.getMaxCapacityPerShard()).thenReturn(100);

        KinesisReaderCheckpoint checkpoint =
                new KinesisReaderCheckpoint(ImmutableList.of(firstCheckpoint, secondCheckpoint));
        shardReadersPool = Mockito.spy(new ShardReadersPool(read, kinesis, checkpoint));
        // doReturn(..).when(spy) is used so the real createShardIterator is not invoked while stubbing.
        Mockito.doReturn(firstIterator)
                .when(shardReadersPool)
                .createShardIterator(kinesis, firstCheckpoint);
        Mockito.doReturn(secondIterator)
                .when(shardReadersPool)
                .createShardIterator(kinesis, secondCheckpoint);
    }

    /** Stops the pool held in the {@code shardReadersPool} field after every test. */
    @After
    public void clean() {
        shardReadersPool.stop();
    }

    /**
     * Polls the pool held in the {@code shardReadersPool} field until {@code expectedCount}
     * records have been returned.
     *
     * <p>The polling is bounded by {@link #TIMEOUT_IN_MILLIS} so that a pool which never delivers
     * the records makes the test fail instead of spinning forever.
     *
     * @param expectedCount number of records to wait for
     * @return the records in the order they were returned by {@code nextRecord()}
     * @throws AssertionError if the deadline passes before enough records were returned
     */
    private List<KinesisRecord> fetchRecords(int expectedCount) {
        List<KinesisRecord> fetched = new ArrayList<>();
        Stopwatch stopwatch = Stopwatch.createStarted();
        while (fetched.size() < expectedCount) {
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) >= TIMEOUT_IN_MILLIS) {
                throw new AssertionError(
                        "Timed out after " + TIMEOUT_IN_MILLIS + " ms: fetched "
                                + fetched.size() + " of " + expectedCount + " records");
            }
            CustomOptional<KinesisRecord> next = shardReadersPool.nextRecord();
            if (next.isPresent()) {
                fetched.add(next.get());
            }
        }
        return fetched;
    }

    /** All four records served by the two iterators are returned, in any order. */
    @Test
    public void shouldReturnAllRecords() throws Exception {
        Mockito.when(firstIterator.readNextBatch())
                .thenReturn(Collections.emptyList())
                .thenReturn(ImmutableList.of(a, b))
                .thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch())
                .thenReturn(Collections.singletonList(c))
                .thenReturn(Collections.singletonList(d))
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();
        List<KinesisRecord> fetchedRecords = fetchRecords(4);

        Assertions.assertThat(fetchedRecords).containsExactlyInAnyOrder(a, b, c, d);
        // 200 matches the stubbed per-shard capacity (100) times the two shards in the checkpoint.
        Assertions.assertThat(shardReadersPool.getRecordsQueue().remainingCapacity()).isEqualTo(200);
    }

    /** With both iterators returning only empty batches, {@code nextRecord()} is absent. */
    @Test
    public void shouldReturnAbsentOptionalWhenNoRecords() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch()).thenReturn(Collections.emptyList());

        shardReadersPool.start();
        CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();

        Assertions.assertThat(nextRecord.isPresent()).isFalse();
    }

    /** Every record returned by the pool has been acknowledged on the iterator of its shard. */
    @Test
    public void shouldCheckpointReadRecords() throws Exception {
        Mockito.when(firstIterator.readNextBatch())
                .thenReturn(ImmutableList.of(a, b))
                .thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch())
                .thenReturn(Collections.singletonList(c))
                .thenReturn(Collections.singletonList(d))
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();

        for (KinesisRecord kinesisRecord : fetchRecords(4)) {
            ShardRecordsIterator owner =
                    "shard1".equals(kinesisRecord.getShardId()) ? firstIterator : secondIterator;
            Mockito.verify(owner).ackRecord(kinesisRecord);
        }
    }

    /**
     * {@code stop()} returns in less than {@link #TIMEOUT_IN_MILLIS} even though the first
     * iterator's {@code readNextBatch()} sleeps for half of that timeout.
     */
    @Test
    public void shouldInterruptKinesisReadingAndStopShortly() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenAnswer(invocation -> {
            Thread.sleep(TIMEOUT_IN_MILLIS / 2);
            return Collections.emptyList();
        });

        shardReadersPool.start();
        Stopwatch stopwatch = Stopwatch.createStarted();
        shardReadersPool.stop();

        Assertions.assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isLessThan(TIMEOUT_IN_MILLIS);
    }

    /**
     * A separately built pool with a per-shard capacity of 2 stops in less than
     * {@link #TIMEOUT_IN_MILLIS}.
     */
    @Test
    public void shouldInterruptPuttingRecordsToQueueAndStopShortly() throws Exception {
        Mockito.when(read.getMaxCapacityPerShard()).thenReturn(2);
        Mockito.when(firstIterator.readNextBatch()).thenReturn(ImmutableList.of(a, b, c));
        KinesisReaderCheckpoint checkpoint =
                new KinesisReaderCheckpoint(ImmutableList.of(firstCheckpoint, secondCheckpoint));
        // NOTE(review): this pool is a plain instance, not the spy from setUp, so the
        // createShardIterator stubs do not apply to it - confirm that firstIterator's
        // three-record batch is really what ends up being pushed into the queue here.
        ShardReadersPool pool = new ShardReadersPool(read, kinesis, checkpoint);

        pool.start();
        Stopwatch stopwatch = Stopwatch.createStarted();
        pool.stop();

        Assertions.assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isLessThan(TIMEOUT_IN_MILLIS);
    }

    /**
     * After the first iterator throws {@link KinesisShardClosedException} and reports no
     * successors, it is read exactly once while the second iterator keeps being read.
     */
    @Test
    public void shouldStopReadingShardAfterReceivingShardClosedException() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenThrow(KinesisShardClosedException.class);
        Mockito.when(firstIterator.findSuccessiveShardRecordIterators())
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();

        Mockito.verify(firstIterator, Mockito.timeout(TIMEOUT_IN_MILLIS).times(1)).readNextBatch();
        Mockito.verify(secondIterator, Mockito.timeout(TIMEOUT_IN_MILLIS).atLeast(2)).readNextBatch();
    }

    /**
     * When the closed first shard reports the third and fourth iterators as successors, both of
     * them are read at least twice.
     */
    @Test
    public void shouldStartReadingSuccessiveShardsAfterReceivingShardClosedException() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenThrow(KinesisShardClosedException.class);
        Mockito.when(firstIterator.findSuccessiveShardRecordIterators())
                .thenReturn(ImmutableList.of(thirdIterator, fourthIterator));

        shardReadersPool.start();

        Mockito.verify(thirdIterator, Mockito.timeout(TIMEOUT_IN_MILLIS).atLeast(2)).readNextBatch();
        Mockito.verify(fourthIterator, Mockito.timeout(TIMEOUT_IN_MILLIS).atLeast(2)).readNextBatch();
    }

    /** A closed first shard without successors is read exactly once. */
    @Test
    public void shouldStopReadersPoolWhenLastShardReaderStopped() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenThrow(KinesisShardClosedException.class);
        Mockito.when(firstIterator.findSuccessiveShardRecordIterators())
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();

        Mockito.verify(firstIterator, Mockito.timeout(TIMEOUT_IN_MILLIS).times(1)).readNextBatch();
    }

    /**
     * When looking up successors fails once with {@link TransientKinesisException} before
     * returning an empty list, the closed first shard is read exactly twice.
     */
    @Test
    public void shouldStopReadersPoolAlsoWhenExceptionsOccurDuringStopping() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenThrow(KinesisShardClosedException.class);
        Mockito.when(firstIterator.findSuccessiveShardRecordIterators())
                .thenThrow(TransientKinesisException.class)
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();

        Mockito.verify(firstIterator, Mockito.timeout(TIMEOUT_IN_MILLIS).times(2)).readNextBatch();
    }

    /** A pool started from an empty checkpoint returns an absent record. */
    @Test
    public void shouldReturnAbsentOptionalWhenStartedWithNoIterators() throws Exception {
        KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(Collections.emptyList());
        // Replaces the pool from setUp; clean() will stop this instance instead.
        shardReadersPool = Mockito.spy(new ShardReadersPool(read, kinesis, checkpoint));
        Mockito.doReturn(firstIterator)
                .when(shardReadersPool)
                .createShardIterator(ArgumentMatchers.eq(kinesis), ArgumentMatchers.any(ShardCheckpoint.class));

        shardReadersPool.start();

        Assertions.assertThat(shardReadersPool.nextRecord()).isEqualTo(CustomOptional.absent());
    }

    /**
     * Once the first shard is closed without successors, the checkpoint mark only contains
     * "shard2".
     */
    @Test
    public void shouldForgetClosedShardIterator() throws Exception {
        Mockito.when(firstIterator.readNextBatch()).thenThrow(KinesisShardClosedException.class);
        List<ShardRecordsIterator> emptyList = Collections.emptyList();
        Mockito.when(firstIterator.findSuccessiveShardRecordIterators()).thenReturn(emptyList);

        shardReadersPool.start();

        // First call: the initial two iterators; second call (awaited): the empty successor list.
        Mockito.verify(shardReadersPool)
                .startReadingShards(ImmutableList.of(firstIterator, secondIterator), "testStream");
        Mockito.verify(shardReadersPool, Mockito.timeout(TIMEOUT_IN_MILLIS))
                .startReadingShards(emptyList, "testStream");

        KinesisReaderCheckpoint checkpointMark = shardReadersPool.getCheckpointMark();
        Assertions.assertThat(checkpointMark.iterator())
                .extracting("shardId", String.class)
                .containsOnly("shard2")
                .doesNotContain("shard1");
    }

    /**
     * {@code getWatermark()} returns the smaller of the two shard watermarks on each call, and
     * queries each iterator once per call.
     */
    @Test
    public void shouldReturnTheLeastWatermarkOfAllShards() throws Exception {
        Instant threeMin = now.minus(Duration.standardMinutes(3L));
        Instant twoMin = now.minus(Duration.standardMinutes(2L));
        // First shard reports now-3min and then now; second shard always reports now-2min.
        Mockito.when(firstIterator.getShardWatermark()).thenReturn(threeMin).thenReturn(now);
        Mockito.when(secondIterator.getShardWatermark()).thenReturn(twoMin);

        shardReadersPool.start();

        Assertions.assertThat(shardReadersPool.getWatermark()).isEqualTo(threeMin);
        Assertions.assertThat(shardReadersPool.getWatermark()).isEqualTo(twoMin);
        Mockito.verify(firstIterator, Mockito.times(2)).getShardWatermark();
        Mockito.verify(secondIterator, Mockito.times(2)).getShardWatermark();
    }

    /**
     * {@code getLatestRecordTimestamp()} returns the older of the two shard timestamps on each
     * call, and queries each iterator once per call.
     */
    @Test
    public void shouldReturnTheOldestFromLatestRecordTimestampOfAllShards() throws Exception {
        Instant threeMin = now.minus(Duration.standardMinutes(3L));
        Instant twoMin = now.minus(Duration.standardMinutes(2L));
        // First shard reports now-3min and then now; second shard always reports now-2min.
        Mockito.when(firstIterator.getLatestRecordTimestamp()).thenReturn(threeMin).thenReturn(now);
        Mockito.when(secondIterator.getLatestRecordTimestamp()).thenReturn(twoMin);

        shardReadersPool.start();

        Assertions.assertThat(shardReadersPool.getLatestRecordTimestamp()).isEqualTo(threeMin);
        Assertions.assertThat(shardReadersPool.getLatestRecordTimestamp()).isEqualTo(twoMin);
        Mockito.verify(firstIterator, Mockito.times(2)).getLatestRecordTimestamp();
        Mockito.verify(secondIterator, Mockito.times(2)).getLatestRecordTimestamp();
    }

    /**
     * The rate limit policy obtained from the read configuration is notified about the throttling
     * exception and about every batch that was read.
     */
    @Test
    public void shouldCallRateLimitPolicy() throws Exception {
        KinesisClientThrottledException e = new KinesisClientThrottledException("", null);
        Mockito.when(firstIterator.readNextBatch())
                .thenThrow(e)
                .thenReturn(ImmutableList.of(a, b))
                .thenReturn(Collections.emptyList());
        Mockito.when(secondIterator.readNextBatch())
                .thenReturn(Collections.singletonList(c))
                .thenReturn(Collections.singletonList(d))
                .thenReturn(Collections.emptyList());

        shardReadersPool.start();
        // Drain all four records first; the records themselves are not inspected in this test.
        fetchRecords(4);

        RateLimitPolicy rateLimitPolicy = read.getRateLimitPolicyFactory().getRateLimitPolicy();
        Mockito.verify(rateLimitPolicy, Mockito.timeout(TIMEOUT_IN_MILLIS))
                .onThrottle(ArgumentMatchers.same(e));
        Mockito.verify(rateLimitPolicy, Mockito.timeout(TIMEOUT_IN_MILLIS))
                .onSuccess(ArgumentMatchers.eq(ImmutableList.of(a, b)));
        Mockito.verify(rateLimitPolicy, Mockito.timeout(TIMEOUT_IN_MILLIS))
                .onSuccess(ArgumentMatchers.eq(Collections.singletonList(c)));
        Mockito.verify(rateLimitPolicy, Mockito.timeout(TIMEOUT_IN_MILLIS))
                .onSuccess(ArgumentMatchers.eq(Collections.singletonList(d)));
        Mockito.verify(rateLimitPolicy, Mockito.timeout(TIMEOUT_IN_MILLIS).atLeastOnce())
                .onSuccess(ArgumentMatchers.eq(Collections.emptyList()));
    }
}

