/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.aws2.kinesis.GetKinesisRecordsResult;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisShardClosedException;
import org.apache.beam.sdk.io.aws2.kinesis.RecordFilter;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.ShardRecordsIterator;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;

@RunWith(value=MockitoJUnitRunner.Silent.class)
public class ShardRecordsIteratorTest {
    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
    private static final String STREAM_NAME = "STREAM_NAME";
    private static final String SHARD_ID = "SHARD_ID";
    private static final Instant NOW = Instant.now();
    @Mock
    private SimplifiedKinesisClient kinesisClient;
    @Mock
    private ShardCheckpoint firstCheckpoint;
    @Mock
    private ShardCheckpoint aCheckpoint;
    @Mock
    private ShardCheckpoint bCheckpoint;
    @Mock
    private ShardCheckpoint cCheckpoint;
    @Mock
    private ShardCheckpoint dCheckpoint;
    @Mock
    private GetKinesisRecordsResult firstResult;
    @Mock
    private GetKinesisRecordsResult secondResult;
    @Mock
    private GetKinesisRecordsResult thirdResult;
    @Mock
    private KinesisRecord a;
    @Mock
    private KinesisRecord b;
    @Mock
    private KinesisRecord c;
    @Mock
    private KinesisRecord d;
    @Mock
    private RecordFilter recordFilter;
    private ShardRecordsIterator iterator;

    @Before
    public void setUp() throws IOException, TransientKinesisException {
        Mockito.when((Object)this.firstCheckpoint.getShardIterator(this.kinesisClient)).thenReturn((Object)INITIAL_ITERATOR);
        Mockito.when((Object)this.firstCheckpoint.getStreamName()).thenReturn((Object)STREAM_NAME);
        Mockito.when((Object)this.firstCheckpoint.getShardId()).thenReturn((Object)SHARD_ID);
        Mockito.when((Object)this.firstCheckpoint.moveAfter(this.a)).thenReturn((Object)this.aCheckpoint);
        Mockito.when((Object)this.aCheckpoint.moveAfter(this.b)).thenReturn((Object)this.bCheckpoint);
        Mockito.when((Object)this.aCheckpoint.getStreamName()).thenReturn((Object)STREAM_NAME);
        Mockito.when((Object)this.aCheckpoint.getShardId()).thenReturn((Object)SHARD_ID);
        Mockito.when((Object)this.bCheckpoint.moveAfter(this.c)).thenReturn((Object)this.cCheckpoint);
        Mockito.when((Object)this.bCheckpoint.getStreamName()).thenReturn((Object)STREAM_NAME);
        Mockito.when((Object)this.bCheckpoint.getShardId()).thenReturn((Object)SHARD_ID);
        Mockito.when((Object)this.cCheckpoint.moveAfter(this.d)).thenReturn((Object)this.dCheckpoint);
        Mockito.when((Object)this.cCheckpoint.getStreamName()).thenReturn((Object)STREAM_NAME);
        Mockito.when((Object)this.cCheckpoint.getShardId()).thenReturn((Object)SHARD_ID);
        Mockito.when((Object)this.dCheckpoint.getStreamName()).thenReturn((Object)STREAM_NAME);
        Mockito.when((Object)this.dCheckpoint.getShardId()).thenReturn((Object)SHARD_ID);
        Mockito.when((Object)this.kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn((Object)this.firstResult);
        Mockito.when((Object)this.kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn((Object)this.secondResult);
        Mockito.when((Object)this.kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn((Object)this.thirdResult);
        Mockito.when((Object)this.firstResult.getNextShardIterator()).thenReturn((Object)SECOND_ITERATOR);
        Mockito.when((Object)this.secondResult.getNextShardIterator()).thenReturn((Object)THIRD_ITERATOR);
        Mockito.when((Object)this.thirdResult.getNextShardIterator()).thenReturn((Object)THIRD_ITERATOR);
        Mockito.when((Object)this.firstResult.getRecords()).thenReturn(Collections.emptyList());
        Mockito.when((Object)this.secondResult.getRecords()).thenReturn(Collections.emptyList());
        Mockito.when((Object)this.thirdResult.getRecords()).thenReturn(Collections.emptyList());
        Mockito.when((Object)this.recordFilter.apply(ArgumentMatchers.anyList(), (ShardCheckpoint)ArgumentMatchers.any(ShardCheckpoint.class))).thenAnswer((Answer)new IdentityAnswer());
        WatermarkPolicyFactory watermarkPolicyFactory = WatermarkPolicyFactory.withArrivalTimePolicy();
        this.iterator = new ShardRecordsIterator(this.firstCheckpoint, this.kinesisClient, watermarkPolicyFactory, this.recordFilter);
    }

    @Test
    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException, KinesisShardClosedException {
        Mockito.when((Object)this.firstResult.getRecords()).thenReturn(Arrays.asList(this.a, this.b, this.c));
        Mockito.when((Object)this.secondResult.getRecords()).thenReturn(Collections.singletonList(this.d));
        Mockito.when((Object)this.thirdResult.getRecords()).thenReturn(Collections.emptyList());
        Assertions.assertThat((Object)this.iterator.getCheckpoint()).isEqualTo((Object)this.firstCheckpoint);
        Assertions.assertThat((List)this.iterator.readNextBatch()).isEqualTo(Arrays.asList(this.a, this.b, this.c));
        Assertions.assertThat((List)this.iterator.readNextBatch()).isEqualTo(Collections.singletonList(this.d));
        Assertions.assertThat((List)this.iterator.readNextBatch()).isEqualTo(Collections.emptyList());
    }

    @Test
    public void conformingRecordsMovesCheckpoint() throws IOException, TransientKinesisException {
        Mockito.when((Object)this.firstResult.getRecords()).thenReturn(Arrays.asList(this.a, this.b, this.c));
        Mockito.when((Object)this.secondResult.getRecords()).thenReturn(Collections.singletonList(this.d));
        Mockito.when((Object)this.thirdResult.getRecords()).thenReturn(Collections.emptyList());
        Mockito.when((Object)this.a.getApproximateArrivalTimestamp()).thenReturn((Object)NOW);
        Mockito.when((Object)this.b.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)1L)));
        Mockito.when((Object)this.c.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)2L)));
        Mockito.when((Object)this.d.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)3L)));
        this.iterator.ackRecord(this.a);
        Assertions.assertThat((Object)this.iterator.getCheckpoint()).isEqualTo((Object)this.aCheckpoint);
        this.iterator.ackRecord(this.b);
        Assertions.assertThat((Object)this.iterator.getCheckpoint()).isEqualTo((Object)this.bCheckpoint);
        this.iterator.ackRecord(this.c);
        Assertions.assertThat((Object)this.iterator.getCheckpoint()).isEqualTo((Object)this.cCheckpoint);
        this.iterator.ackRecord(this.d);
        Assertions.assertThat((Object)this.iterator.getCheckpoint()).isEqualTo((Object)this.dCheckpoint);
    }

    @Test
    public void refreshesExpiredIterator() throws IOException, TransientKinesisException, KinesisShardClosedException {
        Mockito.when((Object)this.firstResult.getRecords()).thenReturn(Collections.singletonList(this.a));
        Mockito.when((Object)this.secondResult.getRecords()).thenReturn(Collections.singletonList(this.b));
        Mockito.when((Object)this.a.getApproximateArrivalTimestamp()).thenReturn((Object)NOW);
        Mockito.when((Object)this.b.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)1L)));
        Mockito.when((Object)this.kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)).thenThrow(ExpiredIteratorException.class);
        Mockito.when((Object)this.aCheckpoint.getShardIterator(this.kinesisClient)).thenReturn((Object)SECOND_REFRESHED_ITERATOR);
        Mockito.when((Object)this.kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn((Object)this.secondResult);
        Assertions.assertThat((List)this.iterator.readNextBatch()).isEqualTo(Collections.singletonList(this.a));
        this.iterator.ackRecord(this.a);
        Assertions.assertThat((List)this.iterator.readNextBatch()).isEqualTo(Collections.singletonList(this.b));
        Assertions.assertThat((List)this.iterator.readNextBatch()).isEqualTo(Collections.emptyList());
    }

    @Test
    public void tracksLatestRecordTimestamp() {
        Mockito.when((Object)this.firstResult.getRecords()).thenReturn(Collections.singletonList(this.a));
        Mockito.when((Object)this.secondResult.getRecords()).thenReturn(Arrays.asList(this.b, this.c));
        Mockito.when((Object)this.thirdResult.getRecords()).thenReturn(Collections.singletonList(this.c));
        Mockito.when((Object)this.a.getApproximateArrivalTimestamp()).thenReturn((Object)NOW);
        Mockito.when((Object)this.b.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)4L)));
        Mockito.when((Object)this.c.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)2L)));
        Mockito.when((Object)this.d.getApproximateArrivalTimestamp()).thenReturn((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)6L)));
        this.iterator.ackRecord(this.a);
        Assertions.assertThat((Comparable)this.iterator.getLatestRecordTimestamp()).isEqualTo((Object)NOW);
        this.iterator.ackRecord(this.b);
        Assertions.assertThat((Comparable)this.iterator.getLatestRecordTimestamp()).isEqualTo((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)4L)));
        this.iterator.ackRecord(this.c);
        Assertions.assertThat((Comparable)this.iterator.getLatestRecordTimestamp()).isEqualTo((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)4L)));
        this.iterator.ackRecord(this.d);
        Assertions.assertThat((Comparable)this.iterator.getLatestRecordTimestamp()).isEqualTo((Object)NOW.plus((ReadableDuration)Duration.standardSeconds((long)6L)));
    }

    private static class IdentityAnswer
    implements Answer<Object> {
        private IdentityAnswer() {
        }

        public Object answer(InvocationOnMock invocation) throws Throwable {
            return invocation.getArguments()[0];
        }
    }
}

