/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.common.RetryConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.PutRecordsHelpers;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class KinesisIOWriteTest
extends PutRecordsHelpers {
    private static final String STREAM = "test";
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    @Mock
    public KinesisAsyncClient client;
    private static KinesisPartitioner<TestRow> partitionByName = (KinesisPartitioner & Serializable)row -> row.name();
    private static SerializableFunction<TestRow, byte[]> bytesOfId = (SerializableFunction & Serializable)row -> KinesisIOWriteTest.bytesOf(row.id());

    @Before
    public void configure() {
        MockClientBuilderFactory.set(this.pipeline, KinesisAsyncClientBuilder.class, this.client);
        CompletableFuture errorResp = new CompletableFuture();
        errorResp.completeExceptionally(new RuntimeException("Unavailable, retried later"));
        Mockito.when((Object)this.client.listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class))).thenReturn(errorResp);
    }

    @After
    public void resetTimeSource() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    private KinesisIO.Write<TestRow> kinesisWrite() {
        return KinesisIO.write().withStreamName(STREAM).withPartitioner(partitionByName).withSerializer(bytesOfId);
    }

    @Test
    public void testWrite() {
        Supplier<List<List<TestRow>>> capturedRecords = this.captureBatchRecords(this.client);
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)GenerateSequence.from((long)0L).to(100L))).apply((PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply((PTransform)this.kinesisWrite().withRecordAggregationDisabled());
        this.pipeline.run().waitUntilFinish();
        List<List<TestRow>> requests = capturedRecords.get();
        Assertions.assertThat((Iterable)Iterables.concat(requests)).containsExactlyInAnyOrderElementsOf(TestRow.getExpectedValues((int)0, (int)100));
    }

    @Test
    public void testWriteWithBatchMaxRecords() {
        Supplier<List<List<TestRow>>> capturedRecords = this.captureBatchRecords(this.client);
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)GenerateSequence.from((long)0L).to(100L))).apply((PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply((PTransform)this.kinesisWrite().withBatchMaxRecords(1).withRecordAggregationDisabled());
        this.pipeline.run().waitUntilFinish();
        List<List<TestRow>> requests = capturedRecords.get();
        for (List<TestRow> request : requests) {
            Assertions.assertThat((int)request.size()).isEqualTo(1);
        }
        Assertions.assertThat((Iterable)Iterables.concat(requests)).containsExactlyInAnyOrderElementsOf(TestRow.getExpectedValues((int)0, (int)100));
    }

    @Test
    public void testWriteWithBatchMaxBytes() {
        Supplier<List<List<TestRow>>> capturedRecords = this.captureBatchRecords(this.client);
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)GenerateSequence.from((long)0L).to(100L))).apply((PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply((PTransform)this.kinesisWrite().withBatchMaxBytes(15).withRecordAggregationDisabled());
        this.pipeline.run().waitUntilFinish();
        List<List<TestRow>> requests = capturedRecords.get();
        for (List<TestRow> request : requests) {
            Assertions.assertThat((int)request.size()).isEqualTo(1);
        }
        Assertions.assertThat((Iterable)Iterables.concat(requests)).containsExactlyInAnyOrderElementsOf(TestRow.getExpectedValues((int)0, (int)100));
    }

    @Test
    public void testWriteFailure() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse), (Object[])new CompletableFuture[]{CompletableFuture.supplyAsync(() -> (PutRecordsResponse)Preconditions.checkNotNull(null, (Object)"putRecords failed")), CompletableFuture.completedFuture(this.successResponse)});
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)GenerateSequence.from((long)0L).to(100L))).apply((PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply((PTransform)this.kinesisWrite().withRecordAggregationDisabled());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.pipeline.run().waitUntilFinish()).isInstanceOf(Pipeline.PipelineExecutionException.class)).hasMessageContaining("putRecords failed");
    }

    @Test
    public void testWriteWithPartialSuccess() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.partialSuccessResponse(70, 30))).thenReturn(CompletableFuture.completedFuture(this.partialSuccessResponse(10, 20))).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        RetryConfiguration retry = RetryConfiguration.builder().maxBackoff(Duration.millis((long)1L)).build();
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)100, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withClientConfiguration(ClientConfiguration.builder().retry(retry).build()).withRecordAggregationDisabled());
        this.pipeline.run().waitUntilFinish();
        InOrder ordered = Mockito.inOrder((Object[])new Object[]{this.client});
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.containsAll(TestRow.getExpectedValues((int)0, (int)100))));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.containsAll(TestRow.getExpectedValues((int)70, (int)100))));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.containsAll(TestRow.getExpectedValues((int)80, (int)100))));
    }

    @Test
    public void testWriteAggregatedByDefault() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)100, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)row -> "a"));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(1)));
    }

    @Test
    public void testWriteAggregatedShardAware() {
        this.mockShardRanges(KinesisPartitioner.MIN_HASH_KEY, KinesisPartitioner.MAX_HASH_KEY.shiftRight(1));
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)100, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)row -> row.id().toString()));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(2)));
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class));
    }

    @Test
    public void testWriteAggregatedShardRefreshPending() {
        CompletableFuture<ListShardsResponse> resp = new CompletableFuture<ListShardsResponse>();
        Mockito.when((Object)this.client.listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class))).thenReturn(resp);
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)100, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)row -> row.id().toString()));
        this.pipeline.run().waitUntilFinish();
        resp.complete((ListShardsResponse)ListShardsResponse.builder().build());
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(100)));
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class));
    }

    @Test
    public void testWriteAggregatedShardRefreshDisabled() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)100, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withRecordAggregation(b -> b.shardRefreshInterval(Duration.ZERO)).withPartitioner((KinesisPartitioner & Serializable)row -> row.id().toString()));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(100)));
        ((KinesisAsyncClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)0))).listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class));
    }

    @Test
    public void testWriteAggregatedUsingExplicitPartitioner() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)100, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner(KinesisPartitioner.explicitRandomPartitioner((int)2)));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(2)));
        ((KinesisAsyncClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)0))).listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class));
    }

    @Test
    public void testWriteAggregatedWithMaxBytes() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        int expectedBytes = 5023;
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1000, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)row -> "a").withRecordAggregation(b -> b.maxBytes(5023).maxBufferedTime(Duration.standardSeconds((long)5L))));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client)).putRecords((PutRecordsRequest)AdditionalMatchers.and((Object)((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(2))), (Object)((PutRecordsRequest)ArgumentMatchers.argThat(this.hasRecordSize(5023)))));
    }

    @Test
    public void testWriteAggregatedWithMaxBytesAndBatchMaxBytes() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        int expectedBytes = 5023;
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1000, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)row -> "a").withBatchMaxBytes(5024).withRecordAggregation(b -> b.maxBytes(5023).maxBufferedTime(Duration.standardSeconds((long)5L))));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)2))).putRecords((PutRecordsRequest)AdditionalMatchers.and((Object)((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(1))), (Object)((PutRecordsRequest)ArgumentMatchers.argThat(this.hasRecordSize(5023)))));
    }

    @Test
    public void testWriteAggregatedWithMaxBytesAndBatchMaxRecords() {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        int expectedBytes = 5023;
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)1000, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateTestRows()))).apply((PTransform)this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)row -> "a").withBatchMaxRecords(1).withRecordAggregation(b -> b.maxBytes(5023).maxBufferedTime(Duration.standardSeconds((long)5L))));
        this.pipeline.run().waitUntilFinish();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)2))).putRecords((PutRecordsRequest)AdditionalMatchers.and((Object)((PutRecordsRequest)ArgumentMatchers.argThat(this.hasSize(1))), (Object)((PutRecordsRequest)ArgumentMatchers.argThat(this.hasRecordSize(5023)))));
    }

    @Test
    public void testWriteAggregatedWithMaxBufferTime() throws Throwable {
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        KinesisIO.Write write = this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)r -> r.id().toString()).withRecordAggregation(b -> b.maxBufferedTime(Duration.millis((long)100L)).maxBufferedTimeJitter(0.2));
        DateTimeUtils.setCurrentMillisFixed((long)0L);
        KinesisIO.Write.AggregatedWriter writer = new KinesisIO.Write.AggregatedWriter(this.pipeline.getOptions(), write, write.recordAggregation());
        writer.startBundle();
        for (int i = 1; i <= 3; ++i) {
            writer.write((Object)TestRow.fromSeed((Integer)i));
        }
        DateTimeUtils.setCurrentMillisFixed((long)50L);
        writer.write((Object)TestRow.fromSeed((Integer)4));
        DateTimeUtils.setCurrentMillisFixed((long)100L);
        writer.write((Object)TestRow.fromSeed((Integer)5));
        DateTimeUtils.setCurrentMillisFixed((long)200L);
        writer.write((Object)TestRow.fromSeed((Integer)6));
        writer.finishBundle();
        writer.close();
        InOrder ordered = Mockito.inOrder((Object[])new Object[]{this.client});
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasPartitions("1", "2", "3")));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasPartitions("4", "5")));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasPartitions("6")));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.client});
    }

    @Test
    public void testWriteAggregatedWithShardsRefresh() throws Throwable {
        int i;
        Mockito.when((Object)this.client.putRecords(this.anyRequest())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        KinesisIO.Write write = this.kinesisWrite().withPartitioner((KinesisPartitioner & Serializable)r -> r.id().toString()).withRecordAggregation(b -> b.shardRefreshInterval(Duration.millis((long)1000L)));
        DateTimeUtils.setCurrentMillisFixed((long)1L);
        KinesisIO.Write.AggregatedWriter writer = new KinesisIO.Write.AggregatedWriter(this.pipeline.getOptions(), write, write.recordAggregation());
        for (i = 1; i <= 3; ++i) {
            writer.write((Object)TestRow.fromSeed((Integer)i));
        }
        DateTimeUtils.setCurrentMillisFixed((long)1500L);
        this.mockShardRanges(KinesisPartitioner.MIN_HASH_KEY);
        for (i = 1; i <= 10; ++i) {
            writer.write((Object)TestRow.fromSeed((Integer)i));
        }
        writer.finishBundle();
        writer.close();
        InOrder ordered = Mockito.inOrder((Object[])new Object[]{this.client});
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasPartitions("1", "2", "3")));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).putRecords((PutRecordsRequest)ArgumentMatchers.argThat(this.hasExplicitPartitions(KinesisPartitioner.MIN_HASH_KEY.toString())));
        ((KinesisAsyncClient)ordered.verify((Object)this.client)).close();
        ((KinesisAsyncClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)2))).listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.client});
    }

    @Test
    public void testShardRangesRefresh() {
        BigInteger shard1 = KinesisPartitioner.MIN_HASH_KEY;
        BigInteger shard2 = KinesisPartitioner.MAX_HASH_KEY.shiftRight(2);
        BigInteger shard3 = KinesisPartitioner.MAX_HASH_KEY.shiftRight(1);
        Mockito.when((Object)this.client.listShards((ListShardsRequest)ArgumentMatchers.argThat(this.isRequest(STREAM, null)))).thenReturn(CompletableFuture.completedFuture(this.listShardsResponse("a", this.shard(shard1))));
        Mockito.when((Object)this.client.listShards((ListShardsRequest)ArgumentMatchers.argThat(this.isRequest(null, "a")))).thenReturn(CompletableFuture.completedFuture(this.listShardsResponse("b", this.shard(shard2))));
        Mockito.when((Object)this.client.listShards((ListShardsRequest)ArgumentMatchers.argThat(this.isRequest(null, "b")))).thenReturn(CompletableFuture.completedFuture(this.listShardsResponse(null, this.shard(shard3))));
        KinesisIO.Write.PartitionKeyHasher pkHasher = new KinesisIO.Write.PartitionKeyHasher();
        KinesisIO.Write.ShardRanges shardRanges = KinesisIO.Write.ShardRanges.of((String)STREAM);
        shardRanges.refreshPeriodically(this.client, Instant::now);
        ((KinesisAsyncClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)3))).listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class));
        BigInteger hashKeyA = pkHasher.hashKey("a");
        Assertions.assertThat((BigInteger)shardRanges.shardAwareHashKey(hashKeyA)).isEqualTo((Object)shard1);
        Assertions.assertThat((BigInteger)hashKeyA).isBetween(shard1, shard2.subtract(BigInteger.ONE));
        BigInteger hashKeyB = pkHasher.hashKey("b");
        Assertions.assertThat((BigInteger)shardRanges.shardAwareHashKey(hashKeyB)).isEqualTo((Object)shard3);
        Assertions.assertThat((BigInteger)hashKeyB).isBetween(shard3, KinesisPartitioner.MAX_HASH_KEY);
        BigInteger hashKeyC = pkHasher.hashKey("c");
        Assertions.assertThat((BigInteger)shardRanges.shardAwareHashKey(hashKeyC)).isEqualTo((Object)shard2);
        Assertions.assertThat((BigInteger)hashKeyC).isBetween(shard2, shard3.subtract(BigInteger.ONE));
    }

    @Test
    public void validateMissingStreamName() {
        ((ThrowableAssert)this.assertThrown(Function.identity()).isInstanceOf(IllegalArgumentException.class)).hasMessage("streamName is required");
    }

    @Test
    public void validateEmptyStreamName() {
        ((ThrowableAssert)this.assertThrown(w -> w.withStreamName("")).isInstanceOf(IllegalArgumentException.class)).hasMessage("streamName cannot be empty");
    }

    @Test
    public void validateMissingPartitioner() {
        ((ThrowableAssert)this.assertThrown(w -> w.withStreamName(STREAM).withSerializer(bytesOfId)).isInstanceOf(IllegalArgumentException.class)).hasMessage("partitioner is required");
    }

    @Test
    public void validateMissingSerializer() {
        ((ThrowableAssert)this.assertThrown(w -> w.withStreamName(STREAM).withPartitioner(partitionByName)).isInstanceOf(IllegalArgumentException.class)).hasMessage("serializer is required");
    }

    @Test
    public void validateInvalidConcurrentRequests() {
        ((ThrowableAssert)this.assertThrown(w -> w.withConcurrentRequests(0)).isInstanceOf(IllegalArgumentException.class)).hasMessage("concurrentRequests must be > 0");
    }

    @Test
    public void validateBatchMaxRecordsTooLow() {
        ((ThrowableAssert)this.assertThrown(w -> w.withBatchMaxRecords(0)).isInstanceOf(IllegalArgumentException.class)).hasMessage("batchMaxRecords must be in [1,500]");
    }

    @Test
    public void validateBatchMaxRecordsTooHigh() {
        ((ThrowableAssert)this.assertThrown(w -> w.withBatchMaxRecords(501)).isInstanceOf(IllegalArgumentException.class)).hasMessage("batchMaxRecords must be in [1,500]");
    }

    @Test
    public void validateBatchMaxBytesTooLow() {
        ((ThrowableAssert)this.assertThrown(w -> w.withBatchMaxBytes(0)).isInstanceOf(IllegalArgumentException.class)).hasMessage("batchMaxBytes must be in [1,5242880]");
    }

    @Test
    public void validateBatchMaxBytesTooHigh() {
        ((ThrowableAssert)this.assertThrown(w -> w.withBatchMaxBytes(0x500001)).isInstanceOf(IllegalArgumentException.class)).hasMessage("batchMaxBytes must be in [1,5242880]");
    }

    @Test
    public void validateRecordAggregationMaxBytesAboveLimit() {
        ((ThrowableAssert)this.assertThrown(w -> w.withRecordAggregation(b -> b.maxBytes(0x100001))).isInstanceOf(IllegalArgumentException.class)).hasMessage("maxBytes must be positive and <= 1048576");
    }

    private Shard shard(BigInteger lowerRange) {
        return (Shard)Shard.builder().hashKeyRange((HashKeyRange)HashKeyRange.builder().startingHashKey(lowerRange.toString()).build()).build();
    }

    private ListShardsResponse listShardsResponse(String nextToken, Shard ... shards) {
        return (ListShardsResponse)ListShardsResponse.builder().shards(shards).nextToken(nextToken).build();
    }

    protected ArgumentMatcher<ListShardsRequest> isRequest(String stream, String nextToken) {
        return req -> req != null && Objects.equal((Object)stream, (Object)req.streamName()) && Objects.equal((Object)nextToken, (Object)req.nextToken());
    }

    private void mockShardRanges(BigInteger ... lowerBounds) {
        List shards = Arrays.stream(lowerBounds).map(lower -> this.shard((BigInteger)lower)).collect(Collectors.toList());
        Mockito.when((Object)this.client.listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class))).thenReturn(CompletableFuture.completedFuture((ListShardsResponse)ListShardsResponse.builder().shards(shards).build()));
    }

    private ThrowableAssert assertThrown(Function<KinesisIO.Write<TestRow>, KinesisIO.Write<TestRow>> writeConfig) {
        this.pipeline.enableAbandonedNodeEnforcement(false);
        PCollection input = (PCollection)Mockito.mock(PCollection.class);
        Mockito.when((Object)input.getPipeline()).thenReturn((Object)this.pipeline);
        input.getPipeline();
        return (ThrowableAssert)Assertions.assertThatThrownBy(() -> ((KinesisIO.Write)writeConfig.apply(KinesisIO.write())).expand(input));
    }

    private Supplier<List<List<TestRow>>> captureBatchRecords(KinesisAsyncClient mock) {
        ArgumentCaptor cap = ArgumentCaptor.forClass(PutRecordsRequest.class);
        Mockito.when((Object)mock.putRecords((PutRecordsRequest)cap.capture())).thenReturn(CompletableFuture.completedFuture(this.successResponse));
        return () -> Lists.transform((List)cap.getAllValues(), req -> Lists.transform((List)req.records(), this::toTestRow));
    }

    private static class GenerateTestRows
    extends DoFn<Integer, TestRow> {
        private GenerateTestRows() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Integer rowCount, DoFn.OutputReceiver<TestRow> out) {
            for (TestRow row : TestRow.getExpectedValues((int)0, (int)rowCount)) {
                out.output((Object)row);
            }
        }
    }
}

