/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.plugin.stream.kinesis;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
import org.apache.pinot.plugin.stream.kinesis.KinesisConnectionHandler;
import org.apache.pinot.plugin.stream.kinesis.KinesisConsumer;
import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
import org.apache.pinot.plugin.stream.kinesis.KinesisPartitionGroupOffset;
import org.apache.pinot.plugin.stream.kinesis.KinesisRecordsBatch;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class KinesisConsumerTest {
    private static final String STREAM_TYPE = "kinesis";
    private static final String TABLE_NAME_WITH_TYPE = "kinesisTest_REALTIME";
    private static final String STREAM_NAME = "kinesis-test";
    private static final String AWS_REGION = "us-west-2";
    private static final int TIMEOUT = 1000;
    private static final int NUM_RECORDS = 10;
    private static final String DUMMY_RECORD_PREFIX = "DUMMY_RECORD-";
    private static final String PARTITION_KEY_PREFIX = "PARTITION_KEY-";
    private static final String PLACEHOLDER = "DUMMY";
    private static final int MAX_RECORDS_TO_FETCH = 20;
    private KinesisConnectionHandler _kinesisConnectionHandler;
    private StreamConsumerFactory _streamConsumerFactory;
    private KinesisClient _kinesisClient;
    private List<Record> _recordList;

    private KinesisConfig getKinesisConfig() {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("streamType", STREAM_TYPE);
        props.put(StreamConfigProperties.constructStreamProperty((String)STREAM_TYPE, (String)"topic.name"), STREAM_NAME);
        props.put(StreamConfigProperties.constructStreamProperty((String)STREAM_TYPE, (String)"consumer.type"), StreamConfig.ConsumerType.LOWLEVEL.toString());
        props.put(StreamConfigProperties.constructStreamProperty((String)STREAM_TYPE, (String)"consumer.factory.class.name"), KinesisConsumerFactory.class.getName());
        props.put(StreamConfigProperties.constructStreamProperty((String)STREAM_TYPE, (String)"decoder.class.name"), "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
        props.put("region", AWS_REGION);
        props.put("maxRecordsToFetch", String.valueOf(20));
        props.put("shardIteratorType", ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
        return new KinesisConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, props));
    }

    @BeforeMethod
    public void setupTest() {
        this._kinesisConnectionHandler = (KinesisConnectionHandler)EasyMock.createMock(KinesisConnectionHandler.class);
        this._kinesisClient = (KinesisClient)EasyMock.createMock(KinesisClient.class);
        this._streamConsumerFactory = (StreamConsumerFactory)EasyMock.createMock(StreamConsumerFactory.class);
        this._recordList = new ArrayList<Record>();
        for (int i = 0; i < 10; ++i) {
            Record record = (Record)Record.builder().data(SdkBytes.fromUtf8String((String)(DUMMY_RECORD_PREFIX + i))).partitionKey(PARTITION_KEY_PREFIX + i).sequenceNumber(String.valueOf(i + 1)).build();
            this._recordList.add(record);
        }
    }

    @Test
    public void testBasicConsumer() {
        Capture getRecordsRequestCapture = Capture.newInstance();
        Capture getShardIteratorRequestCapture = Capture.newInstance();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse)GetRecordsResponse.builder().nextShardIterator(null).records(this._recordList).build();
        GetShardIteratorResponse getShardIteratorResponse = (GetShardIteratorResponse)GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
        EasyMock.expect((Object)this._kinesisClient.getRecords((GetRecordsRequest)EasyMock.capture((Capture)getRecordsRequestCapture))).andReturn((Object)getRecordsResponse).anyTimes();
        EasyMock.expect((Object)this._kinesisClient.getShardIterator((GetShardIteratorRequest)EasyMock.capture((Capture)getShardIteratorRequestCapture))).andReturn((Object)getShardIteratorResponse).anyTimes();
        EasyMock.replay((Object[])new Object[]{this._kinesisClient});
        KinesisConsumer kinesisConsumer = new KinesisConsumer(this.getKinesisConfig(), this._kinesisClient);
        HashMap<String, String> shardToSequenceMap = new HashMap<String, String>();
        shardToSequenceMap.put("0", "1");
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(shardToSequenceMap);
        KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages((StreamPartitionMsgOffset)kinesisPartitionGroupOffset, null, 1000);
        Assert.assertEquals((int)kinesisRecordsBatch.getMessageCount(), (int)10);
        for (int i = 0; i < 10; ++i) {
            Assert.assertEquals((String)this.baToString(kinesisRecordsBatch.getMessageAtIndex(i)), (String)(DUMMY_RECORD_PREFIX + i));
        }
        Assert.assertFalse((boolean)kinesisRecordsBatch.isEndOfPartitionGroup());
    }

    @Test
    public void testBasicConsumerWithMaxRecordsLimit() {
        Capture getRecordsRequestCapture = Capture.newInstance();
        Capture getShardIteratorRequestCapture = Capture.newInstance();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse)GetRecordsResponse.builder().nextShardIterator(PLACEHOLDER).records(this._recordList).build();
        GetShardIteratorResponse getShardIteratorResponse = (GetShardIteratorResponse)GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
        EasyMock.expect((Object)this._kinesisClient.getRecords((GetRecordsRequest)EasyMock.capture((Capture)getRecordsRequestCapture))).andReturn((Object)getRecordsResponse).anyTimes();
        EasyMock.expect((Object)this._kinesisClient.getShardIterator((GetShardIteratorRequest)EasyMock.capture((Capture)getShardIteratorRequestCapture))).andReturn((Object)getShardIteratorResponse).anyTimes();
        EasyMock.replay((Object[])new Object[]{this._kinesisClient});
        KinesisConfig kinesisConfig = this.getKinesisConfig();
        KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, this._kinesisClient);
        HashMap<String, String> shardToSequenceMap = new HashMap<String, String>();
        shardToSequenceMap.put("0", "1");
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(shardToSequenceMap);
        KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages((StreamPartitionMsgOffset)kinesisPartitionGroupOffset, null, 1000);
        Assert.assertEquals((int)kinesisRecordsBatch.getMessageCount(), (int)20);
        for (int i = 0; i < 10; ++i) {
            Assert.assertEquals((String)this.baToString(kinesisRecordsBatch.getMessageAtIndex(i)), (String)(DUMMY_RECORD_PREFIX + i));
        }
    }

    @Test
    public void testBasicConsumerWithChildShard() {
        ArrayList<ChildShard> shardList = new ArrayList<ChildShard>();
        shardList.add((ChildShard)ChildShard.builder().shardId(PLACEHOLDER).parentShards(new String[]{"0"}).build());
        Capture getRecordsRequestCapture = Capture.newInstance();
        Capture getShardIteratorRequestCapture = Capture.newInstance();
        GetRecordsResponse getRecordsResponse = (GetRecordsResponse)GetRecordsResponse.builder().nextShardIterator(null).records(this._recordList).childShards(shardList).build();
        GetShardIteratorResponse getShardIteratorResponse = (GetShardIteratorResponse)GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
        EasyMock.expect((Object)this._kinesisClient.getRecords((GetRecordsRequest)EasyMock.capture((Capture)getRecordsRequestCapture))).andReturn((Object)getRecordsResponse).anyTimes();
        EasyMock.expect((Object)this._kinesisClient.getShardIterator((GetShardIteratorRequest)EasyMock.capture((Capture)getShardIteratorRequestCapture))).andReturn((Object)getShardIteratorResponse).anyTimes();
        EasyMock.replay((Object[])new Object[]{this._kinesisClient});
        KinesisConfig kinesisConfig = this.getKinesisConfig();
        KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, this._kinesisClient);
        HashMap<String, String> shardToSequenceMap = new HashMap<String, String>();
        shardToSequenceMap.put("0", "1");
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(shardToSequenceMap);
        KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages((StreamPartitionMsgOffset)kinesisPartitionGroupOffset, null, 1000);
        Assert.assertTrue((boolean)kinesisRecordsBatch.isEndOfPartitionGroup());
        Assert.assertEquals((int)kinesisRecordsBatch.getMessageCount(), (int)10);
        for (int i = 0; i < 10; ++i) {
            Assert.assertEquals((String)this.baToString(kinesisRecordsBatch.getMessageAtIndex(i)), (String)(DUMMY_RECORD_PREFIX + i));
        }
    }

    public String baToString(byte[] bytes) {
        return SdkBytes.fromByteArray((byte[])bytes).asUtf8String();
    }
}

