package org.apache.flink.connector.kinesis.sink;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/* loaded from: input_file:org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.class */
public class KinesisStreamsSinkITCase extends TestLogger {
    private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
    private final SerializationSchema<String> serializationSchema = new SimpleStringSchema();
    private final PartitionKeyGenerator<String> partitionKeyGenerator = str -> {
        return String.valueOf(str.hashCode());
    };
    private final PartitionKeyGenerator<String> longPartitionKeyGenerator = str -> {
        return str;
    };

    @ClassRule
    public static final KinesaliteContainer KINESALITE = (KinesaliteContainer) ((KinesaliteContainer) new KinesaliteContainer(DockerImageName.parse("instructure/kinesalite:latest")).withNetwork(Network.newNetwork())).withNetworkAliases(new String[]{"kinesalite"});
    private StreamExecutionEnvironment env;
    private SdkAsyncHttpClient httpClient;
    private KinesisAsyncClient kinesisClient;

    /* loaded from: input_file:org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase$Scenario.class */
    private class Scenario {
        private int numberOfElementsToSend;
        private int sizeOfMessageBytes;
        private int bufferMaxTimeMS;
        private int maxInflightReqs;
        private int maxBatchSize;
        private int expectedElements;
        private boolean failOnError;
        private String kinesaliteStreamName;
        private String sinkConnectionStreamName;
        private SerializationSchema<String> serializationSchema;
        private PartitionKeyGenerator<String> partitionKeyGenerator;
        private Properties properties;

        private Scenario() {
            this.numberOfElementsToSend = 50;
            this.sizeOfMessageBytes = 25;
            this.bufferMaxTimeMS = 1000;
            this.maxInflightReqs = 1;
            this.maxBatchSize = 50;
            this.expectedElements = 50;
            this.failOnError = false;
            this.kinesaliteStreamName = null;
            this.serializationSchema = KinesisStreamsSinkITCase.this.serializationSchema;
            this.partitionKeyGenerator = KinesisStreamsSinkITCase.this.partitionKeyGenerator;
            this.properties = KinesisStreamsSinkITCase.this.getDefaultProperties();
        }

        public void runScenario() throws Exception {
            if (this.kinesaliteStreamName != null) {
                prepareStream(this.kinesaliteStreamName);
            }
            this.properties.setProperty("aws.trust.all.certificates", "true");
            this.properties.setProperty("aws.http.protocol.version", "HTTP1_1");
            KinesisStreamsSinkITCase.this.env.addSource(new DataGeneratorSource(RandomGenerator.stringGenerator(this.sizeOfMessageBytes), 100L, Long.valueOf(this.numberOfElementsToSend))).returns(String.class).sinkTo(KinesisStreamsSink.builder().setSerializationSchema(this.serializationSchema).setPartitionKeyGenerator(this.partitionKeyGenerator).setMaxTimeInBufferMS(this.bufferMaxTimeMS).setMaxInFlightRequests(this.maxInflightReqs).setMaxBatchSize(this.maxBatchSize).setFailOnError(this.failOnError).setMaxBufferedRequests(1000).setStreamName(this.sinkConnectionStreamName).setKinesisClientProperties(this.properties).setFailOnError(true).build());
            KinesisStreamsSinkITCase.this.env.execute("KDS Async Sink Example Program");
            Assertions.assertThat(((GetRecordsResponse) KinesisStreamsSinkITCase.this.kinesisClient.getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(((GetShardIteratorResponse) KinesisStreamsSinkITCase.this.kinesisClient.getShardIterator((GetShardIteratorRequest) GetShardIteratorRequest.builder().shardId(KinesisStreamsSinkITCase.DEFAULT_FIRST_SHARD_NAME).shardIteratorType(ShardIteratorType.TRIM_HORIZON).streamName(this.kinesaliteStreamName).build()).get()).shardIterator()).build()).get()).records().size()).isEqualTo(this.expectedElements);
        }

        public Scenario withNumberOfElementsToSend(int i) {
            this.numberOfElementsToSend = i;
            return this;
        }

        public Scenario withSizeOfMessageBytes(int i) {
            this.sizeOfMessageBytes = i;
            return this;
        }

        public Scenario withBufferMaxTimeMS(int i) {
            this.bufferMaxTimeMS = i;
            return this;
        }

        public Scenario withMaxInflightReqs(int i) {
            this.maxInflightReqs = i;
            return this;
        }

        public Scenario withMaxBatchSize(int i) {
            this.maxBatchSize = i;
            return this;
        }

        public Scenario withExpectedElements(int i) {
            this.expectedElements = i;
            return this;
        }

        public Scenario withFailOnError(boolean z) {
            this.failOnError = z;
            return this;
        }

        public Scenario withSinkConnectionStreamName(String str) {
            this.sinkConnectionStreamName = str;
            return this;
        }

        public Scenario withKinesaliteStreamName(String str) {
            this.kinesaliteStreamName = str;
            return this;
        }

        public Scenario withSerializationSchema(SerializationSchema<String> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return this;
        }

        public Scenario withPartitionKeyGenerator(PartitionKeyGenerator<String> partitionKeyGenerator) {
            this.partitionKeyGenerator = partitionKeyGenerator;
            return this;
        }

        public Scenario withProperties(Properties properties) {
            this.properties = properties;
            return this;
        }

        private void prepareStream(String str) throws Exception {
            RateLimiter build = RateLimiterBuilder.newBuilder().withRate(1, TimeUnit.SECONDS).withConstantThroughput().build();
            KinesisStreamsSinkITCase.this.kinesisClient.createStream((CreateStreamRequest) CreateStreamRequest.builder().streamName(str).shardCount(1).build()).get();
            Deadline fromNow = Deadline.fromNow(Duration.ofMinutes(1L));
            while (!((Boolean) build.getWhenReady(() -> {
                return Boolean.valueOf(streamExists(str));
            })).booleanValue()) {
                if (fromNow.isOverdue()) {
                    throw new RuntimeException("Failed to create stream within time");
                }
            }
        }

        private boolean streamExists(String str) {
            try {
                return ((DescribeStreamResponse) KinesisStreamsSinkITCase.this.kinesisClient.describeStream((DescribeStreamRequest) DescribeStreamRequest.builder().streamName(str).build()).get()).streamDescription().streamStatus() == StreamStatus.ACTIVE;
            } catch (Exception e) {
                return false;
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(1);
        this.httpClient = KINESALITE.buildSdkAsyncHttpClient();
        this.kinesisClient = KINESALITE.createHostClient(this.httpClient);
    }

    @After
    public void teardown() {
        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
        AWSGeneralUtil.closeResources(new SdkAutoCloseable[]{this.httpClient, this.kinesisClient});
    }

    @Test
    public void elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached() throws Exception {
        new Scenario().withKinesaliteStreamName("test-stream-name-1").withSinkConnectionStreamName("test-stream-name-1").runScenario();
    }

    @Test
    public void elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive() throws Exception {
        new Scenario().withNumberOfElementsToSend(10).withMaxBatchSize(100).withExpectedElements(10).withKinesaliteStreamName("test-stream-name-2").withSinkConnectionStreamName("test-stream-name-2").runScenario();
    }

    @Test
    public void veryLargeMessagesSucceedInBeingPersisted() throws Exception {
        new Scenario().withNumberOfElementsToSend(5).withSizeOfMessageBytes(2500).withMaxBatchSize(10).withExpectedElements(5).withKinesaliteStreamName("test-stream-name-3").withSinkConnectionStreamName("test-stream-name-3").runScenario();
    }

    @Test
    public void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted() throws Exception {
        new Scenario().withNumberOfElementsToSend(150).withSizeOfMessageBytes(2500).withBufferMaxTimeMS(2000).withMaxInflightReqs(10).withMaxBatchSize(20).withExpectedElements(150).withKinesaliteStreamName("test-stream-name-4").withSinkConnectionStreamName("test-stream-name-4").runScenario();
    }

    @Test
    public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() {
        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(true, "test-stream-name-5");
    }

    @Test
    public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() {
        testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(false, "test-stream-name-6");
    }

    private void testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(boolean z, String str) {
        Assertions.assertThatExceptionOfType(JobExecutionException.class).isThrownBy(() -> {
            new Scenario().withKinesaliteStreamName(str).withSinkConnectionStreamName("non-existent-stream").withFailOnError(z).runScenario();
        }).havingCause().havingCause().withMessageContaining("Encountered non-recoverable exception");
    }

    @Test
    public void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
        Assertions.assertThatExceptionOfType(JobExecutionException.class).isThrownBy(() -> {
            new Scenario().withNumberOfElementsToSend(5).withSizeOfMessageBytes(2500).withExpectedElements(5).withKinesaliteStreamName("test-stream-name-7").withSinkConnectionStreamName("test-stream-name-7").withSerializationSchema(this.serializationSchema).withPartitionKeyGenerator(this.longPartitionKeyGenerator).runScenario();
        }).havingCause().havingCause().withMessageContaining("Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
    }

    @Test
    public void badRegionShouldResultInFailureWhenInFailOnErrorIsOn() {
        badRegionShouldResultInFailureWhenInFailOnErrorIs(true);
    }

    @Test
    public void badRegionShouldResultInFailureWhenInFailOnErrorIsOff() {
        badRegionShouldResultInFailureWhenInFailOnErrorIs(false);
    }

    private void badRegionShouldResultInFailureWhenInFailOnErrorIs(boolean z) {
        Properties defaultProperties = getDefaultProperties();
        defaultProperties.setProperty("aws.region", "some-bad-region");
        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(z, defaultProperties, "Invalid AWS region");
    }

    @Test
    public void missingRegionShouldResultInFailureWhenInFailOnErrorIsOn() {
        missingRegionShouldResultInFailureWhenInFailOnErrorIs(true);
    }

    @Test
    public void missingRegionShouldResultInFailureWhenInFailOnErrorIsOff() {
        missingRegionShouldResultInFailureWhenInFailOnErrorIs(false);
    }

    private void missingRegionShouldResultInFailureWhenInFailOnErrorIs(boolean z) {
        Properties defaultProperties = getDefaultProperties();
        defaultProperties.remove("aws.region");
        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(z, defaultProperties, "region must not be null.");
    }

    @Test
    public void noURIEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
        noURIEndpointShouldResultInFailureWhenInFailOnErrorIs(true);
    }

    @Test
    public void noURIEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
        noURIEndpointShouldResultInFailureWhenInFailOnErrorIs(false);
    }

    private void noURIEndpointShouldResultInFailureWhenInFailOnErrorIs(boolean z) {
        Properties defaultProperties = getDefaultProperties();
        defaultProperties.setProperty("aws.endpoint", "bad-endpoint-no-uri");
        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(z, defaultProperties, "The URI scheme of endpointOverride must not be null.");
    }

    @Test
    public void badEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
        badEndpointShouldResultInFailureWhenInFailOnErrorIs(true);
    }

    @Test
    public void badEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
        badEndpointShouldResultInFailureWhenInFailOnErrorIs(false);
    }

    private void badEndpointShouldResultInFailureWhenInFailOnErrorIs(boolean z) {
        Properties defaultProperties = getDefaultProperties();
        defaultProperties.setProperty("aws.endpoint", "https://bad-endpoint-with-uri");
        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(z, defaultProperties, "UnknownHostException when attempting to interact with a service.");
    }

    @Test
    public void envVarWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(true, AWSConfigConstants.CredentialProvider.ENV_VAR.toString(), "Access key must be specified either via environment variable");
    }

    @Test
    public void envVarWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(false, AWSConfigConstants.CredentialProvider.ENV_VAR.toString(), "Access key must be specified either via environment variable");
    }

    @Test
    public void sysPropWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(true, AWSConfigConstants.CredentialProvider.SYS_PROP.toString(), "Unable to load credentials from system settings");
    }

    @Test
    public void sysPropWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(false, AWSConfigConstants.CredentialProvider.SYS_PROP.toString(), "Unable to load credentials from system settings");
    }

    @Test
    public void basicWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(true, AWSConfigConstants.CredentialProvider.BASIC.toString(), "Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
    }

    @Test
    public void basicWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(false, AWSConfigConstants.CredentialProvider.BASIC.toString(), "Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
    }

    @Test
    public void webIdentityTokenWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(true, AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN.toString(), "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set");
    }

    @Test
    public void webIdentityTokenWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(false, AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN.toString(), "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set");
    }

    @Test
    public void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOn() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(true, "WRONG", "Invalid AWS Credential Provider Type");
    }

    @Test
    public void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOff() {
        noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(false, "WRONG", "Invalid AWS Credential Provider Type");
    }

    private void credentialsProvidedThroughProfilePathShouldResultInFailure(boolean z, String str, String str2, String str3) {
        Properties defaultPropertiesWithoutCredentialsSetAndCredentialProvider = getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(str);
        defaultPropertiesWithoutCredentialsSetAndCredentialProvider.put(AWSConfigConstants.profilePath("aws.credentials.provider"), str2);
        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(z, defaultPropertiesWithoutCredentialsSetAndCredentialProvider, str3);
    }

    private void noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(boolean z, String str, String str2) {
        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(z, getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(str), str2);
    }

    private void assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(boolean z, Properties properties, String str) {
        Assertions.assertThatExceptionOfType(JobExecutionException.class).isThrownBy(() -> {
            new Scenario().withSinkConnectionStreamName("default-stream-name").withFailOnError(z).withProperties(properties).runScenario();
        }).havingCause().havingCause().withMessageContaining(str);
    }

    private Properties getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(String str) {
        Properties defaultProperties = getDefaultProperties();
        defaultProperties.setProperty("aws.credentials.provider", str);
        defaultProperties.remove(AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
        defaultProperties.remove(AWSConfigConstants.AWS_ACCESS_KEY_ID);
        return defaultProperties;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Properties getDefaultProperties() {
        Properties properties = new Properties();
        properties.setProperty("aws.endpoint", KINESALITE.getHostEndpointUrl());
        properties.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, KINESALITE.getAccessKey());
        properties.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, KINESALITE.getSecretKey());
        properties.setProperty("aws.region", KINESALITE.getRegion().toString());
        return properties;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1274293664:
                if (implMethodName.equals("lambda$new$16bad596$1")) {
                    z = true;
                    break;
                }
                break;
            case -332657180:
                if (implMethodName.equals("lambda$new$efabbfb2$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str2 -> {
                        return String.valueOf(str2.hashCode());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
