/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.aws2.kinesis.EFOKinesisReader;
import org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscribersPool;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisSource;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.assertj.core.api.Assertions;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

@RunWith(value=MockitoJUnitRunner.Silent.class)
public class EFOKinesisReaderTest {
    @Mock
    private KinesisAsyncClient kinesis;
    @Mock
    private KinesisIO.Read read;
    @Mock
    private KinesisReaderCheckpoint checkpoint;
    @Mock
    private ShardCheckpoint firstCheckpoint;
    @Mock
    private ShardCheckpoint secondCheckpoint;
    @Mock
    private KinesisRecord a;
    @Mock
    private KinesisRecord b;
    @Mock
    private KinesisRecord c;
    @Mock
    private KinesisRecord d;
    @Mock
    private KinesisSource kinesisSource;
    @Mock
    private EFOShardSubscribersPool subscribersPool;
    private EFOKinesisReader reader;

    @Before
    public void setUp() throws IOException {
        Mockito.when((Object)this.read.getWatermarkPolicyFactory()).thenReturn((Object)WatermarkPolicyFactory.withArrivalTimePolicy());
        Mockito.when((Object)this.read.getStreamName()).thenReturn((Object)"stream1");
        Mockito.when((Object)this.read.getConsumerArn()).thenReturn((Object)"consumer1");
        this.checkpoint = new KinesisReaderCheckpoint(Arrays.asList(this.firstCheckpoint, this.secondCheckpoint));
        Mockito.when((Object)this.subscribersPool.getNextRecord()).thenReturn(null);
        Mockito.when((Object)this.subscribersPool.getCheckpointMark()).thenReturn((Object)this.checkpoint);
        Mockito.when((Object)this.a.getApproximateArrivalTimestamp()).thenReturn((Object)Instant.now());
        Mockito.when((Object)this.b.getApproximateArrivalTimestamp()).thenReturn((Object)Instant.now());
        Mockito.when((Object)this.c.getApproximateArrivalTimestamp()).thenReturn((Object)Instant.now());
        Mockito.when((Object)this.d.getApproximateArrivalTimestamp()).thenReturn((Object)Instant.now());
        this.reader = new EFOKinesisReader(this.read, this.read.getConsumerArn(), this.kinesis, this.checkpoint, this.kinesisSource){

            EFOShardSubscribersPool createPool() {
                return EFOKinesisReaderTest.this.subscribersPool;
            }
        };
    }

    @After
    public void after() throws IOException {
        this.reader.close();
        ((KinesisAsyncClient)Mockito.verify((Object)this.kinesis)).close();
    }

    @Test
    public void getCheckpointMarkReturnsCheckpoints() throws IOException {
        Throwable e = Assert.assertThrows(IllegalStateException.class, () -> this.reader.getCheckpointMark());
        Assertions.assertThat((String)e.getMessage()).isEqualTo((Object)"Reader was not started");
        this.reader.start();
        Assertions.assertThat((Object)this.reader.getCheckpointMark()).isEqualTo((Object)this.checkpoint);
    }

    @Test
    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
        Assertions.assertThat((boolean)this.reader.start()).isFalse();
    }

    @Test(expected=NoSuchElementException.class)
    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
        this.reader.start();
        this.reader.getCurrent();
    }

    @Test
    public void startReturnsTrueIfSomeDataAvailable() throws IOException {
        Mockito.when((Object)this.subscribersPool.getNextRecord()).thenReturn((Object)this.a).thenReturn(null);
        Assertions.assertThat((boolean)this.reader.start()).isTrue();
    }

    @Test
    public void readsThroughAllDataAvailable() throws IOException {
        byte[] cId = new byte[]{1, 2};
        Instant cTs = Instant.now();
        Mockito.when((Object)this.c.getUniqueId()).thenReturn((Object)cId);
        Mockito.when((Object)this.c.getApproximateArrivalTimestamp()).thenReturn((Object)cTs);
        Mockito.when((Object)this.subscribersPool.getNextRecord()).thenReturn((Object)this.c).thenReturn(null).thenReturn((Object)this.a).thenReturn(null).thenReturn((Object)this.d).thenReturn((Object)this.b).thenReturn(null);
        Assertions.assertThat((boolean)this.reader.start()).isTrue();
        Assertions.assertThat((Object)this.reader.getCurrent()).isEqualTo((Object)this.c);
        Assertions.assertThat((byte[])this.reader.getCurrentRecordId()).isEqualTo((Object)cId);
        Assertions.assertThat((Comparable)this.reader.getCurrentTimestamp()).isEqualTo((Object)cTs);
        Assertions.assertThat((boolean)this.reader.advance()).isFalse();
        Assertions.assertThat((boolean)this.reader.advance()).isTrue();
        Assertions.assertThat((Object)this.reader.getCurrent()).isEqualTo((Object)this.a);
        Assertions.assertThat((boolean)this.reader.advance()).isFalse();
        Assertions.assertThat((boolean)this.reader.advance()).isTrue();
        Assertions.assertThat((Object)this.reader.getCurrent()).isEqualTo((Object)this.d);
        Assertions.assertThat((boolean)this.reader.advance()).isTrue();
        Assertions.assertThat((Object)this.reader.getCurrent()).isEqualTo((Object)this.b);
        Assertions.assertThat((boolean)this.reader.advance()).isFalse();
    }

    @Test
    public void returnsCurrentWatermark() throws IOException {
        Instant expectedWatermark = new Instant(123456L);
        Mockito.when((Object)this.subscribersPool.getWatermark()).thenReturn((Object)expectedWatermark);
        this.reader.start();
        Instant currentWatermark = this.reader.getWatermark();
        Assertions.assertThat((Comparable)currentWatermark).isEqualTo((Object)expectedWatermark);
    }
}

