package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.assertj.core.api.Assertions;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/**
 * Unit tests for {@link EFOKinesisReader}.
 *
 * <p>The reader under test is created as an anonymous subclass whose {@code createPool()} returns
 * a mocked {@link EFOShardSubscribersPool}, so each test controls the sequence of records the
 * reader sees purely through Mockito stubbing.
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class EFOKinesisReaderTest {

    @Mock
    private KinesisAsyncClient kinesis;

    @Mock
    private KinesisIO.Read read;

    @Mock
    private ShardCheckpoint firstCheckpoint;

    @Mock
    private ShardCheckpoint secondCheckpoint;

    @Mock
    private KinesisRecord a;

    @Mock
    private KinesisRecord b;

    @Mock
    private KinesisRecord c;

    @Mock
    private KinesisRecord d;

    @Mock
    private KinesisSource kinesisSource;

    @Mock
    private EFOShardSubscribersPool subscribersPool;

    // Deliberately not a @Mock: setUp() builds a real checkpoint from the two mocked shard
    // checkpoints, so a Mockito-created instance would be discarded before any test runs.
    private KinesisReaderCheckpoint checkpoint;

    private EFOKinesisReader reader;

    /**
     * Stubs the read configuration, makes the mocked pool report "no record" by default and
     * creates the reader under test wired to that pool.
     */
    @Before
    public void setUp() throws IOException {
        Mockito.when(read.getWatermarkPolicyFactory())
                .thenReturn(WatermarkPolicyFactory.withArrivalTimePolicy());
        Mockito.when(read.getStreamName()).thenReturn("stream1");
        Mockito.when(read.getConsumerArn()).thenReturn("consumer1");

        checkpoint = new KinesisReaderCheckpoint(Arrays.asList(firstCheckpoint, secondCheckpoint));

        // Default behaviour: the pool has no data. Individual tests override this stubbing.
        Mockito.when(subscribersPool.getNextRecord()).thenReturn(null);
        Mockito.when(subscribersPool.getCheckpointMark()).thenReturn(checkpoint);

        Mockito.when(a.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(b.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now());

        reader = new EFOKinesisReader(read, read.getConsumerArn(), kinesis, checkpoint, kinesisSource) {
            @Override
            EFOShardSubscribersPool createPool() {
                // Qualified explicitly so the test's mock is returned even if the reader
                // class itself declares a field with the same name.
                return EFOKinesisReaderTest.this.subscribersPool;
            }
        };
    }

    /** Closes the reader after every test and checks that the Kinesis client was closed. */
    @After
    public void after() throws IOException {
        reader.close();
        Mockito.verify(kinesis).close();
    }

    /** Before start() the checkpoint mark is unavailable; afterwards it is the pool's checkpoint. */
    @Test
    public void getCheckpointMarkReturnsCheckpoints() throws IOException {
        IllegalStateException thrown =
                Assert.assertThrows(IllegalStateException.class, () -> reader.getCheckpointMark());
        Assertions.assertThat(thrown.getMessage()).isEqualTo("Reader was not started");

        reader.start();
        Assertions.assertThat(reader.getCheckpointMark()).isEqualTo(checkpoint);
    }

    /** With the default "no record" stubbing, start() reports that nothing was read. */
    @Test
    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
        Assertions.assertThat(reader.start()).isFalse();
    }

    /** getCurrent() must fail when start() did not yield a record. */
    @Test(expected = NoSuchElementException.class)
    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
        reader.start();
        reader.getCurrent();
    }

    /** start() reports true when the pool hands out a record on the first poll. */
    @Test
    public void startReturnsTrueIfSomeDataAvailable() throws IOException {
        Mockito.when(subscribersPool.getNextRecord())
                .thenReturn(a)
                .thenReturn(null);

        Assertions.assertThat(reader.start()).isTrue();
    }

    /**
     * Walks the reader through an interleaved sequence of records and empty polls and checks
     * that start()/advance() and the current-element accessors follow that sequence.
     */
    @Test
    public void readsThroughAllDataAvailable() throws IOException {
        byte[] recordId = {1, 2};
        Instant arrivalTime = Instant.now();
        Mockito.when(c.getUniqueId()).thenReturn(recordId);
        Mockito.when(c.getApproximateArrivalTimestamp()).thenReturn(arrivalTime);

        // Poll sequence: c, <empty>, a, <empty>, d, b, <empty>.
        Mockito.when(subscribersPool.getNextRecord())
                .thenReturn(c)
                .thenReturn(null)
                .thenReturn(a)
                .thenReturn(null)
                .thenReturn(d)
                .thenReturn(b)
                .thenReturn(null);

        Assertions.assertThat(reader.start()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(c);
        Assertions.assertThat(reader.getCurrentRecordId()).isEqualTo(recordId);
        Assertions.assertThat(reader.getCurrentTimestamp()).isEqualTo(arrivalTime);

        Assertions.assertThat(reader.advance()).isFalse();

        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(a);

        Assertions.assertThat(reader.advance()).isFalse();

        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(d);

        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(b);

        Assertions.assertThat(reader.advance()).isFalse();
    }

    /** The reader's watermark is the one reported by the subscribers pool. */
    @Test
    public void returnsCurrentWatermark() throws IOException {
        Instant expectedWatermark = new Instant(123456L);
        Mockito.when(subscribersPool.getWatermark()).thenReturn(expectedWatermark);

        reader.start();

        Assertions.assertThat(reader.getWatermark()).isEqualTo(expectedWatermark);
    }
}
