package org.apache.nifi.processors.aws.kinesis.stream;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;

@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. In order to send data to Kinesis, the stream name has to be specified.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"amazon", "aws", "kinesis", "put", "stream"})
@WritesAttributes({@WritesAttribute(attribute = AbstractBaseKinesisProcessor.AWS_KINESIS_ERROR_MESSAGE, description = "Error message on posting message to AWS Kinesis"), @WritesAttribute(attribute = PutKinesisStream.AWS_KINESIS_ERROR_CODE, description = "Error code for the message when posting to AWS Kinesis"), @WritesAttribute(attribute = PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER, description = "Sequence number for the message when posting to AWS Kinesis"), @WritesAttribute(attribute = PutKinesisStream.AWS_KINESIS_SHARD_ID, description = "Shard id of the message posted to AWS Kinesis")})
/* loaded from: input_file:org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.class */
public class PutKinesisStream extends AbstractKinesisStreamProcessor {
    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder().displayName("Amazon Kinesis Stream Partition Key").name("amazon-kinesis-stream-partition-key").description("The partition key attribute.  If it is not set, a random value is used").expressionLanguageSupported(true).defaultValue("${kinesis.partition.key}").required(false).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
    protected Random randomParitionKeyGenerator = new Random();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        int intValue = processContext.getProperty(BATCH_SIZE).asInteger().intValue();
        long longValue = processContext.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
        String value = processContext.getProperty(KINESIS_STREAM_NAME).getValue();
        List<FlowFile> filterMessagesByMaxSize = filterMessagesByMaxSize(processSession, intValue, longValue, value, AbstractBaseKinesisProcessor.AWS_KINESIS_ERROR_MESSAGE);
        AmazonKinesisClient amazonKinesisClient = (AmazonKinesisClient) getClient();
        try {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            for (int i = 0; i < filterMessagesByMaxSize.size(); i++) {
                FlowFile flowFile = filterMessagesByMaxSize.get(i);
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                processSession.exportTo(flowFile, byteArrayOutputStream);
                PutRecordsRequestEntry withData = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
                String value2 = processContext.getProperty(KINESIS_PARTITION_KEY).evaluateAttributeExpressions(filterMessagesByMaxSize.get(i)).getValue();
                if (StringUtils.isBlank(value2)) {
                    withData.setPartitionKey(Integer.toString(this.randomParitionKeyGenerator.nextInt()));
                } else {
                    withData.setPartitionKey(value2);
                }
                arrayList.add(withData);
            }
            if (arrayList.size() > 0) {
                PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
                putRecordsRequest.setStreamName(value);
                putRecordsRequest.setRecords(arrayList);
                List records = amazonKinesisClient.putRecords(putRecordsRequest).getRecords();
                for (int i2 = 0; i2 < records.size(); i2++) {
                    PutRecordsResultEntry putRecordsResultEntry = (PutRecordsResultEntry) records.get(i2);
                    FlowFile flowFile2 = filterMessagesByMaxSize.get(i2);
                    HashMap hashMap = new HashMap();
                    hashMap.put(AWS_KINESIS_SHARD_ID, putRecordsResultEntry.getShardId());
                    hashMap.put(AWS_KINESIS_SEQUENCE_NUMBER, putRecordsResultEntry.getSequenceNumber());
                    if (StringUtils.isBlank(putRecordsResultEntry.getErrorCode())) {
                        arrayList3.add(processSession.putAllAttributes(flowFile2, hashMap));
                    } else {
                        hashMap.put(AWS_KINESIS_ERROR_CODE, putRecordsResultEntry.getErrorCode());
                        hashMap.put(AbstractBaseKinesisProcessor.AWS_KINESIS_ERROR_MESSAGE, putRecordsResultEntry.getErrorMessage());
                        arrayList2.add(processSession.putAllAttributes(flowFile2, hashMap));
                    }
                }
                if (arrayList2.size() > 0) {
                    processSession.transfer(arrayList2, REL_FAILURE);
                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{value, arrayList2});
                }
                if (arrayList3.size() > 0) {
                    processSession.transfer(arrayList3, REL_SUCCESS);
                    getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{value, arrayList3});
                }
                arrayList.clear();
            }
        } catch (Exception e) {
            getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {} ", new Object[]{e, value, filterMessagesByMaxSize});
            processSession.transfer(filterMessagesByMaxSize, REL_FAILURE);
            processContext.yield();
        }
    }
}
