package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.producer.IKinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import org.apache.beam.sdk.io.kinesis.AmazonKinesisMock;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisMockReadTest;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.class */
public class KinesisMockWriteTest {
    /** Name of the Kinesis stream used by the tests in this class. */
    private static final String STREAM = "BEAM";
    /** Fixed partition key that the write tests pass to {@code withPartitionKey}. */
    private static final String PARTITION_KEY = "partitionKey";

    /** Pipeline on which the tests apply and run the write transform. */
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /** Second pipeline, used by {@code testWriteAndReadFromMockKinesis} to read the data back. */
    @Rule
    public final transient TestPipeline p2 = TestPipeline.create();

    /** Lets a test declare the exception type it expects before making the failing call. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest$BasicKinesisPartitioner.class */
    /**
     * Minimal {@link KinesisPartitioner} for the builder and validation tests.
     *
     * <p>The partition key is the payload length as a decimal string; no explicit hash key is
     * ever supplied.
     */
    private static final class BasicKinesisPartitioner implements KinesisPartitioner {

        /**
         * @param value record payload
         * @return the number of bytes in {@code value}, rendered as a decimal string
         */
        public String getPartitionKey(byte[] value) {
            return Integer.toString(value.length);
        }

        /**
         * @param value record payload (ignored)
         * @return always {@code null}
         */
        public String getExplicitHashKey(byte[] value) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest$FakeKinesisProvider.class */
    /**
     * Test double for {@link AWSClientsProvider}: hands out a Mockito-mocked Kinesis client and a
     * {@link KinesisProducerMock} instead of real AWS clients.
     */
    private static final class FakeKinesisProvider implements AWSClientsProvider {
        // Forwarded to KinesisProducerMock; presumably makes the mock's flush fail -- confirm there.
        private boolean isFailedFlush;

        /**
         * Sets the flag that is passed on to every producer created afterwards.
         *
         * @param failedFlush value handed to the {@link KinesisProducerMock} constructor
         * @return this provider, so the call can be chained
         */
        public FakeKinesisProvider setFailedFlush(boolean failedFlush) {
            isFailedFlush = failedFlush;
            return this;
        }

        /** Returns a fresh Mockito mock of {@link AmazonKinesis} on each call. */
        public AmazonKinesis getKinesisClient() {
            return Mockito.mock(AmazonKinesis.class);
        }

        /** CloudWatch is not supported by this fake; always throws {@link RuntimeException}. */
        public AmazonCloudWatch getCloudWatchClient() {
            throw new RuntimeException("Not implemented");
        }

        /**
         * Builds a {@link KinesisProducerMock} from the given configuration and the current
         * failed-flush flag.
         */
        public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
            final boolean failFlush = isFailedFlush;
            return new KinesisProducerMock(config, failFlush);
        }
    }

    /**
     * Re-initialises the shared {@link KinesisServiceMock} for stream {@code STREAM} before each
     * test.
     */
    @Before
    public void beforeTest() {
        KinesisServiceMock service = KinesisServiceMock.getInstance();
        // Second argument is presumably the shard count -- confirm against KinesisServiceMock.init.
        service.init(STREAM, 1);
    }

    /**
     * Builds a fully configured write transform and checks that every getter reports the value
     * supplied to the corresponding builder method.
     */
    @Test
    public void testWriteBuildsCorrectly() {
        Properties producerProps = new Properties();
        producerProps.setProperty("KinesisEndpoint", "localhost");
        producerProps.setProperty("KinesisPort", "4567");

        KinesisIO.Write write =
                KinesisIO.write()
                        .withStreamName(STREAM)
                        .withPartitionKey(PARTITION_KEY)
                        .withPartitioner(new BasicKinesisPartitioner())
                        .withAWSClientsProvider(new FakeKinesisProvider())
                        .withProducerProperties(producerProps)
                        .withRetries(10);

        Assert.assertEquals(STREAM, write.getStreamName());
        Assert.assertEquals(PARTITION_KEY, write.getPartitionKey());
        Assert.assertEquals(producerProps, write.getProducerProperties());
        Assert.assertEquals(FakeKinesisProvider.class, write.getAWSClientsProvider().getClass());
        Assert.assertEquals(BasicKinesisPartitioner.class, write.getPartitioner().getClass());
        Assert.assertEquals(10L, write.getRetries());

        // The individual producer properties must survive the round trip as well.
        Assert.assertEquals(
                "localhost", write.getProducerProperties().getProperty("KinesisEndpoint"));
        Assert.assertEquals("4567", write.getProducerProperties().getProperty("KinesisPort"));
    }

    /**
     * Expects {@link IllegalArgumentException} when the write transform is expanded without a
     * stream name.
     */
    @Test
    public void testWriteValidationFailsMissingStreamName() {
        KinesisIO.Write write =
                KinesisIO.write()
                        .withPartitionKey(PARTITION_KEY)
                        .withAWSClientsProvider(new FakeKinesisProvider());

        thrown.expect(IllegalArgumentException.class);
        // No real input is supplied; the parameterized cast avoids a raw PCollection and the
        // unchecked conversion it would cause.
        write.expand((PCollection<byte[]>) null);
    }

    /**
     * Expects {@link IllegalArgumentException} when the write transform is expanded with neither
     * a partition key nor a partitioner configured.
     */
    @Test
    public void testWriteValidationFailsMissingPartitioner() {
        KinesisIO.Write write =
                KinesisIO.write()
                        .withStreamName(STREAM)
                        .withAWSClientsProvider(new FakeKinesisProvider());

        thrown.expect(IllegalArgumentException.class);
        // No real input is supplied; the parameterized cast avoids a raw PCollection and the
        // unchecked conversion it would cause.
        write.expand((PCollection<byte[]>) null);
    }

    /**
     * Expects {@link IllegalArgumentException} when both a fixed partition key and a partitioner
     * are configured on the same write transform.
     */
    @Test
    public void testWriteValidationFailsPartitionerAndPartitioneKey() {
        KinesisIO.Write write =
                KinesisIO.write()
                        .withStreamName(STREAM)
                        .withPartitionKey(PARTITION_KEY)
                        .withPartitioner(new BasicKinesisPartitioner())
                        .withAWSClientsProvider(new FakeKinesisProvider());

        thrown.expect(IllegalArgumentException.class);
        // No real input is supplied; the parameterized cast avoids a raw PCollection and the
        // unchecked conversion it would cause.
        write.expand((PCollection<byte[]>) null);
    }

    /**
     * Expects {@link IllegalArgumentException} when the write transform is expanded without an
     * AWS clients provider.
     */
    @Test
    public void testWriteValidationFailsMissingAWSClientsProvider() {
        KinesisIO.Write write =
                KinesisIO.write()
                        .withPartitionKey(PARTITION_KEY)
                        .withStreamName(STREAM);

        thrown.expect(IllegalArgumentException.class);
        // No real input is supplied; the parameterized cast avoids a raw PCollection and the
        // unchecked conversion it would cause.
        write.expand((PCollection<byte[]>) null);
    }

    /**
     * Expects {@link IllegalArgumentException} when the producer properties carry the
     * non-numeric value {@code "qwe"} for {@code KinesisPort}.
     */
    @Test
    public void testSetInvalidProperty() {
        Properties producerProps = new Properties();
        producerProps.setProperty("KinesisPort", "qwe");

        KinesisIO.Write write =
                KinesisIO.write()
                        .withStreamName(STREAM)
                        .withPartitionKey(PARTITION_KEY)
                        .withAWSClientsProvider(new FakeKinesisProvider())
                        .withProducerProperties(producerProps);

        thrown.expect(IllegalArgumentException.class);
        // No real input is supplied; the parameterized cast avoids a raw PCollection and the
        // unchecked conversion it would cause.
        write.expand((PCollection<byte[]>) null);
    }

    /**
     * Writes three single-character records through the fake provider and checks that the
     * service mock counted exactly three added records.
     */
    @Test
    public void testWrite() {
        KinesisServiceMock service = KinesisServiceMock.getInstance();

        Properties producerProps = new Properties();
        producerProps.setProperty("KinesisEndpoint", "localhost");
        producerProps.setProperty("KinesisPort", "4567");
        producerProps.setProperty("VerifyCertificate", "false");

        Iterable<byte[]> payloads =
                ImmutableList.of(
                        "1".getBytes(StandardCharsets.UTF_8),
                        "2".getBytes(StandardCharsets.UTF_8),
                        "3".getBytes(StandardCharsets.UTF_8));

        p.apply(Create.of(payloads))
                .apply(
                        KinesisIO.write()
                                .withStreamName(STREAM)
                                .withPartitionKey(PARTITION_KEY)
                                .withAWSClientsProvider(new FakeKinesisProvider())
                                .withProducerProperties(producerProps));
        p.run().waitUntilFinish();

        Assert.assertEquals(3L, service.getAddedRecords().get());
    }

    /**
     * Runs a one-record write against a provider configured with {@code setFailedFlush(true)} and
     * two retries, and expects the pipeline run to end in a {@link RuntimeException}.
     */
    @Test
    public void testWriteFailed() {
        Iterable<byte[]> payloads = ImmutableList.of("1".getBytes(StandardCharsets.UTF_8));

        p.apply(Create.of(payloads))
                .apply(
                        KinesisIO.write()
                                .withStreamName(STREAM)
                                .withPartitionKey(PARTITION_KEY)
                                .withAWSClientsProvider(
                                        new FakeKinesisProvider().setFailedFlush(true))
                                .withRetries(2));

        // Armed only after the pipeline is built, so a construction-time failure is not
        // mistaken for the expected run-time failure.
        thrown.expect(RuntimeException.class);
        p.run().waitUntilFinish();
    }

    /**
     * Round trip: writes two records on pipeline {@code p}, then reads the service mock's sharded
     * data back on pipeline {@code p2} and asserts that the read side yields exactly that data.
     */
    @Test
    public void testWriteAndReadFromMockKinesis() {
        // Their product caps the number of records read back; the names are a reading of the
        // constant-folded "1 * 2" in the original and should be confirmed.
        final int shardCount = 1;
        final int eventsPerShard = 2;

        KinesisServiceMock service = KinesisServiceMock.getInstance();

        Iterable<byte[]> payloads =
                ImmutableList.of(
                        "1".getBytes(StandardCharsets.UTF_8),
                        "2".getBytes(StandardCharsets.UTF_8));

        p.apply(Create.of(payloads))
                .apply(
                        KinesisIO.write()
                                .withStreamName(STREAM)
                                .withPartitionKey(PARTITION_KEY)
                                .withAWSClientsProvider(new FakeKinesisProvider()));
        p.run().waitUntilFinish();
        Assert.assertEquals(2L, service.getAddedRecords().get());

        // Feed what the write side stored into the read-side mock provider.
        List<List<AmazonKinesisMock.TestData>> testData = service.getShardedData();

        PAssert.that(
                        p2.apply(
                                        KinesisIO.read()
                                                .withStreamName(STREAM)
                                                .withInitialPositionInStream(
                                                        InitialPositionInStream.TRIM_HORIZON)
                                                .withAWSClientsProvider(
                                                        new AmazonKinesisMock.Provider(testData, 10))
                                                .withMaxNumRecords(shardCount * eventsPerShard))
                                .apply(
                                        ParDo.of(
                                                new KinesisMockReadTest.KinesisRecordToTestData())))
                .containsInAnyOrder(Iterables.concat(testData));
        p2.run().waitUntilFinish();
    }
}
