/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.plugin.stream.kinesis;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.pinot.plugin.stream.kinesis.KinesisConnectionHandler;
import org.apache.pinot.plugin.stream.kinesis.KinesisPartitionGroupOffset;
import org.apache.pinot.plugin.stream.kinesis.KinesisRecordsBatch;
import org.apache.pinot.plugin.stream.kinesis.KinesisStreamMetadataProvider;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

public class KinesisStreamMetadataProviderTest {
    private static final String STREAM_NAME = "kinesis-test";
    private static final String AWS_REGION = "us-west-2";
    private static final String SHARD_ID_0 = "0";
    private static final String SHARD_ID_1 = "1";
    private static final String CLIENT_ID = "dummy";
    private static final int TIMEOUT = 1000;
    private KinesisConnectionHandler _kinesisConnectionHandler;
    private KinesisStreamMetadataProvider _kinesisStreamMetadataProvider;
    private StreamConsumerFactory _streamConsumerFactory;
    private PartitionGroupConsumer _partitionGroupConsumer;

    private StreamConfig getStreamConfig() {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("region", AWS_REGION);
        props.put("maxRecordsToFetch", "10");
        props.put("shardIteratorType", ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
        props.put("streamType", "kinesis");
        props.put("stream.kinesis.consumer.type", "lowLevel");
        props.put("stream.kinesis.topic.name", STREAM_NAME);
        props.put("stream.kinesis.decoder.class.name", "ABCD");
        props.put("stream.kinesis.consumer.factory.class.name", "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory");
        return new StreamConfig("", props);
    }

    @BeforeMethod
    public void setupTest() {
        this._kinesisConnectionHandler = (KinesisConnectionHandler)EasyMock.createMock(KinesisConnectionHandler.class);
        this._streamConsumerFactory = (StreamConsumerFactory)EasyMock.createMock(StreamConsumerFactory.class);
        this._partitionGroupConsumer = (PartitionGroupConsumer)EasyMock.createNiceMock(PartitionGroupConsumer.class);
        this._kinesisStreamMetadataProvider = new KinesisStreamMetadataProvider(CLIENT_ID, this.getStreamConfig(), this._kinesisConnectionHandler, this._streamConsumerFactory);
    }

    @Test
    public void getPartitionsGroupInfoListTest() throws Exception {
        Shard shard0 = (Shard)Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange((SequenceNumberRange)SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build();
        Shard shard1 = (Shard)Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange)SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build();
        EasyMock.expect((Object)this._kinesisConnectionHandler.getShards()).andReturn((Object)ImmutableList.of((Object)shard0, (Object)shard1)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this._kinesisConnectionHandler});
        List result = this._kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, this.getStreamConfig(), new ArrayList(), 1000);
        Assert.assertEquals((int)result.size(), (int)2);
        Assert.assertEquals((int)((PartitionGroupMetadata)result.get(0)).getPartitionGroupId(), (int)0);
        Assert.assertEquals((int)((PartitionGroupMetadata)result.get(1)).getPartitionGroupId(), (int)1);
    }

    @Test
    public void getPartitionsGroupInfoEndOfShardTest() throws Exception {
        ArrayList<PartitionGroupConsumptionStatus> currentPartitionGroupMeta = new ArrayList<PartitionGroupConsumptionStatus>();
        HashMap<String, String> shardToSequenceMap = new HashMap<String, String>();
        shardToSequenceMap.put(SHARD_ID_0, SHARD_ID_1);
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(shardToSequenceMap);
        currentPartitionGroupMeta.add(new PartitionGroupConsumptionStatus(0, 1, (StreamPartitionMsgOffset)kinesisPartitionGroupOffset, (StreamPartitionMsgOffset)kinesisPartitionGroupOffset, "CONSUMING"));
        Capture checkpointArgs = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture partitionGroupMetadataCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture intArguments = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture stringCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Shard shard0 = (Shard)Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange((SequenceNumberRange)SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).endingSequenceNumber(SHARD_ID_1).build()).build();
        Shard shard1 = (Shard)Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange)SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build();
        EasyMock.expect((Object)this._kinesisConnectionHandler.getShards()).andReturn((Object)ImmutableList.of((Object)shard0, (Object)shard1)).anyTimes();
        EasyMock.expect((Object)this._streamConsumerFactory.createPartitionGroupConsumer((String)EasyMock.capture((Capture)stringCapture), (PartitionGroupConsumptionStatus)EasyMock.capture((Capture)partitionGroupMetadataCapture))).andReturn((Object)this._partitionGroupConsumer).anyTimes();
        EasyMock.expect((Object)this._partitionGroupConsumer.fetchMessages((StreamPartitionMsgOffset)EasyMock.capture((Capture)checkpointArgs), (StreamPartitionMsgOffset)EasyMock.capture((Capture)checkpointArgs), EasyMock.captureInt((Capture)intArguments))).andReturn((Object)new KinesisRecordsBatch(new ArrayList(), SHARD_ID_0, true)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this._kinesisConnectionHandler, this._streamConsumerFactory, this._partitionGroupConsumer});
        List result = this._kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, this.getStreamConfig(), currentPartitionGroupMeta, 1000);
        Assert.assertEquals((int)result.size(), (int)1);
        Assert.assertEquals((int)((PartitionGroupMetadata)result.get(0)).getPartitionGroupId(), (int)1);
    }

    @Test
    public void getPartitionsGroupInfoChildShardsest() throws Exception {
        ArrayList<PartitionGroupConsumptionStatus> currentPartitionGroupMeta = new ArrayList<PartitionGroupConsumptionStatus>();
        HashMap<String, String> shardToSequenceMap = new HashMap<String, String>();
        shardToSequenceMap.put(SHARD_ID_1, SHARD_ID_1);
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(shardToSequenceMap);
        currentPartitionGroupMeta.add(new PartitionGroupConsumptionStatus(0, 1, (StreamPartitionMsgOffset)kinesisPartitionGroupOffset, (StreamPartitionMsgOffset)kinesisPartitionGroupOffset, "CONSUMING"));
        Capture checkpointArgs = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture partitionGroupMetadataCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture intArguments = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture stringCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Shard shard0 = (Shard)Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange)SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build();
        Shard shard1 = (Shard)Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange)SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).endingSequenceNumber(SHARD_ID_1).build()).build();
        EasyMock.expect((Object)this._kinesisConnectionHandler.getShards()).andReturn((Object)ImmutableList.of((Object)shard0, (Object)shard1)).anyTimes();
        EasyMock.expect((Object)this._streamConsumerFactory.createPartitionGroupConsumer((String)EasyMock.capture((Capture)stringCapture), (PartitionGroupConsumptionStatus)EasyMock.capture((Capture)partitionGroupMetadataCapture))).andReturn((Object)this._partitionGroupConsumer).anyTimes();
        EasyMock.expect((Object)this._partitionGroupConsumer.fetchMessages((StreamPartitionMsgOffset)EasyMock.capture((Capture)checkpointArgs), (StreamPartitionMsgOffset)EasyMock.capture((Capture)checkpointArgs), EasyMock.captureInt((Capture)intArguments))).andReturn((Object)new KinesisRecordsBatch(new ArrayList(), SHARD_ID_0, true)).anyTimes();
        EasyMock.replay((Object[])new Object[]{this._kinesisConnectionHandler, this._streamConsumerFactory, this._partitionGroupConsumer});
        List result = this._kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, this.getStreamConfig(), currentPartitionGroupMeta, 1000);
        Assert.assertEquals((int)result.size(), (int)1);
        Assert.assertEquals((int)((PartitionGroupMetadata)result.get(0)).getPartitionGroupId(), (int)0);
    }
}

