package org.apache.nifi.processors.kafka.pubsub;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
import org.apache.nifi.processors.kafka.pubsub.Partitioners;

@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.11.x Producer API.The messages to send may be individual FlowFiles or may be delimited, using a user-specified delimiter, such as a new-line.  Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the meantime it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi processor for fetching messages is ConsumeKafka_0_11.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
@WritesAttribute(attribute = PublishKafka_0_11.MSG_COUNT, description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may be greater than 1.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.11.x"})
/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.class */
public class PublishKafka_0_11 extends AbstractProcessor {
    protected static final String MSG_COUNT = "msg.count";
    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), Partitioners.RoundRobinPartitioner.class.getSimpleName(), "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary.");
    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", "DefaultPartitioner", "Messages will be assigned to random partitions.");
    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder().name("topic").displayName("Topic Name").description("The name of the Kafka Topic to publish to.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(true).build();
    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder().name("acks").displayName("Delivery Guarantee").description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.").required(true).expressionLanguageSupported(false).allowableValues(new AllowableValue[]{DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED}).defaultValue(DELIVERY_BEST_EFFORT.getValue()).build();
    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder().name("max.block.ms").displayName("Max Metadata Wait Time").description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the entire 'send' call. Corresponds to Kafka's 'max.block.ms' property").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(true).defaultValue("5 sec").build();
    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder().name("ack.wait.time").displayName("Acknowledgment Wait Time").description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(false).required(true).defaultValue("5 secs").build();
    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder().name("max.request.size").displayName("Max Request Size").description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();
    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder().name("kafka-key").displayName("Kafka Key").description("The Key to use for the Message. If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present and we're not demarcating.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder().name("key-attribute-encoding").displayName("Key Attribute Encoding").description("FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.").required(true).defaultValue(UTF8_ENCODING.getValue()).allowableValues(new AllowableValue[]{UTF8_ENCODING, HEX_ENCODING}).build();
    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder().name("message-demarcator").displayName("Message Demarcator").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.").build();
    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder().name("partitioner.class").displayName("Partitioner class").description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.").allowableValues(new AllowableValue[]{ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING}).defaultValue(RANDOM_PARTITIONING.getValue()).required(false).build();
    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("compression.type").displayName("Compression Type").description("This parameter allows you to specify the compression codec for all data generated by this producer.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues(new String[]{"none", "gzip", "snappy", "lz4"}).defaultValue("none").build();
    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder().name("attribute-name-regex").displayName("Attributes to Send as Headers (Regex)").description("A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the regex will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(false).required(false).build();
    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder().name("use-transactions").displayName("Use Transactions").description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"").expressionLanguageSupported(false).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder().name("message-header-encoding").displayName("Message Header Encoding").description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, this property indicates the Character Encoding to use for serializing the headers.").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).defaultValue("UTF-8").required(false).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles for which all content was sent to Kafka.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship").build();
    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;
    private volatile PublisherPool publisherPool = null;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().description("Specifies the value for '" + str + "' Kafka Configuration.").name(str).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true).build();
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
        if (validationContext.getProperty(USE_TRANSACTIONS).asBoolean().booleanValue()) {
            if (!DELIVERY_REPLICATED.getValue().equals(validationContext.getProperty(DELIVERY_GUARANTEE).getValue())) {
                arrayList.add(new ValidationResult.Builder().subject("Delivery Guarantee").valid(false).explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" Either change the <Use Transactions> property or the <Delivery Guarantee> property.").build());
            }
        }
        return arrayList;
    }

    private synchronized PublisherPool getPublisherPool(ProcessContext processContext) {
        PublisherPool publisherPool = this.publisherPool;
        if (publisherPool != null) {
            return publisherPool;
        }
        PublisherPool createPublisherPool = createPublisherPool(processContext);
        this.publisherPool = createPublisherPool;
        return createPublisherPool;
    }

    protected PublisherPool createPublisherPool(ProcessContext processContext) {
        int intValue = processContext.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        long longValue = processContext.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        String value = processContext.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
        Pattern compile = value == null ? null : Pattern.compile(value);
        boolean booleanValue = processContext.getProperty(USE_TRANSACTIONS).asBoolean().booleanValue();
        Charset forName = Charset.forName(processContext.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue());
        HashMap hashMap = new HashMap();
        KafkaProcessorUtils.buildCommonKafkaProperties(processContext, ProducerConfig.class, hashMap);
        hashMap.put("key.serializer", ByteArraySerializer.class.getName());
        hashMap.put("value.serializer", ByteArraySerializer.class.getName());
        hashMap.put("max.request.size", String.valueOf(intValue));
        return new PublisherPool(hashMap, getLogger(), intValue, longValue, booleanValue, compile, forName);
    }

    @OnStopped
    public void closePool() {
        if (this.publisherPool != null) {
            this.publisherPool.close();
        }
        this.publisherPool = null;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        boolean isSet = processContext.getProperty(MESSAGE_DEMARCATOR).isSet();
        List<FlowFile> list = processSession.get(FlowFileFilters.newSizeBasedFilter(250.0d, DataUnit.KB, 500));
        if (list.isEmpty()) {
            return;
        }
        PublisherPool publisherPool = getPublisherPool(processContext);
        if (publisherPool == null) {
            processContext.yield();
            return;
        }
        String value = processContext.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
        String value2 = processContext.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        boolean booleanValue = processContext.getProperty(USE_TRANSACTIONS).asBoolean().booleanValue();
        long nanoTime = System.nanoTime();
        final PublisherLease obtainPublisher = publisherPool.obtainPublisher();
        Throwable th = null;
        try {
            if (booleanValue) {
                obtainPublisher.beginTransaction();
            }
            for (final FlowFile flowFile : list) {
                if (isScheduled()) {
                    final byte[] messageKey = getMessageKey(flowFile, processContext);
                    final String value3 = processContext.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                    final byte[] bytes = isSet ? processContext.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
                    processSession.read(flowFile, new InputStreamCallback() { // from class: org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_11.1
                        public void process(InputStream inputStream) throws IOException {
                            BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
                            Throwable th2 = null;
                            try {
                                try {
                                    obtainPublisher.publish(flowFile, bufferedInputStream, messageKey, bytes, value3);
                                    if (bufferedInputStream != null) {
                                        if (0 == 0) {
                                            bufferedInputStream.close();
                                            return;
                                        }
                                        try {
                                            bufferedInputStream.close();
                                        } catch (Throwable th3) {
                                            th2.addSuppressed(th3);
                                        }
                                    }
                                } catch (Throwable th4) {
                                    th2 = th4;
                                    throw th4;
                                }
                            } catch (Throwable th5) {
                                if (bufferedInputStream != null) {
                                    if (th2 != null) {
                                        try {
                                            bufferedInputStream.close();
                                        } catch (Throwable th6) {
                                            th2.addSuppressed(th6);
                                        }
                                    } else {
                                        bufferedInputStream.close();
                                    }
                                }
                                throw th5;
                            }
                        }
                    });
                } else {
                    if (booleanValue) {
                        processSession.rollback();
                        obtainPublisher.rollback();
                        if (obtainPublisher != null) {
                            if (0 == 0) {
                                obtainPublisher.close();
                                return;
                            }
                            try {
                                obtainPublisher.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                    processSession.transfer(flowFile);
                }
            }
            PublishResult complete = obtainPublisher.complete();
            if (complete.isFailure()) {
                getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
                processSession.transfer(list, REL_FAILURE);
                if (obtainPublisher != null) {
                    if (0 == 0) {
                        obtainPublisher.close();
                        return;
                    }
                    try {
                        obtainPublisher.close();
                        return;
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                        return;
                    }
                }
                return;
            }
            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
            for (FlowFile flowFile2 : list) {
                String value4 = processContext.getProperty(TOPIC).evaluateAttributeExpressions(flowFile2).getValue();
                int successfulMessageCount = complete.getSuccessfulMessageCount(flowFile2);
                FlowFile putAttribute = processSession.putAttribute(flowFile2, MSG_COUNT, String.valueOf(successfulMessageCount));
                processSession.adjustCounter("Messages Sent", successfulMessageCount, true);
                processSession.getProvenanceReporter().send(putAttribute, KafkaProcessorUtils.buildTransitURI(value, value2, value4), "Sent " + successfulMessageCount + " messages", millis);
                processSession.transfer(putAttribute, REL_SUCCESS);
            }
            if (obtainPublisher != null) {
                if (0 == 0) {
                    obtainPublisher.close();
                    return;
                }
                try {
                    obtainPublisher.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            }
        } catch (Throwable th5) {
            if (obtainPublisher != null) {
                if (0 != 0) {
                    try {
                        obtainPublisher.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    obtainPublisher.close();
                }
            }
            throw th5;
        }
    }

    private byte[] getMessageKey(FlowFile flowFile, ProcessContext processContext) {
        if (processContext.getProperty(MESSAGE_DEMARCATOR).isSet()) {
            return null;
        }
        String value = processContext.getProperty(KEY).isSet() ? processContext.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue() : flowFile.getAttribute("kafka.key");
        if (value == null) {
            return null;
        }
        return UTF8_ENCODING.getValue().equals(processContext.getProperty(KEY_ATTRIBUTE_ENCODING).getValue()) ? value.getBytes(StandardCharsets.UTF_8) : DatatypeConverter.parseHexBinary(value);
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
        arrayList.add(TOPIC);
        arrayList.add(DELIVERY_GUARANTEE);
        arrayList.add(USE_TRANSACTIONS);
        arrayList.add(ATTRIBUTE_NAME_REGEX);
        arrayList.add(MESSAGE_HEADER_ENCODING);
        arrayList.add(KEY);
        arrayList.add(KEY_ATTRIBUTE_ENCODING);
        arrayList.add(MESSAGE_DEMARCATOR);
        arrayList.add(MAX_REQUEST_SIZE);
        arrayList.add(ACK_WAIT_TIME);
        arrayList.add(METADATA_WAIT_TIME);
        arrayList.add(PARTITION_CLASS);
        arrayList.add(COMPRESSION_CODEC);
        PROPERTIES = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(hashSet);
    }
}
