package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@link ShardRecordsIterator}.
 *
 * <p>All collaborators (Kinesis client, checkpoints, record batches, records and the record
 * filter) are Mockito mocks wired together in {@link #setUp()}. The {@code Silent} runner is used
 * because individual tests do not exercise every stub declared in the shared fixture.
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class ShardRecordsIteratorTest {
    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
    private static final String STREAM_NAME = "STREAM_NAME";
    private static final String SHARD_ID = "SHARD_ID";
    private static final Instant NOW = Instant.now();

    @Mock
    private SimplifiedKinesisClient kinesisClient;

    // Checkpoints form a chain: first -moveAfter(a)-> a -moveAfter(b)-> b -> c -> d (see setUp).
    @Mock
    private ShardCheckpoint firstCheckpoint;

    @Mock
    private ShardCheckpoint aCheckpoint;

    @Mock
    private ShardCheckpoint bCheckpoint;

    @Mock
    private ShardCheckpoint cCheckpoint;

    @Mock
    private ShardCheckpoint dCheckpoint;

    // Results returned by the client for INITIAL_ITERATOR, SECOND_ITERATOR and THIRD_ITERATOR.
    @Mock
    private GetKinesisRecordsResult firstResult;

    @Mock
    private GetKinesisRecordsResult secondResult;

    @Mock
    private GetKinesisRecordsResult thirdResult;

    @Mock
    private KinesisRecord a;

    @Mock
    private KinesisRecord b;

    @Mock
    private KinesisRecord c;

    @Mock
    private KinesisRecord d;

    @Mock
    private RecordFilter recordFilter;

    /** Object under test; rebuilt from the mocks before every test. */
    private ShardRecordsIterator iterator;

    /** Mockito answer that returns the first argument of the stubbed invocation unchanged. */
    private static class IdentityAnswer implements Answer<Object> {
        @Override
        public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
            return invocationOnMock.getArguments()[0];
        }
    }

    /**
     * Wires the mocks into a three-step shard read and creates the iterator under test.
     *
     * <p>By default every result holds an empty record list; tests override the record lists they
     * care about. The third result points back at {@code THIRD_ITERATOR}, so reads past the second
     * batch keep hitting the same (empty) result.
     */
    @Before
    public void setUp() throws IOException, TransientKinesisException {
        Mockito.when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
        Mockito.when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);

        // Checkpoint chain: acknowledging a, b, c, d in order advances first -> a -> b -> c -> d.
        Mockito.when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
        Mockito.when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
        Mockito.when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
        Mockito.when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
        Mockito.when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
        Mockito.when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
        Mockito.when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
        Mockito.when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);

        // Each shard iterator string maps to one result, which in turn names the next iterator.
        Mockito.when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn(firstResult);
        Mockito.when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn(secondResult);
        Mockito.when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn(thirdResult);
        Mockito.when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
        Mockito.when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
        Mockito.when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
        Mockito.when(firstResult.getRecords()).thenReturn(Collections.emptyList());
        Mockito.when(secondResult.getRecords()).thenReturn(Collections.emptyList());
        Mockito.when(thirdResult.getRecords()).thenReturn(Collections.emptyList());

        // The filter passes its record list through untouched, whatever the checkpoint is.
        Mockito.when(recordFilter.apply(Matchers.anyListOf(KinesisRecord.class), Matchers.any(ShardCheckpoint.class)))
                .thenAnswer(new IdentityAnswer());

        iterator = new ShardRecordsIterator(
                firstCheckpoint, kinesisClient, WatermarkPolicyFactory.withArrivalTimePolicy(), recordFilter);
    }

    /**
     * Consecutive {@code readNextBatch()} calls return the record lists of the first, second and
     * third results in order, and the checkpoint starts out as the one passed to the constructor.
     */
    @Test
    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException, KinesisShardClosedException {
        Mockito.when(firstResult.getRecords()).thenReturn(Arrays.asList(a, b, c));
        Mockito.when(secondResult.getRecords()).thenReturn(Collections.singletonList(d));
        Mockito.when(thirdResult.getRecords()).thenReturn(Collections.emptyList());

        Assertions.assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
        Assertions.assertThat(iterator.readNextBatch()).isEqualTo(Arrays.asList(a, b, c));
        Assertions.assertThat(iterator.readNextBatch()).isEqualTo(Collections.singletonList(d));
        Assertions.assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
    }

    /**
     * Acknowledging records a, b, c and d in order moves the iterator's checkpoint along the
     * {@code moveAfter} chain stubbed in {@link #setUp()}.
     */
    @Test
    public void conformingRecordsMovesCheckpoint() throws IOException, TransientKinesisException {
        Mockito.when(firstResult.getRecords()).thenReturn(Arrays.asList(a, b, c));
        Mockito.when(secondResult.getRecords()).thenReturn(Collections.singletonList(d));
        Mockito.when(thirdResult.getRecords()).thenReturn(Collections.emptyList());

        // Strictly increasing arrival times, one second apart.
        Mockito.when(a.getApproximateArrivalTimestamp()).thenReturn(NOW);
        Mockito.when(b.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(1L)));
        Mockito.when(c.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(2L)));
        Mockito.when(d.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(3L)));

        iterator.ackRecord(a);
        Assertions.assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
        iterator.ackRecord(b);
        Assertions.assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
        iterator.ackRecord(c);
        Assertions.assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
        iterator.ackRecord(d);
        Assertions.assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
    }

    /**
     * When fetching with {@code SECOND_ITERATOR} throws {@link ExpiredIteratorException}, the
     * second batch is still delivered via {@code SECOND_REFRESHED_ITERATOR}, which is only
     * obtainable from the checkpoint reached after acknowledging record a.
     */
    @Test
    public void refreshesExpiredIterator() throws IOException, TransientKinesisException, KinesisShardClosedException {
        Mockito.when(firstResult.getRecords()).thenReturn(Collections.singletonList(a));
        Mockito.when(secondResult.getRecords()).thenReturn(Collections.singletonList(b));
        Mockito.when(a.getApproximateArrivalTimestamp()).thenReturn(NOW);
        Mockito.when(b.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(1L)));

        // Replace the setUp stub: the second iterator is now expired ...
        Mockito.when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
                .thenThrow(ExpiredIteratorException.class);
        // ... and a fresh one, available from the post-a checkpoint, yields the second result.
        Mockito.when(aCheckpoint.getShardIterator(kinesisClient)).thenReturn(SECOND_REFRESHED_ITERATOR);
        Mockito.when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
                .thenReturn(secondResult);

        Assertions.assertThat(iterator.readNextBatch()).isEqualTo(Collections.singletonList(a));
        iterator.ackRecord(a);
        Assertions.assertThat(iterator.readNextBatch()).isEqualTo(Collections.singletonList(b));
        Assertions.assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
    }
}
