package org.apache.beam.sdk.io.kinesis;

import java.io.IOException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.BooleanSupplier;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;

/**
 * Unit tests for {@link KinesisReader}.
 *
 * <p>Every collaborator of the reader is a Mockito mock (or, for the watermark, a spy). The reader
 * under test is an anonymous subclass whose {@code createShardReadersPool()} hands back the mocked
 * {@link ShardReadersPool}, so each test controls exactly which records the reader sees.
 */
@RunWith(MockitoJUnitRunner.class)
public class KinesisReaderTest {

    @Mock
    private SimplifiedKinesisClient kinesis;

    @Mock
    private CheckpointGenerator generator;

    @Mock
    private ShardCheckpoint firstCheckpoint;

    @Mock
    private ShardCheckpoint secondCheckpoint;

    @Mock
    private KinesisRecord a;

    @Mock
    private KinesisRecord b;

    @Mock
    private KinesisRecord c;

    @Mock
    private KinesisRecord d;

    @Mock
    private KinesisSource kinesisSource;

    @Mock
    private ShardReadersPool shardReadersPool;

    // A spy around a real watermark, so tests can both verify calls and stub individual methods.
    @Spy
    private KinesisWatermark watermark = new KinesisWatermark();

    private KinesisReader reader;

    /**
     * Installs the default stubbing shared by all tests: the generator yields a checkpoint made of
     * the two mocked shard checkpoints, the pool reports no record, and each mocked record carries
     * an arrival timestamp. Finally builds the default reader with {@link Duration#ZERO}.
     */
    @Before
    public void setUp() throws TransientKinesisException {
        Mockito.when(generator.generate(kinesis))
                .thenReturn(new KinesisReaderCheckpoint(Arrays.asList(firstCheckpoint, secondCheckpoint)));
        Mockito.when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.absent());
        Mockito.when(a.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(b.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        reader = createReader(Duration.ZERO);
    }

    /**
     * Builds a reader wired to the mocks of this test class.
     *
     * @param duration value passed as the last constructor argument of {@link KinesisReader};
     *     NOTE(review): judging by the backlog tests below this appears to control how often the
     *     backlog size is re-queried - confirm against the constructor.
     * @return a reader whose shard readers pool is the mocked {@code shardReadersPool}
     */
    private KinesisReader createReader(Duration duration) {
        return new KinesisReader(kinesis, generator, kinesisSource, watermark, Duration.ZERO, duration) {
            // @Override makes the compiler reject this test if the hook is ever renamed,
            // instead of silently falling back to the real pool.
            @Override
            ShardReadersPool createShardReadersPool() {
                return KinesisReaderTest.this.shardReadersPool;
            }
        };
    }

    /** With the default stubbing (pool has no record) {@code start()} reports {@code false}. */
    @Test
    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
        Assertions.assertThat(reader.start()).isFalse();
    }

    /** Asking for the current record when none was read must throw. */
    @Test(expected = NoSuchElementException.class)
    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
        reader.start();
        reader.getCurrent();
    }

    /** {@code start()} reports {@code true} when the pool yields a record. */
    @Test
    public void startReturnsTrueIfSomeDataAvailable() throws IOException {
        Mockito.when(shardReadersPool.nextRecord())
                .thenReturn(CustomOptional.of(a))
                .thenReturn(CustomOptional.absent());

        Assertions.assertThat(reader.start()).isTrue();
    }

    /**
     * The pool alternates between records and gaps; the reader must surface every record in the
     * order delivered and report {@code false} for each gap.
     */
    @Test
    public void readsThroughAllDataAvailable() throws IOException {
        Mockito.when(shardReadersPool.nextRecord())
                .thenReturn(CustomOptional.of(c))
                .thenReturn(CustomOptional.absent())
                .thenReturn(CustomOptional.of(a))
                .thenReturn(CustomOptional.absent())
                .thenReturn(CustomOptional.of(d))
                .thenReturn(CustomOptional.of(b))
                .thenReturn(CustomOptional.absent());

        Assertions.assertThat(reader.start()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(c);
        Assertions.assertThat(reader.advance()).isFalse();
        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(a);
        Assertions.assertThat(reader.advance()).isFalse();
        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(d);
        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(b);
        Assertions.assertThat(reader.advance()).isFalse();
    }

    /** When no record is read, the watermark's {@code update} must never be invoked. */
    @Test
    public void doesNotUpdateWatermarkWhenRecordsNotAvailable() throws IOException {
        Assertions.assertThat(reader.start()).isFalse();

        Mockito.verify(watermark, Mockito.never()).update(Matchers.<Instant>any());
    }

    /** Reading a record must update the watermark with that record's arrival timestamp. */
    @Test
    public void updatesWatermarkWhenRecordsAvailable() throws IOException {
        Mockito.when(shardReadersPool.nextRecord())
                .thenReturn(CustomOptional.of(c))
                .thenReturn(CustomOptional.absent());

        Assertions.assertThat(reader.start()).isTrue();

        Mockito.verify(watermark).update(c.getApproximateArrivalTimestamp());
    }

    /** {@code getWatermark()} must return whatever the watermark object reports as current. */
    @Test
    public void returnsCurrentWatermark() throws IOException {
        Instant instant = new Instant(123456L);
        // doReturn(..).when(spy) is used so the real getCurrent is not called while stubbing.
        Mockito.doReturn(instant).when(watermark).getCurrent(Matchers.<BooleanSupplier>any());

        reader.start();

        Assertions.assertThat(reader.getWatermark()).isEqualTo(instant);
    }

    /**
     * The client answers 10, then throws {@link TransientKinesisException}, then answers 20. The
     * reader must report 10, then 10 again (the last seen value) on the failure, and then 20.
     */
    @Test
    public void getTotalBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOccur()
            throws TransientKinesisException, IOException {
        reader.start();
        Mockito.when(kinesisSource.getStreamName()).thenReturn("stream1");
        Mockito.when(kinesis.getBacklogBytes(Matchers.eq("stream1"), Matchers.any(Instant.class)))
                .thenReturn(10L)
                .thenThrow(TransientKinesisException.class)
                .thenReturn(20L);

        Assertions.assertThat(reader.getTotalBacklogBytes()).isEqualTo(10L);
        Assertions.assertThat(reader.getTotalBacklogBytes()).isEqualTo(10L);
        Assertions.assertThat(reader.getTotalBacklogBytes()).isEqualTo(20L);
    }

    /**
     * With a reader created using a 30 second duration, two back-to-back calls must both report
     * the first value (10) even though the client is stubbed to answer 20 on a second query.
     */
    @Test
    public void getTotalBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
            throws TransientKinesisException, IOException {
        KinesisReader customReader = createReader(Duration.standardSeconds(30L));
        customReader.start();
        Mockito.when(kinesisSource.getStreamName()).thenReturn("stream1");
        Mockito.when(kinesis.getBacklogBytes(Matchers.eq("stream1"), Matchers.any(Instant.class)))
                .thenReturn(10L)
                .thenReturn(20L);

        Assertions.assertThat(customReader.getTotalBacklogBytes()).isEqualTo(10L);
        Assertions.assertThat(customReader.getTotalBacklogBytes()).isEqualTo(10L);
    }
}
