/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIOOptions;
import org.apache.beam.sdk.io.aws2.kinesis.RecordsAggregator;
import org.apache.beam.sdk.io.aws2.kinesis.TimeUtil;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.common.InitialPositionInStream;

/**
 * Package-private test fixtures for the Kinesis IO code in this package.
 *
 * <p>Three groups of helpers live here:
 *
 * <ul>
 *   <li>Mockito stubbing of the polling {@link KinesisClient} calls ({@code listShards}, {@code
 *       getShardIterator}, {@code getRecords}),
 *   <li>generators for test {@link Record}s, and
 *   <li>factories for {@link SubscribeToShardRequest}s and {@link SubscribeToShardEvent}s.
 * </ul>
 *
 * <p>The stubbed shard iterators follow the textual convention {@code "<shard>:<offset>"}, where
 * {@code offset} is an index into that shard's record list; the iterator {@code "done"} always
 * yields an empty page.
 */
class TestHelpers {
    /** Base used to derive sequence numbers: {@code shard * SHARD_EVENTS + offset}. */
    static final int SHARD_EVENTS = 100;
    private static final String STREAM = "stream-01";
    private static final String CONSUMER = "consumer-01";
    /** Iterator handed out once a shard's stubbed records are exhausted. */
    private static final String DONE_ITERATOR = "done";

    // Kept package-private (not private) so existing same-package callers are unaffected.
    TestHelpers() {
    }

    /**
     * Stubs {@code client.listShards(..)} to answer any request with {@code count} shards whose
     * ids are {@code "0"} .. {@code "count - 1"}.
     *
     * @param client mocked Kinesis client to stub
     * @param count number of shards to report
     */
    static void mockShards(KinesisClient client, int count) {
        List<Shard> shards = IntStream.range(0, count)
            .mapToObj(i -> Shard.builder().shardId(Integer.toString(i)).build())
            .collect(Collectors.toList());
        Mockito.when(client.listShards(ArgumentMatchers.any(ListShardsRequest.class)))
            .thenReturn(ListShardsResponse.builder().shards(shards).build());
    }

    /**
     * Stubs {@code client.getShardIterator(..)} so that a request for shard id {@code "<n>"}
     * returns the iterator {@code "<n>:0"}, for every index {@code n} of {@code data}.
     *
     * @param client mocked Kinesis client to stub
     * @param data per-shard record lists; only its size is used here
     */
    static void mockShardIterators(KinesisClient client, List<List<Record>> data) {
        for (int id = 0; id < data.size(); ++id) {
            Mockito.when(client.getShardIterator(ArgumentMatchers.argThat(hasShardId(id))))
                .thenReturn(GetShardIteratorResponse.builder().shardIterator(id + ":0").build());
        }
    }

    /**
     * Stubs {@code client.getRecords(..)} to page through {@code data} in chunks of at most
     * {@code limit} records per call.
     *
     * <p>For shard {@code s}, iterator {@code "s:<from>"} returns records {@code [from, to)} with
     * {@code to = min(from + limit, size)}, and points to {@code "s:<to>"} or, after the last
     * page, to {@code "done"}. The {@code "done"} iterator returns an empty page and points to
     * itself.
     *
     * @param client mocked Kinesis client to stub
     * @param data per-shard record lists, indexed by shard number
     * @param limit maximum number of records per stubbed response; must be positive
     * @throws IllegalArgumentException if {@code limit} is not positive
     */
    static void mockRecords(KinesisClient client, List<List<Record>> data, int limit) {
        if (limit <= 0) {
            // A zero step would never advance the paging loop below.
            throw new IllegalArgumentException("limit must be positive, got " + limit);
        }
        for (int shard = 0; shard < data.size(); ++shard) {
            List<Record> records = data.get(shard);
            for (int from = 0; from < records.size(); from += limit) {
                // min, not max: clamp the page end to the list so the page holds <= limit records.
                int to = Math.min(from + limit, records.size());
                String nextIt = to == records.size() ? DONE_ITERATOR : shard + ":" + to;
                Mockito.when(
                        client.getRecords(
                            ArgumentMatchers.argThat(hasShardIterator(shard + ":" + from))))
                    .thenReturn(recordsResponse(records.subList(from, to), nextIt));
            }
        }
        Mockito.when(client.getRecords(ArgumentMatchers.argThat(hasShardIterator(DONE_ITERATOR))))
            .thenReturn(recordsResponse(ImmutableList.of(), DONE_ITERATOR));
    }

    /**
     * Creates {@code events} records for each of {@code shards} shards.
     *
     * <p>The record at offset {@code o} of shard {@code s} has sequence number {@code s *
     * SHARD_EVENTS + o}, that same number (UTF-8) as payload, and an arrival time of "now" plus
     * {@code o} seconds.
     *
     * @param shards number of shards
     * @param events number of records per shard
     * @return one record list per shard, indexed by shard number
     */
    static List<List<Record>> createRecords(int shards, int events) {
        Instant now = DateTime.now().toInstant();
        return IntStream.range(0, shards)
            .mapToObj(shard -> recordsForShard(now, shard, events))
            .collect(Collectors.toList());
    }

    /**
     * Like {@link #createRecords(int, int)}, but each shard's records are fed into a {@link
     * RecordsAggregator} and the shard's list contains a single record (offset 0) whose payload is
     * the aggregator's {@code toBytes()} output.
     *
     * @param shards number of shards
     * @param events number of records aggregated per shard
     * @return one single-element record list per shard, indexed by shard number
     */
    static List<List<Record>> createAggregatedRecords(int shards, int events) {
        Instant now = DateTime.now().toInstant();
        return IntStream.range(0, shards)
            .mapToObj(shard -> aggregatedRecordForShard(now, shard, events))
            .collect(Collectors.toList());
    }

    /**
     * Builds a record with partition key {@code "foo-part-key"}.
     *
     * @param arrival approximate arrival timestamp (converted via {@link TimeUtil#toJava})
     * @param data record payload
     * @param seqNum sequence number
     * @return the record
     */
    static Record record(Instant arrival, byte[] data, String seqNum) {
        return Record.builder()
            .approximateArrivalTimestamp(TimeUtil.toJava(arrival))
            .data(SdkBytes.fromByteArray(data))
            .sequenceNumber(seqNum)
            .partitionKey("foo-part-key")
            .build();
    }

    /** Returns a read spec for stream {@code "stream-01"} starting at {@code LATEST}. */
    static KinesisIO.Read createReadSpec() {
        return KinesisIO.read()
            .withStreamName(STREAM)
            .withInitialPositionInStream(InitialPositionInStream.LATEST);
    }

    /**
     * Parses {@code args} with {@link PipelineOptionsFactory} and views the result as {@link
     * KinesisIOOptions}.
     *
     * @param args command-line style pipeline arguments
     * @return the parsed options
     */
    static KinesisIOOptions createIOOptions(String ... args) {
        return PipelineOptionsFactory.fromArgs(args).as(KinesisIOOptions.class);
    }

    /** Subscribe request for {@code shardId} (consumer {@code "consumer-01"}) at {@code LATEST}. */
    static SubscribeToShardRequest subscribeLatest(String shardId) {
        return subscribe(
            shardId, StartingPosition.builder().type(ShardIteratorType.LATEST).build());
    }

    /**
     * Subscribe request for {@code shardId} starting {@code AFTER_SEQUENCE_NUMBER} {@code
     * seqNumber}.
     */
    static SubscribeToShardRequest subscribeAfterSeqNumber(String shardId, String seqNumber) {
        return subscribe(
            shardId,
            StartingPosition.builder()
                .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
                .sequenceNumber(seqNumber)
                .build());
    }

    /**
     * Subscribe request for {@code shardId} starting {@code AT_SEQUENCE_NUMBER} {@code seqNumber}.
     */
    static SubscribeToShardRequest subscribeAtSeqNumber(String shardId, String seqNumber) {
        return subscribe(
            shardId,
            StartingPosition.builder()
                .type(ShardIteratorType.AT_SEQUENCE_NUMBER)
                .sequenceNumber(seqNumber)
                .build());
    }

    /** Subscribe request for {@code shardId} starting at {@code TRIM_HORIZON}. */
    static SubscribeToShardRequest subscribeTrimHorizon(String shardId) {
        return subscribe(
            shardId, StartingPosition.builder().type(ShardIteratorType.TRIM_HORIZON).build());
    }

    /** Subscribe request for {@code shardId} starting {@code AT_TIMESTAMP} {@code ts}. */
    static SubscribeToShardRequest subscribeAtTs(String shardId, Instant ts) {
        return subscribe(
            shardId,
            StartingPosition.builder()
                .timestamp(TimeUtil.toJava(ts))
                .type(ShardIteratorType.AT_TIMESTAMP)
                .build());
    }

    /**
     * Creates {@code numEvent} record-less events with consecutive continuation sequence numbers
     * {@code startingSeqNum}, {@code startingSeqNum + 1}, ...
     *
     * @param startingSeqNum continuation sequence number of the first event
     * @param numEvent number of events to create
     * @return the events, in ascending sequence-number order
     */
    static SubscribeToShardEvent[] eventsWithoutRecords(int startingSeqNum, int numEvent) {
        SubscribeToShardEvent[] events = new SubscribeToShardEvent[numEvent];
        for (int i = 0; i < numEvent; ++i) {
            events[i] = eventWithOutRecords(startingSeqNum + i);
        }
        return events;
    }

    /**
     * Creates an event without records, {@code millisBehindLatest = 0} and the given continuation
     * sequence number.
     */
    static SubscribeToShardEvent eventWithOutRecords(int sequenceNumber) {
        return SubscribeToShardEvent.builder()
            .millisBehindLatest(0L)
            .continuationSequenceNumber(String.valueOf(sequenceNumber))
            .build();
    }

    /** Shorthand for {@link #eventWithRecords(int, int) eventWithRecords(0, numRecords)}. */
    static SubscribeToShardEvent eventWithRecords(int numRecords) {
        return eventWithRecords(0, numRecords);
    }

    /**
     * Creates an event that carries no records, a {@code null} continuation sequence number and
     * one child shard per entry of {@code childShardsIds}, each listing all of {@code
     * parentShardsIds} as parents.
     *
     * @param parentShardsIds ids assigned as parents to every child shard
     * @param childShardsIds ids of the child shards
     * @return the re-shard event
     */
    static SubscribeToShardEvent reShardEvent(List<String> parentShardsIds, List<String> childShardsIds) {
        // NOTE(review): null continuation + child shards presumably models "end of shard";
        // confirm against the SubscribeToShardEvent API docs.
        return SubscribeToShardEvent.builder()
            .continuationSequenceNumber(null)
            .childShards(childShards(parentShardsIds, childShardsIds))
            .build();
    }

    /**
     * Same as {@link #reShardEvent(List, List)}, but the event additionally carries {@code
     * numRecords} records with sequence numbers {@code startSeqNumber}, {@code startSeqNumber +
     * 1}, ...
     *
     * @param startSeqNumber sequence number of the first record
     * @param numRecords number of records
     * @param parentShardsIds ids assigned as parents to every child shard
     * @param childShardsIds ids of the child shards
     * @return the re-shard event
     */
    static SubscribeToShardEvent reShardEventWithRecords(int startSeqNumber, int numRecords, List<String> parentShardsIds, List<String> childShardsIds) {
        List<Record> records = sequentialRecords(startSeqNumber, numRecords);
        return SubscribeToShardEvent.builder()
            .records(records)
            .continuationSequenceNumber(null)
            .childShards(childShards(parentShardsIds, childShardsIds))
            .build();
    }

    /**
     * Creates {@code numRecords} events, each wrapping exactly one record; the records' sequence
     * numbers are {@code startSeqNumber}, {@code startSeqNumber + 1}, ...
     *
     * @param startSeqNumber sequence number of the first event's record
     * @param numRecords number of single-record events
     * @return the events, in ascending sequence-number order
     */
    static SubscribeToShardEvent[] eventsWithRecords(int startSeqNumber, int numRecords) {
        List<SubscribeToShardEvent> events = new ArrayList<>();
        for (int i = startSeqNumber; i < startSeqNumber + numRecords; ++i) {
            events.add(eventWithRecords(ImmutableList.of(record(String.valueOf(i)))));
        }
        return events.toArray(new SubscribeToShardEvent[0]);
    }

    /**
     * Creates one event carrying {@code numRecords} records with sequence numbers {@code
     * startSeqNumber}, {@code startSeqNumber + 1}, ...
     *
     * @param startSeqNumber sequence number of the first record
     * @param numRecords number of records in the event
     * @return the event, see {@link #eventWithRecords(List)}
     */
    static SubscribeToShardEvent eventWithRecords(int startSeqNumber, int numRecords) {
        return eventWithRecords(sequentialRecords(startSeqNumber, numRecords));
    }

    /**
     * Creates an event with a single record whose payload is the {@link RecordsAggregator} output
     * for {@code numRecords} entries (partition key {@code "foo"}, payload = the entry's number as
     * UTF-8). Both the record's sequence number and the event's continuation sequence number are
     * {@code startSeqNumber}; {@code millisBehindLatest} is not set.
     *
     * @param startSeqNumber number of the first aggregated entry
     * @param numRecords number of aggregated entries
     * @return the event
     */
    static SubscribeToShardEvent eventWithAggRecords(int startSeqNumber, int numRecords) {
        RecordsAggregator aggregator = new RecordsAggregator(1024, new Instant());
        for (int i = startSeqNumber; i < startSeqNumber + numRecords; ++i) {
            aggregator.addRecord("foo", null, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
        }
        String seqNum = String.valueOf(startSeqNumber);
        Record aggregated = record(Instant.now(), aggregator.toBytes(), seqNum);
        return SubscribeToShardEvent.builder()
            .continuationSequenceNumber(seqNum)
            .records(aggregated)
            .build();
    }

    /**
     * Creates an event carrying {@code recordList} with {@code millisBehindLatest = 0}. The
     * continuation sequence number is that of the last record, or {@code "0"} for an empty list.
     *
     * @param recordList records to put into the event
     * @return the event
     */
    static SubscribeToShardEvent eventWithRecords(List<Record> recordList) {
        String lastSeqNum = "0";
        for (Record r : recordList) {
            lastSeqNum = r.sequenceNumber();
        }
        return SubscribeToShardEvent.builder()
            .millisBehindLatest(0L)
            .records(recordList)
            .continuationSequenceNumber(lastSeqNum)
            .build();
    }

    /**
     * Returns a single-element list holding an empty-payload record with sequence number {@code
     * "0"} that arrived {@code minutesAgo} minutes before now.
     */
    static List<Record> recordWithMinutesAgo(int minutesAgo) {
        Instant arrival = Instant.now().minus(Duration.standardMinutes(minutesAgo));
        return ImmutableList.of(record(arrival, new byte[0], "0"));
    }

    /** Record arriving now whose payload is {@code seqNum} encoded as UTF-8. */
    private static Record record(String seqNum) {
        return record(Instant.now(), seqNum.getBytes(StandardCharsets.UTF_8), seqNum);
    }

    /**
     * Record for {@code (shard, offset)}: arrival is {@code now + offset} seconds and the payload
     * is the sequence number encoded as UTF-8.
     */
    private static Record record(Instant now, int shard, int offset) {
        String seqNum = sequenceNumber(shard, offset);
        return record(
            now.plus(Duration.standardSeconds(offset)),
            seqNum.getBytes(StandardCharsets.UTF_8),
            seqNum);
    }

    /** Same as {@link #record(Instant, int, int)} but with a caller-supplied payload. */
    private static Record recordWithCustomPayload(Instant now, int shard, int offset, byte[] payload) {
        return record(
            now.plus(Duration.standardSeconds(offset)), payload, sequenceNumber(shard, offset));
    }

    /** Sequence number of the record at {@code offset} within {@code shard}. */
    private static String sequenceNumber(int shard, int offset) {
        return Integer.toString(shard * SHARD_EVENTS + offset);
    }

    /** The {@code events} plain records of {@code shard}, offsets {@code 0 .. events - 1}. */
    private static List<Record> recordsForShard(Instant now, int shard, int events) {
        return IntStream.range(0, events)
            .mapToObj(offset -> record(now, shard, offset))
            .collect(Collectors.toList());
    }

    /** Aggregates the plain records of {@code shard} into a single record at offset 0. */
    private static List<Record> aggregatedRecordForShard(Instant now, int shard, int events) {
        // Constructor / addRecord arguments are passed exactly as before; see RecordsAggregator
        // for their meaning.
        RecordsAggregator aggregator = new RecordsAggregator(1024, new Instant());
        for (Record plain : recordsForShard(now, shard, events)) {
            aggregator.addRecord(plain.partitionKey(), null, plain.data().asByteArray());
        }
        return ImmutableList.of(recordWithCustomPayload(now, shard, 0, aggregator.toBytes()));
    }

    /** Records arriving now with sequence numbers {@code start .. start + count - 1}. */
    private static List<Record> sequentialRecords(int start, int count) {
        List<Record> records = new ArrayList<>();
        for (int i = start; i < start + count; ++i) {
            records.add(record(String.valueOf(i)));
        }
        return records;
    }

    /** One child shard per id in {@code childIds}, each with all of {@code parentIds} as parents. */
    private static List<ChildShard> childShards(List<String> parentIds, List<String> childIds) {
        return childIds.stream()
            .map(id -> ChildShard.builder().shardId(id).parentShards(parentIds).build())
            .collect(Collectors.toList());
    }

    /** Subscribe request of consumer {@code "consumer-01"} for {@code shardId} at {@code position}. */
    private static SubscribeToShardRequest subscribe(String shardId, StartingPosition position) {
        return SubscribeToShardRequest.builder()
            .consumerARN(CONSUMER)
            .shardId(shardId)
            .startingPosition(position)
            .build();
    }

    /** Page of {@code records} with {@code millisBehindLatest = 0} pointing to {@code nextIterator}. */
    private static GetRecordsResponse recordsResponse(List<Record> records, String nextIterator) {
        return GetRecordsResponse.builder()
            .millisBehindLatest(0L)
            .records(records)
            .nextShardIterator(nextIterator)
            .build();
    }

    /** Matches non-null requests whose shard id equals the decimal form of {@code id}. */
    private static ArgumentMatcher<GetShardIteratorRequest> hasShardId(int id) {
        String expected = Integer.toString(id);
        // The null check on req is needed because Mockito evaluates existing matchers against the
        // null placeholder passed while later stubs are being set up. Comparing from the expected
        // side also tolerates requests with an unset shard id.
        return req -> req != null && expected.equals(req.shardId());
    }

    /** Matches non-null requests whose shard iterator equals {@code id}. */
    private static ArgumentMatcher<GetRecordsRequest> hasShardIterator(String id) {
        // Compare from the expected side so an unset iterator is a non-match, not an NPE.
        return req -> req != null && id.equals(req.shardIterator());
    }
}

