package org.apache.flink.connector.firehose.sink;

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import software.amazon.awssdk.services.firehose.model.Record;

/* loaded from: input_file:org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.class */
public class KinesisFirehoseSinkTest {
    private static final ElementConverter<String, Record> elementConverter = KinesisFirehoseSinkElementConverter.builder().setSerializationSchema(new SimpleStringSchema()).build();

    @Test
    public void deliveryStreamNameMustNotBeNull() {
        Assertions.assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
            new KinesisFirehoseSink(elementConverter, 500, 16, 10000, 4194304L, 5000L, 1024000L, false, (String) null, new Properties());
        }).withMessageContaining("The delivery stream name must not be null when initializing the KDF Sink.");
    }

    @Test
    public void deliveryStreamNameMustNotBeEmpty() {
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            new KinesisFirehoseSink(elementConverter, 500, 16, 10000, 4194304L, 5000L, 1024000L, false, "", new Properties());
        }).withMessageContaining("The delivery stream name must be set when initializing the KDF Sink.");
    }

    @Test
    public void firehoseSinkFailsWhenAccessKeyIdIsNotProvided() {
        Properties createConfig = AWSServicesTestUtils.createConfig("https://non-exisitent-location");
        createConfig.setProperty("aws.credentials.provider", AWSConfigConstants.CredentialProvider.BASIC.toString());
        createConfig.remove(AWSConfigConstants.accessKeyId("aws.credentials.provider"));
        firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(createConfig, "Please set values for AWS Access Key ID");
    }

    @Test
    public void firehoseSinkFailsWhenRegionIsNotProvided() {
        Properties createConfig = AWSServicesTestUtils.createConfig("https://non-exisitent-location");
        createConfig.remove("aws.region");
        firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(createConfig, "region must not be null.");
    }

    @Test
    public void firehoseSinkFailsWhenUnableToConnectToRemoteService() {
        Properties createConfig = AWSServicesTestUtils.createConfig("https://non-exisitent-location");
        createConfig.remove("aws.trust.all.certificates");
        firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(createConfig, "Received an UnknownHostException when attempting to interact with a service.");
    }

    private void firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(Properties properties, String str) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        KinesisFirehoseTestUtils.getSampleDataGenerator(executionEnvironment, 10).sinkTo(KinesisFirehoseSink.builder().setSerializationSchema(new SimpleStringSchema()).setDeliveryStreamName("non-existent-stream").setMaxBatchSize(1).setFirehoseClientProperties(properties).build());
        Assertions.assertThatExceptionOfType(JobExecutionException.class).isThrownBy(() -> {
            executionEnvironment.execute("Integration Test");
        }).havingCause().havingCause().withMessageContaining(str);
    }
}
