/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.quarkus.component.aws2.kinesis.it;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.jboss.logging.Logger;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.model.BufferingHints;
import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.DeliveryStreamStatus;
import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.InvalidArgumentException;
import software.amazon.awssdk.services.firehose.model.S3DestinationConfiguration;
import software.amazon.awssdk.services.iam.IamClient;
import software.amazon.awssdk.services.iam.model.AttachRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.CreatePolicyRequest;
import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
import software.amazon.awssdk.services.iam.model.DeletePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.DetachRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.GetPolicyRequest;
import software.amazon.awssdk.services.iam.model.GetRoleRequest;
import software.amazon.awssdk.services.iam.waiters.IamWaiter;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.waiters.S3Waiter;

/**
 * Test environment customizer that provisions the AWS resources used by the Kinesis and
 * Kinesis Firehose integration tests: a Kinesis stream, an S3 bucket, an IAM role with an
 * attached policy, and a {@code DIRECT_PUT} Firehose delivery stream that writes to the bucket.
 *
 * <p>The generated resource names are published through {@link Aws2TestEnvContext#property}
 * and a cleanup action is registered through {@link Aws2TestEnvContext#closeable} for every
 * resource that gets created.
 *
 * <p>NOTE(review): the registration order (e.g. "delete bucket" is registered before
 * "delete objects in bucket", "delete policy" before "detach policy") only works if the
 * context runs the registered actions in reverse registration order — confirm against
 * {@code Aws2TestEnvContext}. The order of registrations below must therefore not be changed.
 */
public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer {
    /** Size buffering hint, in MB, set on the S3 destination of the Firehose delivery stream. */
    public static final int BUFFERING_SIZE_MB = 1;
    /** Interval buffering hint, in seconds, set on the S3 destination of the Firehose delivery stream. */
    public static final int BUFFERING_TIME_SEC = 60;
    private static final Logger LOG = Logger.getLogger(Aws2KinesisTestEnvCustomizer.class);

    /**
     * @return the Localstack services this customizer talks to: Kinesis, Firehose, S3 and IAM
     */
    public LocalStackContainer.Service[] localstackServices() {
        return new LocalStackContainer.Service[] {
                LocalStackContainer.Service.KINESIS,
                LocalStackContainer.Service.FIREHOSE,
                LocalStackContainer.Service.S3,
                LocalStackContainer.Service.IAM
        };
    }

    /**
     * @return the subset of services (Kinesis and Firehose) returned for credentials export
     */
    public LocalStackContainer.Service[] exportCredentialsForLocalstackServices() {
        return new LocalStackContainer.Service[] {
                LocalStackContainer.Service.KINESIS,
                LocalStackContainer.Service.FIREHOSE
        };
    }

    /**
     * Creates all resources and publishes their names as properties on {@code envContext}.
     *
     * <p>Published properties: {@code aws-kinesis.stream-name},
     * {@code aws-kinesis.s3-bucket-name} and {@code aws-kinesis-firehose.delivery-stream-name}.
     *
     * @param envContext the context used to obtain AWS clients, publish properties and
     *                   register cleanup actions
     */
    public void customize(Aws2TestEnvContext envContext) {
        String streamName = "camel-quarkus-" + randomLowerCase(16);
        envContext.property("aws-kinesis.stream-name", streamName);
        String streamArn = createKinesisStream(envContext, streamName);

        String bucketName = "camel-quarkus-firehose-" + randomLowerCase(32);
        envContext.property("aws-kinesis.s3-bucket-name", bucketName);
        createBucket(envContext, bucketName);

        String deliveryStreamName = "camel-quarkus-firehose-delstr-" + randomLowerCase(16);
        envContext.property("aws-kinesis-firehose.delivery-stream-name", deliveryStreamName);

        // The cast is kept because the signature of Aws2TestEnvContext.client is not visible here.
        IamClient iamClient = (IamClient) envContext.client(LocalStackContainer.Service.IAM, IamClient::builder);
        String roleName = "s3-" + deliveryStreamName;
        String roleArn = createFirehoseRole(envContext, iamClient, roleName);
        String policyArn = createFirehosePolicy(
                envContext, iamClient, "firehose-s3-policy-" + deliveryStreamName, bucketName, streamArn);

        iamClient.attachRolePolicy(
                AttachRolePolicyRequest.builder().policyArn(policyArn).roleName(roleName).build());
        envContext.closeable(() -> iamClient.detachRolePolicy(
                DetachRolePolicyRequest.builder().roleName(roleName).policyArn(policyArn).build()));

        createDeliveryStream(envContext, deliveryStreamName, "arn:aws:s3:::" + bucketName, roleArn);
    }

    /** Returns {@code length} random alphanumeric characters, lower-cased independently of the default locale. */
    private static String randomLowerCase(int length) {
        return RandomStringUtils.randomAlphanumeric(length).toLowerCase(Locale.ROOT);
    }

    /** Creates a single-shard Kinesis stream, waits until it exists and returns its ARN. */
    private static String createKinesisStream(Aws2TestEnvContext envContext, String streamName) {
        KinesisClient kinesisClient = (KinesisClient) envContext.client(
                LocalStackContainer.Service.KINESIS, KinesisClient::builder);
        kinesisClient.createStream(CreateStreamRequest.builder().shardCount(1).streamName(streamName).build());
        // Register the cleanup right after creation (like the other resources do) so the stream
        // is still scheduled for deletion when the wait below fails.
        envContext.closeable(() -> kinesisClient.deleteStream(
                DeleteStreamRequest.builder().streamName(streamName).build()));
        try (KinesisWaiter waiter = kinesisClient.waiter()) {
            DescribeStreamResponse described = waiter
                    .waitUntilStreamExists(DescribeStreamRequest.builder().streamName(streamName).build())
                    .matched()
                    .response()
                    .get();
            return described.streamDescription().streamARN();
        }
    }

    /** Creates the S3 bucket, registers its cleanup and waits until the bucket exists. */
    private static void createBucket(Aws2TestEnvContext envContext, String bucketName) {
        S3Client s3Client = (S3Client) envContext.client(LocalStackContainer.Service.S3, S3Client::builder);
        s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
        envContext.closeable(() -> s3Client.deleteBucket(
                DeleteBucketRequest.builder().bucket(bucketName).build()));
        // Registered after the bucket deletion on purpose; see the class-level note on close order.
        envContext.closeable(() -> deleteObjects(s3Client, bucketName));
        try (S3Waiter waiter = s3Client.waiter()) {
            waiter.waitUntilBucketExists(HeadBucketRequest.builder().bucket(bucketName).build());
        }
    }

    /** Deletes every object returned by a single {@code listObjects} call on the bucket. */
    private static void deleteObjects(S3Client s3Client, String bucketName) {
        ListObjectsResponse listing = s3Client.listObjects(
                ListObjectsRequest.builder().bucket(bucketName).build());
        List<S3Object> objects = listing.contents();
        LOG.info("Deleting " + objects.size() + " objects in bucket " + bucketName);
        for (S3Object object : objects) {
            LOG.info("Deleting object " + object.key());
            s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(object.key()).build());
        }
    }

    /** Creates the IAM role assumable by {@code firehose.amazonaws.com}, waits for it and returns its ARN. */
    private static String createFirehoseRole(Aws2TestEnvContext envContext, IamClient iamClient, String roleName) {
        String assumeRolePolicy = "{\n"
                + "  \"Version\": \"2012-10-17\",\n"
                + "  \"Statement\": [\n"
                + "    {\n"
                + "      \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
                + "      \"Effect\": \"Allow\",\n"
                + "      \"Principal\": {\n"
                + "        \"Service\": \"firehose.amazonaws.com\"\n"
                + "      },\n"
                + "      \"Action\": \"sts:AssumeRole\"\n"
                + "    }\n"
                + "  ]\n"
                + "}";
        String roleArn = iamClient.createRole(CreateRoleRequest.builder()
                .roleName(roleName)
                .path("/service-role/")
                .assumeRolePolicyDocument(assumeRolePolicy)
                .build())
                .role()
                .arn();
        envContext.closeable(() -> iamClient.deleteRole(DeleteRoleRequest.builder().roleName(roleName).build()));
        try (IamWaiter waiter = iamClient.waiter()) {
            waiter.waitUntilRoleExists(GetRoleRequest.builder().roleName(roleName).build());
        }
        return roleArn;
    }

    /**
     * Creates the IAM policy granting access to the S3 bucket and read access to the Kinesis
     * stream, waits until it exists and returns its ARN.
     */
    private static String createFirehosePolicy(Aws2TestEnvContext envContext, IamClient iamClient,
            String policyName, String bucketName, String streamArn) {
        String policyDocument = "{\n"
                + "    \"Version\": \"2012-10-17\",\n"
                + "    \"Statement\":\n"
                + "    [\n"
                + "        {\n"
                + "            \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
                + "            \"Effect\": \"Allow\",\n"
                + "            \"Action\": [\n"
                + "                \"s3:AbortMultipartUpload\",\n"
                + "                \"s3:GetBucketLocation\",\n"
                + "                \"s3:GetObject\",\n"
                + "                \"s3:ListBucket\",\n"
                + "                \"s3:ListBucketMultipartUploads\",\n"
                + "                \"s3:PutObject\"\n"
                + "            ],      \n"
                + "            \"Resource\": [\n"
                + "                \"arn:aws:s3:::" + bucketName + "\",\n"
                + "                \"arn:aws:s3:::" + bucketName + "/*\"\n"
                + "            ]\n"
                + "        },\n"
                + "        {\n"
                + "            \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
                + "            \"Effect\": \"Allow\",\n"
                + "            \"Action\": [\n"
                + "                \"kinesis:DescribeStream\",\n"
                + "                \"kinesis:GetShardIterator\",\n"
                + "                \"kinesis:GetRecords\",\n"
                + "                \"kinesis:ListShards\"\n"
                + "            ],\n"
                + "            \"Resource\": \"" + streamArn + "\"\n"
                + "        }\n"
                + "    ]\n"
                + "}";
        String policyArn = iamClient.createPolicy(CreatePolicyRequest.builder()
                .policyName(policyName)
                .policyDocument(policyDocument)
                .build())
                .policy()
                .arn();
        envContext.closeable(() -> iamClient.deletePolicy(DeletePolicyRequest.builder().policyArn(policyArn).build()));
        try (IamWaiter waiter = iamClient.waiter()) {
            waiter.waitUntilPolicyExists(GetPolicyRequest.builder().policyArn(policyArn).build());
        }
        return policyArn;
    }

    /**
     * Creates the {@code DIRECT_PUT} delivery stream with an S3 destination and waits until it
     * reports {@code ACTIVE}. Both steps poll once per second for at most 120 seconds.
     */
    private static void createDeliveryStream(Aws2TestEnvContext envContext, String deliveryStreamName,
            String bucketArn, String roleArn) {
        FirehoseClient firehoseClient = (FirehoseClient) envContext.client(
                LocalStackContainer.Service.FIREHOSE, FirehoseClient::builder);

        S3DestinationConfiguration destination = S3DestinationConfiguration.builder()
                .bucketARN(bucketArn)
                .roleARN(roleArn)
                .bufferingHints(BufferingHints.builder()
                        .intervalInSeconds(BUFFERING_TIME_SEC)
                        .sizeInMBs(BUFFERING_SIZE_MB)
                        .build())
                .build();
        CreateDeliveryStreamRequest createRequest = CreateDeliveryStreamRequest.builder()
                .deliveryStreamName(deliveryStreamName)
                .s3DestinationConfiguration(destination)
                .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
                .build();

        // Creation is retried while the service rejects the request with InvalidArgumentException;
        // presumably the freshly created role/policy is not yet usable by Firehose — TODO confirm.
        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(120L, TimeUnit.SECONDS).until(() -> {
            try {
                firehoseClient.createDeliveryStream(createRequest);
                LOG.info("Firehose delivery stream " + deliveryStreamName + " finally created");
                return true;
            } catch (InvalidArgumentException e) {
                LOG.info("Retrying the creation of delivery stream " + deliveryStreamName + " because "
                        + e.getMessage());
                return false;
            }
        });

        // The stream exists from here on: schedule its deletion before waiting for ACTIVE so it
        // is not left behind when that wait times out.
        envContext.closeable(() -> firehoseClient.deleteDeliveryStream(
                DeleteDeliveryStreamRequest.builder().deliveryStreamName(deliveryStreamName).build()));

        Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(120L, TimeUnit.SECONDS).until(() -> {
            DeliveryStreamStatus status = firehoseClient
                    .describeDeliveryStream(DescribeDeliveryStreamRequest.builder()
                            .deliveryStreamName(deliveryStreamName)
                            .build())
                    .deliveryStreamDescription()
                    .deliveryStreamStatus();
            LOG.info("Delivery stream " + deliveryStreamName + " status: " + status);
            return status == DeliveryStreamStatus.ACTIVE;
        });
    }
}

