package org.apache.flink.connector.firehose.sink;

import java.util.List;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
import org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.iam.IamClient;
import software.amazon.awssdk.services.s3.S3Client;

@Testcontainers
@ExtendWith({MiniClusterExtension.class})
/* loaded from: input_file:org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.class */
class KinesisFirehoseSinkITCase {
    private static final String ROLE_NAME = "super-role";
    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/super-role";
    private static final String BUCKET_NAME = "s3-firehose";
    private static final String STREAM_NAME = "s3-stream";
    private static final int NUMBER_OF_ELEMENTS = 92;
    private StreamExecutionEnvironment env;
    private SdkHttpClient httpClient;
    private S3Client s3Client;
    private FirehoseClient firehoseClient;
    private IamClient iamClient;
    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);

    @Container
    private static LocalstackContainer mockFirehoseContainer = new LocalstackContainer(DockerImageName.parse("localstack/localstack:0.13.3"));

    KinesisFirehoseSinkITCase() {
    }

    @BeforeEach
    void setup() {
        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
        this.httpClient = AWSServicesTestUtils.createHttpClient();
        this.s3Client = AWSServicesTestUtils.createS3Client(mockFirehoseContainer.getEndpoint(), this.httpClient);
        this.firehoseClient = KinesisFirehoseTestUtils.createFirehoseClient(mockFirehoseContainer.getEndpoint(), this.httpClient);
        this.iamClient = AWSServicesTestUtils.createIamClient(mockFirehoseContainer.getEndpoint(), this.httpClient);
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    @AfterEach
    void teardown() {
        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
    }

    @Test
    void firehoseSinkWritesCorrectDataToMockAWSServices() throws Exception {
        LOG.info("1 - Creating the bucket for Firehose to deliver into...");
        AWSServicesTestUtils.createBucket(this.s3Client, BUCKET_NAME);
        LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
        AWSServicesTestUtils.createIAMRole(this.iamClient, ROLE_NAME);
        LOG.info("3 - Creating the Firehose delivery stream...");
        KinesisFirehoseTestUtils.createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, this.firehoseClient);
        KinesisFirehoseTestUtils.getSampleDataGenerator(this.env, NUMBER_OF_ELEMENTS).sinkTo(KinesisFirehoseSink.builder().setSerializationSchema(new SimpleStringSchema()).setDeliveryStreamName(STREAM_NAME).setMaxBatchSize(1).setFirehoseClientProperties(AWSServicesTestUtils.createConfig(mockFirehoseContainer.getEndpoint())).build());
        this.env.execute("Integration Test");
        List listBucketObjects = AWSServicesTestUtils.listBucketObjects(AWSServicesTestUtils.createS3Client(mockFirehoseContainer.getEndpoint(), this.httpClient), BUCKET_NAME);
        Assertions.assertThat(listBucketObjects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
        Assertions.assertThat(AWSServicesTestUtils.readObjectsFromS3Bucket(this.s3Client, listBucketObjects, BUCKET_NAME, responseBytes -> {
            return new String(responseBytes.asByteArrayUnsafe());
        })).containsAll(KinesisFirehoseTestUtils.getSampleData(NUMBER_OF_ELEMENTS));
    }
}
