package org.apache.beam.sdk.io.aws2.kinesis;

import java.lang.invoke.SerializedLambda;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.common.RetryConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest.class */
public class KinesisIOWriteTest extends PutRecordsHelpers {
    /** Stream name used by every write under test. */
    private static final String STREAM = "test";

    /** Pipeline rule shared by the tests in this class. */
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    /** Mocked Kinesis client; stubbed per test. */
    @Mock
    public KinesisAsyncClient client;

    /** Partitioner that uses the row's name as partition key. */
    private static KinesisPartitioner<TestRow> partitionByName = row -> row.name();

    /** Serializer that passes the row's id (as an int) to {@code bytesOf}. */
    private static SerializableFunction<TestRow, byte[]> bytesOfId = row -> bytesOf(row.id().intValue());

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest$GenerateTestRows.class */
    /** Expands an integer {@code n} into the rows returned by {@code TestRow.getExpectedValues(0, n)}. */
    private static class GenerateTestRows extends DoFn<Integer, TestRow> {
        private GenerateTestRows() {
        }

        /**
         * Emits every expected row for the range {@code [0, num)} to the receiver.
         *
         * @param num upper bound handed to {@code TestRow.getExpectedValues}
         * @param outputReceiver receiver the rows are output to, one at a time
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Integer num, DoFn.OutputReceiver<TestRow> outputReceiver) {
            for (TestRow row : TestRow.getExpectedValues(0, num.intValue())) {
                outputReceiver.output(row);
            }
        }
    }

    /**
     * Hands the mocked client to {@link MockClientBuilderFactory} for this pipeline and stubs
     * {@code listShards} to return an exceptionally completed future by default.
     */
    @Before
    public void configure() {
        MockClientBuilderFactory.set(pipeline, KinesisAsyncClientBuilder.class, client);

        CompletableFuture<ListShardsResponse> failedListing = new CompletableFuture<>();
        failedListing.completeExceptionally(new RuntimeException("Unavailable, retried later"));
        Mockito.when(client.listShards(ArgumentMatchers.any(ListShardsRequest.class)))
            .thenReturn(failedListing);
    }

    /**
     * Switches Joda-Time back to the system clock after each test, undoing any
     * {@code DateTimeUtils.setCurrentMillisFixed} call a test made.
     */
    @After
    public void resetTimeSource() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    /**
     * Builds the base write transform used by the tests: stream {@link #STREAM}, partitioned by
     * row name and serialized via {@code bytesOfId}.
     *
     * @return the configured write transform
     */
    private KinesisIO.Write<TestRow> kinesisWrite() {
        KinesisIO.Write<TestRow> write = KinesisIO.write();
        return write
            .withStreamName(STREAM)
            .withPartitioner(partitionByName)
            .withSerializer(bytesOfId);
    }

    /** Writes 100 rows without aggregation and checks the captured batches hold exactly those rows. */
    @Test
    public void testWrite() {
        Supplier<List<List<TestRow>>> batches = captureBatchRecords(client);

        pipeline
            .apply(GenerateSequence.from(0L).to(100L))
            .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
            .apply(kinesisWrite().withRecordAggregationDisabled());
        pipeline.run().waitUntilFinish();

        Iterable<TestRow> written = Iterables.concat(batches.get());
        Assertions.assertThat(written).containsExactlyInAnyOrderElementsOf(TestRow.getExpectedValues(0, 100));
    }

    /** With {@code batchMaxRecords(1)} every captured batch must contain exactly one record. */
    @Test
    public void testWriteWithBatchMaxRecords() {
        Supplier<List<List<TestRow>>> captured = captureBatchRecords(client);

        pipeline
            .apply(GenerateSequence.from(0L).to(100L))
            .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
            .apply(kinesisWrite().withBatchMaxRecords(1).withRecordAggregationDisabled());
        pipeline.run().waitUntilFinish();

        List<List<TestRow>> batches = captured.get();
        for (List<TestRow> batch : batches) {
            Assertions.assertThat(batch.size()).isEqualTo(1);
        }
        Assertions.assertThat(Iterables.concat(batches))
            .containsExactlyInAnyOrderElementsOf(TestRow.getExpectedValues(0, 100));
    }

    /** With {@code batchMaxBytes(15)} every captured batch must contain exactly one record. */
    @Test
    public void testWriteWithBatchMaxBytes() {
        Supplier<List<List<TestRow>>> captured = captureBatchRecords(client);

        pipeline
            .apply(GenerateSequence.from(0L).to(100L))
            .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
            .apply(kinesisWrite().withBatchMaxBytes(15).withRecordAggregationDisabled());
        pipeline.run().waitUntilFinish();

        List<List<TestRow>> batches = captured.get();
        for (List<TestRow> batch : batches) {
            Assertions.assertThat(batch.size()).isEqualTo(1);
        }
        Assertions.assertThat(Iterables.concat(batches))
            .containsExactlyInAnyOrderElementsOf(TestRow.getExpectedValues(0, 100));
    }

    /**
     * Stubs {@code putRecords} to succeed, then fail, then succeed, and expects the pipeline run to
     * throw a {@code PipelineExecutionException} mentioning "putRecords failed".
     */
    @Test
    public void testWriteFailure() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(
                CompletableFuture.completedFuture(successResponse),
                // The supplier throws because the checked reference is null.
                CompletableFuture.supplyAsync(
                    () -> (PutRecordsResponse) Preconditions.checkNotNull((Object) null, "putRecords failed")),
                CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(GenerateSequence.from(0L).to(100L))
            .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
            .apply(kinesisWrite().withRecordAggregationDisabled());

        Assertions.assertThatThrownBy(() -> pipeline.run().waitUntilFinish())
            .isInstanceOf(Pipeline.PipelineExecutionException.class)
            .hasMessageContaining("putRecords failed");
    }

    /**
     * Stubs {@code putRecords} with {@code partialSuccessResponse(70, 30)}, then
     * {@code partialSuccessResponse(10, 20)}, then a full success, and verifies in order that the
     * requests contained rows 0..100, 70..100 and 80..100.
     */
    @Test
    public void testWriteWithPartialSuccess() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(partialSuccessResponse(70, 30)))
            .thenReturn(CompletableFuture.completedFuture(partialSuccessResponse(10, 20)))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        // Retry with a 1 ms maximum backoff.
        RetryConfiguration retry = RetryConfiguration.builder().maxBackoff(Duration.millis(1L)).build();
        ClientConfiguration clientConfig = ClientConfiguration.builder().retry(retry).build();

        pipeline
            .apply(Create.of(100))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(kinesisWrite().withClientConfiguration(clientConfig).withRecordAggregationDisabled());
        pipeline.run().waitUntilFinish();

        InOrder ordered = Mockito.inOrder(client);
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(containsAll(TestRow.getExpectedValues(0, 100))));
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(containsAll(TestRow.getExpectedValues(70, 100))));
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(containsAll(TestRow.getExpectedValues(80, 100))));
    }

    /**
     * Writes 100 rows under the constant partition key "a" with default settings and expects a
     * single {@code putRecords} call matching {@code hasSize(1)}.
     */
    @Test
    public void testWriteAggregatedByDefault() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(100))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(kinesisWrite().withPartitioner(row -> "a"));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client).putRecords(ArgumentMatchers.argThat(hasSize(1)));
    }

    /**
     * Mocks two shards (starting at {@code MIN_HASH_KEY} and {@code MAX_HASH_KEY >> 1}), writes 100
     * rows keyed by id, and expects one {@code putRecords} call matching {@code hasSize(2)} plus one
     * {@code listShards} call.
     */
    @Test
    public void testWriteAggregatedShardAware() {
        mockShardRanges(KinesisPartitioner.MIN_HASH_KEY, KinesisPartitioner.MAX_HASH_KEY.shiftRight(1));
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(100))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(kinesisWrite().withPartitioner(row -> row.id().toString()));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client).putRecords(ArgumentMatchers.argThat(hasSize(2)));
        Mockito.verify(client).listShards(ArgumentMatchers.any(ListShardsRequest.class));
    }

    /**
     * Leaves the {@code listShards} future incomplete for the whole pipeline run and expects one
     * {@code putRecords} call matching {@code hasSize(100)} plus one {@code listShards} call.
     */
    @Test
    public void testWriteAggregatedShardRefreshPending() {
        CompletableFuture<ListShardsResponse> pendingListing = new CompletableFuture<>();
        Mockito.when(client.listShards(ArgumentMatchers.any(ListShardsRequest.class)))
            .thenReturn(pendingListing);
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(100))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(kinesisWrite().withPartitioner(row -> row.id().toString()));
        pipeline.run().waitUntilFinish();

        // Only completed once the run has finished.
        pendingListing.complete(ListShardsResponse.builder().build());

        Mockito.verify(client).putRecords(ArgumentMatchers.argThat(hasSize(100)));
        Mockito.verify(client).listShards(ArgumentMatchers.any(ListShardsRequest.class));
    }

    /**
     * Sets the shard refresh interval to {@code Duration.ZERO} and expects one {@code putRecords}
     * call matching {@code hasSize(100)} and no {@code listShards} call at all.
     */
    @Test
    public void testWriteAggregatedShardRefreshDisabled() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(100))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(
                kinesisWrite()
                    .withRecordAggregation(builder -> builder.shardRefreshInterval(Duration.ZERO))
                    .withPartitioner(row -> row.id().toString()));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client).putRecords(ArgumentMatchers.argThat(hasSize(100)));
        Mockito.verify(client, Mockito.times(0)).listShards(ArgumentMatchers.any(ListShardsRequest.class));
    }

    /**
     * Uses {@code KinesisPartitioner.explicitRandomPartitioner(2)} and expects one
     * {@code putRecords} call matching {@code hasSize(2)} and no {@code listShards} call.
     */
    @Test
    public void testWriteAggregatedUsingExplicitPartitioner() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(100))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(kinesisWrite().withPartitioner(KinesisPartitioner.explicitRandomPartitioner(2)));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client).putRecords(ArgumentMatchers.argThat(hasSize(2)));
        Mockito.verify(client, Mockito.times(0)).listShards(ArgumentMatchers.any(ListShardsRequest.class));
    }

    /**
     * Writes 1000 rows under key "a" with aggregation {@code maxBytes(5023)} and expects one
     * {@code putRecords} call matching both {@code hasSize(2)} and {@code hasRecordSize(5023)}.
     */
    @Test
    public void testWriteAggregatedWithMaxBytes() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(1000))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(
                kinesisWrite()
                    .withPartitioner(row -> "a")
                    .withRecordAggregation(
                        builder -> builder.maxBytes(5023).maxBufferedTime(Duration.standardSeconds(5L))));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client)
            .putRecords(
                AdditionalMatchers.and(
                    ArgumentMatchers.argThat(hasSize(2)),
                    ArgumentMatchers.argThat(hasRecordSize(5023))));
    }

    /**
     * Same as the max-bytes case but with {@code batchMaxBytes(5024)}: expects two
     * {@code putRecords} calls, each matching {@code hasSize(1)} and {@code hasRecordSize(5023)}.
     */
    @Test
    public void testWriteAggregatedWithMaxBytesAndBatchMaxBytes() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(1000))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(
                kinesisWrite()
                    .withPartitioner(row -> "a")
                    .withBatchMaxBytes(5024)
                    .withRecordAggregation(
                        builder -> builder.maxBytes(5023).maxBufferedTime(Duration.standardSeconds(5L))));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client, Mockito.times(2))
            .putRecords(
                AdditionalMatchers.and(
                    ArgumentMatchers.argThat(hasSize(1)),
                    ArgumentMatchers.argThat(hasRecordSize(5023))));
    }

    /**
     * Same as the max-bytes case but with {@code batchMaxRecords(1)}: expects two
     * {@code putRecords} calls, each matching {@code hasSize(1)} and {@code hasRecordSize(5023)}.
     */
    @Test
    public void testWriteAggregatedWithMaxBytesAndBatchMaxRecords() {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        pipeline
            .apply(Create.of(1000))
            .apply(ParDo.of(new GenerateTestRows()))
            .apply(
                kinesisWrite()
                    .withPartitioner(row -> "a")
                    .withBatchMaxRecords(1)
                    .withRecordAggregation(
                        builder -> builder.maxBytes(5023).maxBufferedTime(Duration.standardSeconds(5L))));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client, Mockito.times(2))
            .putRecords(
                AdditionalMatchers.and(
                    ArgumentMatchers.argThat(hasSize(1)),
                    ArgumentMatchers.argThat(hasRecordSize(5023))));
    }

    /**
     * Drives an {@code AggregatedWriter} directly with a fixed clock (0, 50, 100, 200 ms) and a
     * 100 ms max buffered time, then verifies in order three {@code putRecords} calls for partitions
     * {1,2,3}, {4,5} and {6}, followed by {@code close()} and no further client interactions.
     */
    @Test
    public void testWriteAggregatedWithMaxBufferTime() throws Throwable {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        KinesisIO.Write<TestRow> write =
            kinesisWrite()
                .withPartitioner(row -> row.id().toString())
                .withRecordAggregation(
                    builder -> builder.maxBufferedTime(Duration.millis(100L)).maxBufferedTimeJitter(0.2d));

        // The clock is pinned before the writer is constructed.
        DateTimeUtils.setCurrentMillisFixed(0L);
        KinesisIO.Write.AggregatedWriter writer =
            new KinesisIO.Write.AggregatedWriter(pipeline.getOptions(), write, write.recordAggregation());
        writer.startBundle();

        for (int seed = 1; seed <= 3; seed++) {
            writer.write(TestRow.fromSeed(seed));
        }

        DateTimeUtils.setCurrentMillisFixed(50L);
        writer.write(TestRow.fromSeed(4));

        DateTimeUtils.setCurrentMillisFixed(100L);
        writer.write(TestRow.fromSeed(5));

        DateTimeUtils.setCurrentMillisFixed(200L);
        writer.write(TestRow.fromSeed(6));

        writer.finishBundle();
        writer.close();

        InOrder ordered = Mockito.inOrder(client);
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(hasPartitions("1", "2", "3")));
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(hasPartitions("4", "5")));
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(hasPartitions("6")));
        ordered.verify(client).close();
        Mockito.verifyNoMoreInteractions(client);
    }

    /**
     * Drives an {@code AggregatedWriter} directly with a 1000 ms shard refresh interval: writes
     * three rows at t=1 ms, moves the clock to 1500 ms, mocks a single shard at
     * {@code MIN_HASH_KEY}, and writes ten more rows. Verifies in order a request for partitions
     * {1,2,3}, a request with explicit partition {@code MIN_HASH_KEY}, then {@code close()}; also
     * expects exactly two {@code listShards} calls and nothing else on the client.
     */
    @Test
    public void testWriteAggregatedWithShardsRefresh() throws Throwable {
        Mockito.when(client.putRecords(anyRequest()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        KinesisIO.Write<TestRow> write =
            kinesisWrite()
                .withPartitioner(row -> row.id().toString())
                .withRecordAggregation(builder -> builder.shardRefreshInterval(Duration.millis(1000L)));

        DateTimeUtils.setCurrentMillisFixed(1L);
        KinesisIO.Write.AggregatedWriter writer =
            new KinesisIO.Write.AggregatedWriter(pipeline.getOptions(), write, write.recordAggregation());

        // First batch: listShards still uses the failing default stub from configure().
        for (int seed = 1; seed <= 3; seed++) {
            writer.write(TestRow.fromSeed(seed));
        }

        // Advance past the refresh interval and make a shard available.
        DateTimeUtils.setCurrentMillisFixed(1500L);
        mockShardRanges(KinesisPartitioner.MIN_HASH_KEY);

        for (int seed = 1; seed <= 10; seed++) {
            writer.write(TestRow.fromSeed(seed));
        }

        writer.finishBundle();
        writer.close();

        InOrder ordered = Mockito.inOrder(client);
        ordered.verify(client).putRecords(ArgumentMatchers.argThat(hasPartitions("1", "2", "3")));
        ordered.verify(client)
            .putRecords(ArgumentMatchers.argThat(hasExplicitPartitions(KinesisPartitioner.MIN_HASH_KEY.toString())));
        ordered.verify(client).close();
        Mockito.verify(client, Mockito.times(2)).listShards(ArgumentMatchers.any(ListShardsRequest.class));
        Mockito.verifyNoMoreInteractions(client);
    }

    /**
     * Stubs a three-page {@code listShards} sequence (tokens "a", "b", then none) with shards
     * starting at {@code MIN_HASH_KEY}, {@code MAX_HASH_KEY >> 2} and {@code MAX_HASH_KEY >> 1}.
     * After one refresh, checks that three {@code listShards} calls were made and that the hashed
     * keys "a", "b" and "c" each map to the expected shard start and fall into that shard's range.
     */
    @Test
    public void testShardRangesRefresh() {
        BigInteger firstStart = KinesisPartitioner.MIN_HASH_KEY;
        BigInteger secondStart = KinesisPartitioner.MAX_HASH_KEY.shiftRight(2);
        BigInteger thirdStart = KinesisPartitioner.MAX_HASH_KEY.shiftRight(1);

        // Page 1 is requested by stream name; follow-up pages by token only.
        Mockito.when(client.listShards(ArgumentMatchers.argThat(isRequest(STREAM, null))))
            .thenReturn(CompletableFuture.completedFuture(listShardsResponse("a", shard(firstStart))));
        Mockito.when(client.listShards(ArgumentMatchers.argThat(isRequest(null, "a"))))
            .thenReturn(CompletableFuture.completedFuture(listShardsResponse("b", shard(secondStart))));
        Mockito.when(client.listShards(ArgumentMatchers.argThat(isRequest(null, "b"))))
            .thenReturn(CompletableFuture.completedFuture(listShardsResponse(null, shard(thirdStart))));

        KinesisIO.Write.PartitionKeyHasher hasher = new KinesisIO.Write.PartitionKeyHasher();
        KinesisIO.Write.ShardRanges ranges = KinesisIO.Write.ShardRanges.of(STREAM);
        ranges.refreshPeriodically(client, Instant::now);

        Mockito.verify(client, Mockito.times(3)).listShards(ArgumentMatchers.any(ListShardsRequest.class));

        BigInteger hashOfA = hasher.hashKey("a");
        Assertions.assertThat(ranges.shardAwareHashKey(hashOfA)).isEqualTo(firstStart);
        Assertions.assertThat(hashOfA).isBetween(firstStart, secondStart.subtract(BigInteger.ONE));

        BigInteger hashOfB = hasher.hashKey("b");
        Assertions.assertThat(ranges.shardAwareHashKey(hashOfB)).isEqualTo(thirdStart);
        Assertions.assertThat(hashOfB).isBetween(thirdStart, KinesisPartitioner.MAX_HASH_KEY);

        BigInteger hashOfC = hasher.hashKey("c");
        Assertions.assertThat(ranges.shardAwareHashKey(hashOfC)).isEqualTo(secondStart);
        Assertions.assertThat(hashOfC).isBetween(secondStart, thirdStart.subtract(BigInteger.ONE));
    }

    /** An otherwise unconfigured write must be rejected with "streamName is required". */
    @Test
    public void validateMissingStreamName() {
        assertThrown(write -> write)
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("streamName is required");
    }

    /** An empty stream name must be rejected with "streamName cannot be empty". */
    @Test
    public void validateEmptyStreamName() {
        assertThrown(write -> write.withStreamName(""))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("streamName cannot be empty");
    }

    /** A write with stream name and serializer but no partitioner must be rejected. */
    @Test
    public void validateMissingPartitioner() {
        assertThrown(write -> write.withStreamName(STREAM).withSerializer(bytesOfId))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("partitioner is required");
    }

    /** A write with stream name and partitioner but no serializer must be rejected. */
    @Test
    public void validateMissingSerializer() {
        assertThrown(write -> write.withStreamName(STREAM).withPartitioner(partitionByName))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("serializer is required");
    }

    /** Zero concurrent requests must be rejected with "concurrentRequests must be > 0". */
    @Test
    public void validateInvalidConcurrentRequests() {
        assertThrown(write -> write.withConcurrentRequests(0))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("concurrentRequests must be > 0");
    }

    /** {@code batchMaxRecords(0)} is below the allowed range and must be rejected. */
    @Test
    public void validateBatchMaxRecordsTooLow() {
        assertThrown(write -> write.withBatchMaxRecords(0))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("batchMaxRecords must be in [1,500]");
    }

    /** {@code batchMaxRecords(501)} is above the allowed range and must be rejected. */
    @Test
    public void validateBatchMaxRecordsTooHigh() {
        assertThrown(write -> write.withBatchMaxRecords(501))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("batchMaxRecords must be in [1,500]");
    }

    /** {@code batchMaxBytes(0)} is below the allowed range and must be rejected. */
    @Test
    public void validateBatchMaxBytesTooLow() {
        assertThrown(write -> write.withBatchMaxBytes(0))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("batchMaxBytes must be in [1,5242880]");
    }

    /** {@code batchMaxBytes(5242881)} is above the allowed range and must be rejected. */
    @Test
    public void validateBatchMaxBytesTooHigh() {
        assertThrown(write -> write.withBatchMaxBytes(5242881))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("batchMaxBytes must be in [1,5242880]");
    }

    /** Aggregation {@code maxBytes(1048577)} exceeds the 1048576 limit and must be rejected. */
    @Test
    public void validateRecordAggregationMaxBytesAboveLimit() {
        assertThrown(write -> write.withRecordAggregation(builder -> builder.maxBytes(1048577)))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("maxBytes must be positive and <= 1048576");
    }

    /**
     * Builds a shard whose hash key range only has its starting hash key set.
     *
     * @param bigInteger starting hash key of the shard
     * @return the shard
     */
    private Shard shard(BigInteger bigInteger) {
        HashKeyRange range = HashKeyRange.builder().startingHashKey(bigInteger.toString()).build();
        return Shard.builder().hashKeyRange(range).build();
    }

    /**
     * Builds a {@code ListShardsResponse} page.
     *
     * @param str next token of the page; the tests pass {@code null} for the last page
     * @param shardArr shards contained in the page
     * @return the response
     */
    private ListShardsResponse listShardsResponse(String str, Shard... shardArr) {
        ListShardsResponse.Builder page = ListShardsResponse.builder();
        page.shards(shardArr);
        page.nextToken(str);
        return page.build();
    }

    /**
     * Matcher for a non-null {@code ListShardsRequest} with exactly the given stream name and next
     * token; either expectation may be {@code null}.
     *
     * @param str expected stream name
     * @param str2 expected next token
     * @return the matcher
     */
    protected ArgumentMatcher<ListShardsRequest> isRequest(String str, String str2) {
        return request -> {
            if (request == null) {
                return false;
            }
            if (!Objects.equal(str, request.streamName())) {
                return false;
            }
            return Objects.equal(str2, request.nextToken());
        };
    }

    /**
     * Stubs {@code listShards} to return a completed response with one shard per given starting
     * hash key.
     *
     * @param bigIntegerArr starting hash keys of the shards to return
     */
    private void mockShardRanges(BigInteger... bigIntegerArr) {
        List<Shard> shards = Arrays.stream(bigIntegerArr).map(this::shard).collect(Collectors.toList());
        ListShardsResponse response = ListShardsResponse.builder().shards(shards).build();
        Mockito.when(client.listShards(ArgumentMatchers.any(ListShardsRequest.class)))
            .thenReturn(CompletableFuture.completedFuture(response));
    }

    /**
     * Applies {@code function} to a fresh {@code KinesisIO.write()}, expands the result against a
     * mocked input collection, and returns an assertion on whatever is thrown.
     *
     * @param function configuration applied to the write under test
     * @return assertion on the throwable raised by {@code expand}
     */
    private ThrowableAssert assertThrown(Function<KinesisIO.Write<TestRow>, KinesisIO.Write<TestRow>> function) {
        pipeline.enableAbandonedNodeEnforcement(false);

        PCollection input = Mockito.mock(PCollection.class);
        Mockito.when(input.getPipeline()).thenReturn(pipeline);
        // Call the stub once up front so it is used even if expand() throws first
        // (presumably needed because the runner enforces strict stubs).
        input.getPipeline();

        return Assertions.assertThatThrownBy(
            () -> ((KinesisIO.Write) function.apply(KinesisIO.write())).expand(input));
    }

    /**
     * Stubs {@code putRecords} on the given client to always succeed while capturing every request.
     *
     * @param kinesisAsyncClient mocked client to stub
     * @return supplier that maps the requests captured so far to lists of rows, one list per
     *     request, using {@code toTestRow}
     */
    private Supplier<List<List<TestRow>>> captureBatchRecords(KinesisAsyncClient kinesisAsyncClient) {
        ArgumentCaptor<PutRecordsRequest> requests = ArgumentCaptor.forClass(PutRecordsRequest.class);
        Mockito.when(kinesisAsyncClient.putRecords(requests.capture()))
            .thenReturn(CompletableFuture.completedFuture(successResponse));

        return () ->
            Lists.transform(
                requests.getAllValues(),
                request -> Lists.transform(request.records(), this::toTestRow));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1390365461:
                if (implMethodName.equals("lambda$testWriteAggregatedWithMaxBytesAndBatchMaxBytes$7006ee18$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1197941387:
                if (implMethodName.equals("lambda$testWriteAggregatedWithMaxBytes$7006ee18$1")) {
                    z = 9;
                    break;
                }
                break;
            case -1116339198:
                if (implMethodName.equals("lambda$testWriteAggregatedByDefault$7006ee18$1")) {
                    z = true;
                    break;
                }
                break;
            case -1015699319:
                if (implMethodName.equals("lambda$testWriteAggregatedShardRefreshDisabled$7006ee18$1")) {
                    z = 8;
                    break;
                }
                break;
            case 148052978:
                if (implMethodName.equals("lambda$testWriteAggregatedShardRefreshPending$7006ee18$1")) {
                    z = false;
                    break;
                }
                break;
            case 790743079:
                if (implMethodName.equals("lambda$static$b8c800c9$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1427181300:
                if (implMethodName.equals("lambda$testWriteAggregatedWithMaxBytesAndBatchMaxRecords$7006ee18$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1750187294:
                if (implMethodName.equals("lambda$testWriteAggregatedWithMaxBufferTime$a5baa056$1")) {
                    z = 10;
                    break;
                }
                break;
            case 1853004009:
                if (implMethodName.equals("lambda$testWriteAggregatedWithShardsRefresh$a5baa056$1")) {
                    z = 7;
                    break;
                }
                break;
            case 1853130114:
                if (implMethodName.equals("lambda$testWriteAggregatedShardAware$7006ee18$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1953518534:
                if (implMethodName.equals("lambda$static$d495c964$1")) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow -> {
                        return testRow.id().toString();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow2 -> {
                        return "a";
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow3 -> {
                        return "a";
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow4 -> {
                        return testRow4.id().toString();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow5 -> {
                        return testRow5.name();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)[B")) {
                    return testRow6 -> {
                        return bytesOf(testRow6.id().intValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow7 -> {
                        return "a";
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow8 -> {
                        return testRow8.id().toString();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow9 -> {
                        return testRow9.id().toString();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow10 -> {
                        return "a";
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIOWriteTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow11 -> {
                        return testRow11.id().toString();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
