/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.aws2.kinesis.CheckpointGenerator;
import org.apache.beam.sdk.io.aws2.kinesis.CustomOptional;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReader;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisSource;
import org.apache.beam.sdk.io.aws2.kinesis.RateLimitPolicyFactory;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link KinesisReader}.
 *
 * <p>Every reader used here is created through {@link #createReader(Duration)}, which overrides
 * {@code createShardReadersPool()} so the reader works against the mocked
 * {@link ShardReadersPool} declared in this class.
 *
 * <p>The {@code Silent} Mockito runner is used because {@link #setUp()} installs stubbings that
 * not every test exercises.
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class KinesisReaderTest {
    /** Stream name reported by the mocked {@link KinesisSource} in the backlog tests. */
    private static final String STREAM_NAME = "stream1";

    @Mock
    private SimplifiedKinesisClient kinesis;
    @Mock
    private CheckpointGenerator generator;
    @Mock
    private ShardCheckpoint firstCheckpoint;
    @Mock
    private ShardCheckpoint secondCheckpoint;
    @Mock
    private KinesisRecord a;
    @Mock
    private KinesisRecord b;
    @Mock
    private KinesisRecord c;
    @Mock
    private KinesisRecord d;
    @Mock
    private KinesisSource kinesisSource;
    @Mock
    private ShardReadersPool shardReadersPool;

    /** Spied reader built in {@link #setUp()} with a zero backlog-bytes check threshold. */
    private KinesisReader reader;

    /**
     * Installs the default stubbings shared by all tests: the checkpoint generator yields a
     * checkpoint made of the two mocked shard checkpoints, the pool has no record to offer, and
     * each mocked record reports "now" as its approximate arrival timestamp.
     *
     * @throws TransientKinesisException declared by {@code CheckpointGenerator#generate}; not
     *     expected while stubbing
     */
    @Before
    public void setUp() throws TransientKinesisException {
        Mockito.when(generator.generate(kinesis))
                .thenReturn(new KinesisReaderCheckpoint(Arrays.asList(firstCheckpoint, secondCheckpoint)));
        Mockito.when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.absent());
        Mockito.when(a.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(b.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        Mockito.when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
        reader = Mockito.spy(createReader(Duration.ZERO));
    }

    /**
     * Builds a {@link KinesisReader} wired to the mocks of this test class.
     *
     * @param backlogBytesCheckThreshold value passed as the reader's backlog-bytes check threshold
     * @return a reader whose {@code createShardReadersPool()} returns the mocked pool
     */
    private KinesisReader createReader(Duration backlogBytesCheckThreshold) {
        // NOTE(review): the trailing 10000 is an inlined constant from the compiled class;
        // presumably a per-shard capacity setting - confirm against the KinesisReader constructor.
        return new KinesisReader(
                kinesis,
                generator,
                kinesisSource,
                WatermarkPolicyFactory.withArrivalTimePolicy(),
                RateLimitPolicyFactory.withoutLimiter(),
                Duration.ZERO,
                backlogBytesCheckThreshold,
                10000) {

            // @Override makes the compiler reject this class if the production method is ever
            // renamed or re-typed, instead of silently no longer injecting the mock.
            @Override
            ShardReadersPool createShardReadersPool() {
                return KinesisReaderTest.this.shardReadersPool;
            }
        };
    }

    /** Returns the instant one minute before the moment of the call. */
    private static Instant oneMinuteAgo() {
        return Instant.now().minus(Duration.standardMinutes(1L));
    }

    /** With the default "no record" stubbing, {@code start()} must report that nothing was read. */
    @Test
    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
        Assertions.assertThat(reader.start()).isFalse();
    }

    /** Asking for the current record when none was ever read must throw. */
    @Test(expected = NoSuchElementException.class)
    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
        reader.start();
        reader.getCurrent();
    }

    /** {@code start()} must report success when the pool hands out a record straight away. */
    @Test
    public void startReturnsTrueIfSomeDataAvailable() throws IOException {
        Mockito.when(shardReadersPool.nextRecord())
                .thenReturn(CustomOptional.of(a))
                .thenReturn(CustomOptional.absent());

        Assertions.assertThat(reader.start()).isTrue();
    }

    /**
     * Walks the reader through a sequence of present and absent pool results and checks that
     * {@code start()}/{@code advance()} mirror the presence of a record and that
     * {@code getCurrent()} returns the record most recently handed out.
     */
    @Test
    public void readsThroughAllDataAvailable() throws IOException {
        Mockito.when(shardReadersPool.nextRecord())
                .thenReturn(CustomOptional.of(c))
                .thenReturn(CustomOptional.absent())
                .thenReturn(CustomOptional.of(a))
                .thenReturn(CustomOptional.absent())
                .thenReturn(CustomOptional.of(d))
                .thenReturn(CustomOptional.of(b))
                .thenReturn(CustomOptional.absent());

        Assertions.assertThat(reader.start()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(c);
        Assertions.assertThat(reader.advance()).isFalse();
        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(a);
        Assertions.assertThat(reader.advance()).isFalse();
        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(d);
        Assertions.assertThat(reader.advance()).isTrue();
        Assertions.assertThat(reader.getCurrent()).isEqualTo(b);
        Assertions.assertThat(reader.advance()).isFalse();
    }

    /** The reader's watermark must be the one reported by the shard readers pool. */
    @Test
    public void returnsCurrentWatermark() throws IOException {
        Instant expectedWatermark = new Instant(123456L);
        Mockito.when(shardReadersPool.getWatermark()).thenReturn(expectedWatermark);

        reader.start();
        Instant currentWatermark = reader.getWatermark();

        Assertions.assertThat(currentWatermark).isEqualTo(expectedWatermark);
    }

    /**
     * The Kinesis client answers 10, then throws {@link TransientKinesisException}, then answers
     * 20. The reader is expected to repeat the last good value (10) for the failing call and to
     * report 20 afterwards.
     */
    @Test
    public void getSplitBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOccur()
            throws TransientKinesisException, IOException {
        reader.start();
        Mockito.when(kinesisSource.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(shardReadersPool.getLatestRecordTimestamp()).thenReturn(oneMinuteAgo());
        Mockito.when(kinesis.getBacklogBytes(ArgumentMatchers.eq(STREAM_NAME), ArgumentMatchers.any(Instant.class)))
                .thenReturn(10L)
                .thenThrow(TransientKinesisException.class)
                .thenReturn(20L);

        Assertions.assertThat(reader.getSplitBacklogBytes()).isEqualTo(10L);
        Assertions.assertThat(reader.getSplitBacklogBytes()).isEqualTo(10L);
        Assertions.assertThat(reader.getSplitBacklogBytes()).isEqualTo(20L);
    }

    /**
     * Uses a reader created with a 30 second backlog-bytes check threshold. Two back-to-back
     * calls are both expected to report 10, although the client stub would answer 20 if it were
     * queried a second time.
     */
    @Test
    public void getSplitBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
            throws TransientKinesisException, IOException {
        KinesisReader backlogCachingReader = Mockito.spy(createReader(Duration.standardSeconds(30L)));
        backlogCachingReader.start();
        Mockito.when(shardReadersPool.getLatestRecordTimestamp()).thenReturn(oneMinuteAgo());
        Mockito.when(kinesisSource.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(kinesis.getBacklogBytes(ArgumentMatchers.eq(STREAM_NAME), ArgumentMatchers.any(Instant.class)))
                .thenReturn(10L)
                .thenReturn(20L);

        Assertions.assertThat(backlogCachingReader.getSplitBacklogBytes()).isEqualTo(10L);
        Assertions.assertThat(backlogCachingReader.getSplitBacklogBytes()).isEqualTo(10L);
    }

    /**
     * While the pool reports {@link BoundedWindow#TIMESTAMP_MIN_VALUE} as the latest record
     * timestamp the reader is expected to answer -1; once the pool reports a recent timestamp
     * the value returned by the Kinesis client (10) is expected.
     */
    @Test
    public void getSplitBacklogBytesShouldReturnBacklogUnknown()
            throws IOException, TransientKinesisException {
        reader.start();
        Mockito.when(kinesisSource.getStreamName()).thenReturn(STREAM_NAME);
        Mockito.when(shardReadersPool.getLatestRecordTimestamp())
                .thenReturn(BoundedWindow.TIMESTAMP_MIN_VALUE)
                .thenReturn(oneMinuteAgo());
        Mockito.when(kinesis.getBacklogBytes(ArgumentMatchers.eq(STREAM_NAME), ArgumentMatchers.any(Instant.class)))
                .thenReturn(10L);

        Assertions.assertThat(reader.getSplitBacklogBytes()).isEqualTo(-1L);
        Assertions.assertThat(reader.getSplitBacklogBytes()).isEqualTo(10L);
    }
}

