package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
import org.apache.nifi.kafka.shared.property.FailureStrategy;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
import org.apache.nifi.kafka.shared.validation.KafkaDeprecationValidator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kafka.pubsub.Partitioners;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.6 Producer API. The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_6.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@WritesAttribute(attribute = PublishKafkaRecord_2_6.MSG_COUNT, description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.6"})
@SeeAlso({PublishKafka_2_6.class, ConsumeKafka_2_6.class, ConsumeKafkaRecord_2_6.class})
/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.class */
public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor {
    /** Name of the FlowFile attribute that onTrigger sets to the number of messages sent for that FlowFile. */
    protected static final String MSG_COUNT = "msg.count";

    // Choices offered by the DELIVERY_GUARANTEE property; the values ("all", "1", "0") are Kafka 'acks' settings.
    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully sending the content to a Kafka node, without waiting for any acknowledgment from the node at all. This provides the best performance but may result in data loss.");

    // Choices offered by the PARTITION_CLASS property; each value is a partitioner class name.
    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), Partitioners.RoundRobinPartitioner.class.getSimpleName(), "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary.");
    // NOTE(review): the constant is named RANDOM_PARTITIONING but its value is Kafka's DefaultPartitioner (sticky partitioning per its description).
    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", "DefaultPartitioner", "The default partitioning strategy will choose the sticky partition that changes when the batch is full (See KIP-480 for details about sticky partitioning).");
    static final AllowableValue RECORD_PATH_PARTITIONING = new AllowableValue(Partitioners.RecordPathPartitioner.class.getName(), "RecordPath Partitioner", "Interprets the <Partition> property as a RecordPath that will be evaluated against each Record to determine which partition the Record will go to. All Records that have the same value for the given RecordPath will go to the same Partition.");
    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner", "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, so all Records in a given FlowFile will go to the same partition.");

    // Choices offered by the RECORD_METADATA_STRATEGY property.
    static final AllowableValue RECORD_METADATA_FROM_RECORD = new AllowableValue("Metadata From Record", "Metadata From Record", "The Kafka Record's Topic and Partition will be determined by looking at the /metadata/topic and /metadata/partition fields of the Record, respectively. If these fields are invalid or not present, the Topic Name and Partition/Partitioner class properties of the processor will be considered.");
    static final AllowableValue RECORD_METADATA_FROM_PROPERTIES = new AllowableValue("Use Configured Values", "Use Configured Values", "The Kafka Record's Topic will be determined using the 'Topic Name' processor property. The partition will be determined using the 'Partition' and 'Partitioner class' properties.");
    // ---- Processor properties. The order shown in PROPERTIES is set by the static initializer, not by declaration order here. ----

    // Destination topic and record (de)serialization services.
    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder().name("topic").displayName("Topic Name").description("The name of the Kafka Topic to publish to.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for incoming FlowFiles").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The Record Writer to use in order to serialize the data before sending to Kafka").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();

    // Publish strategy; several properties below declare a dependency on a specific value of this one.
    static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder().name("publish-strategy").displayName("Publish Strategy").description("The format used to publish the incoming FlowFile record to Kafka.").required(true).defaultValue(PublishStrategy.USE_VALUE.getValue()).allowableValues(PublishStrategy.class).build();
    // Depends on PUBLISH_STRATEGY = USE_VALUE.
    static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder().name("message-key-field").displayName("Message Key Field").description("The name of a field in the Input Records that should be used as the Key for the Kafka message.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue(), new String[0]).required(false).build();

    // Producer tuning; the property names "acks", "max.block.ms" and "max.request.size" equal the Kafka producer config keys.
    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder().name("acks").displayName("Delivery Guarantee").description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED}).defaultValue(DELIVERY_REPLICATED.getValue()).build();
    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder().name("max.block.ms").displayName("Max Metadata Wait Time").description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the entire 'send' call. Corresponds to Kafka's 'max.block.ms' property").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("5 sec").build();
    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder().name("ack.wait.time").displayName("Acknowledgment Wait Time").description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).defaultValue("5 secs").build();
    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder().name("max.request.size").displayName("Max Request Size").description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();

    // Partitioning; customValidate requires PARTITION to be set when the RecordPath or Expression Language partitioner is selected.
    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder().name("partitioner.class").displayName("Partitioner class").description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.").allowableValues(new AllowableValue[]{ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING}).defaultValue(RANDOM_PARTITIONING.getValue()).required(false).build();
    static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder().name("partition").displayName("Partition").description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("compression.type").displayName("Compression Type").description("This parameter allows you to specify the compression codec for all data generated by this producer.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues(new String[]{"none", "gzip", "snappy", "lz4"}).defaultValue("none").build();

    // Header handling. ATTRIBUTE_NAME_REGEX depends on PUBLISH_STRATEGY = USE_VALUE.
    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder().name("attribute-name-regex").displayName("Attributes to Send as Headers (Regex)").description("A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the regex will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue(), new String[0]).required(false).build();

    // Transactions. customValidate rejects USE_TRANSACTIONS = true unless DELIVERY_GUARANTEE is DELIVERY_REPLICATED.
    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder().name("use-transactions").displayName("Use Transactions").description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    // Depends on USE_TRANSACTIONS = "true".
    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder().name("transactional-id-prefix").displayName("Transactional Id Prefix").description("When Use Transaction is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).dependsOn(USE_TRANSACTIONS, "true", new String[0]).required(false).build();
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder().name("message-header-encoding").displayName("Message Header Encoding").description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, this property indicates the Character Encoding to use for serializing the headers.").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).defaultValue("UTF-8").required(false).build();

    // Both of the following depend on PUBLISH_STRATEGY = USE_WRAPPER.
    static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder().name("record-key-writer").displayName("Record Key Writer").description("The Record Key Writer to use for outgoing FlowFiles").identifiesControllerService(RecordSetWriterFactory.class).dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue(), new String[0]).build();
    static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder().name("Record Metadata Strategy").displayName("Record Metadata Strategy").description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured Topic Name and Partition / Partitioner class properties").required(true).allowableValues(new AllowableValue[]{RECORD_METADATA_FROM_PROPERTIES, RECORD_METADATA_FROM_RECORD}).defaultValue(RECORD_METADATA_FROM_PROPERTIES.getValue()).dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue(), new String[0]).build();
    // Outgoing relationships exposed through getRelationships().
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles for which all content was sent to Kafka.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship").build();

    // Immutable views populated once by the static initializer of this class.
    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    // Created on first use by getPublisherPool(...) and closed and reset to null by closePool().
    private volatile PublisherPool publisherPool = null;
    // Source of compiled RecordPaths for getPartitioner(...); 25 is presumably the cache capacity — TODO confirm against RecordPathCache.
    private final RecordPathCache recordPathCache = new RecordPathCache(25);

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the unmodifiable set built by the static initializer (success and failure)
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> supported = RELATIONSHIPS;
        return supported;
    }

    /**
     * Returns the statically declared properties of this processor.
     *
     * @return the unmodifiable, ordered descriptor list built by the static initializer
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> supported = PROPERTIES;
        return supported;
    }

    /**
     * Builds the descriptor used for a user-defined (dynamic) property.
     *
     * @param str the dynamic property name; it is used as the descriptor name and embedded in its description
     * @return a dynamic descriptor validated by a {@code DynamicPropertyValidator} built for {@code ProducerConfig},
     *         with variable-registry expression language support
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        final String explanation = "Specifies the value for '" + str + "' Kafka Configuration.";
        final DynamicPropertyValidator producerConfigValidator = new DynamicPropertyValidator(ProducerConfig.class);
        return new PropertyDescriptor.Builder()
                .description(explanation)
                .name(str)
                .addValidator(producerConfigValidator)
                .dynamic(true)
                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                .build();
    }

    /**
     * Performs cross-property validation on top of the shared Kafka client validation.
     * <ul>
     *   <li>Transactions require the Delivery Guarantee to be "Guarantee Replicated Delivery".</li>
     *   <li>The RecordPath partitioner requires a Partition value; when that value contains no
     *       Expression Language it is additionally checked by {@code RecordPathValidator}.</li>
     *   <li>The Expression Language partitioner requires a Partition value.</li>
     * </ul>
     *
     * @param validationContext the context providing the configured property values
     * @return the collection produced by {@code KafkaClientCustomValidationFunction}, with any results
     *         from the checks above added to it
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        KafkaDeprecationValidator.validate(getClass(), getIdentifier(), validationContext);
        final Collection<ValidationResult> results = new KafkaClientCustomValidationFunction().apply(validationContext);

        final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean().booleanValue();
        if (useTransactions) {
            final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
            if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
                results.add(new ValidationResult.Builder()
                        .subject("Delivery Guarantee")
                        .valid(false)
                        .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" Either change the <Use Transactions> property or the <Delivery Guarantee> property.")
                        .build());
            }
        }

        final String partitionerClass = validationContext.getProperty(PARTITION_CLASS).getValue();
        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionerClass)) {
            final String rawPartition = validationContext.getProperty(PARTITION).getValue();
            if (rawPartition == null) {
                results.add(new ValidationResult.Builder()
                        .subject("Partition")
                        .valid(false)
                        .explanation("The <Partition> property must be specified if using the RecordPath Partitioning class")
                        .build());
            } else if (!validationContext.isExpressionLanguagePresent(rawPartition)) {
                // Only a literal value can be checked as a RecordPath at validation time.
                final ValidationResult recordPathResult = new RecordPathValidator().validate(PARTITION.getDisplayName(), rawPartition, validationContext);
                if (recordPathResult != null) {
                    results.add(recordPathResult);
                }
            }
        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionerClass)) {
            if (validationContext.getProperty(PARTITION).getValue() == null) {
                results.add(new ValidationResult.Builder()
                        .subject("Partition")
                        .valid(false)
                        .explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
                        .build());
            }
        }

        return results;
    }

    /**
     * Returns the shared publisher pool, creating it on first use.
     * The method is {@code synchronized}, so the check-and-create sequence runs under this instance's monitor.
     *
     * @param processContext the context handed to {@link #createPublisherPool(ProcessContext)} when a pool must be created
     * @return the existing pool, or the newly created one
     */
    private synchronized PublisherPool getPublisherPool(ProcessContext processContext) {
        PublisherPool pool = this.publisherPool;
        if (pool == null) {
            pool = createPublisherPool(processContext);
            this.publisherPool = pool;
        }
        return pool;
    }

    /**
     * Creates a new {@code PublisherPool} from the processor configuration.
     * The Kafka properties come from {@code StandardKafkaPropertyProvider}; on top of them the key and value
     * serializers are set to {@code ByteArraySerializer} and "max.request.size" to the configured size in bytes.
     *
     * @param processContext the context providing the configured property values
     * @return a new pool; the caller is responsible for closing it
     */
    protected PublisherPool createPublisherPool(ProcessContext processContext) {
        final int maxRequestSize = processContext.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        final long ackWaitMillis = processContext.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();

        // No header pattern is built when the regex property is unset.
        final String headerNameRegex = processContext.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
        final Pattern headerNamePattern;
        if (headerNameRegex == null) {
            headerNamePattern = null;
        } else {
            headerNamePattern = Pattern.compile(headerNameRegex);
        }

        final boolean useTransactions = processContext.getProperty(USE_TRANSACTIONS).asBoolean().booleanValue();
        final String transactionalIdPrefix = processContext.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
        final TransactionIdSupplier transactionIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
        final PublishStrategy publishStrategy = PublishStrategy.valueOf(processContext.getProperty(PUBLISH_STRATEGY).getValue());
        final String headerEncoding = processContext.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
        final Charset headerCharset = Charset.forName(headerEncoding);
        final RecordSetWriterFactory recordKeyWriterFactory = processContext.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);

        final String byteArraySerializer = ByteArraySerializer.class.getName();
        Map kafkaProperties = new StandardKafkaPropertyProvider(ProducerConfig.class).getProperties(processContext);
        kafkaProperties.put("key.serializer", byteArraySerializer);
        kafkaProperties.put("value.serializer", byteArraySerializer);
        kafkaProperties.put("max.request.size", String.valueOf(maxRequestSize));

        return new PublisherPool(kafkaProperties, getLogger(), maxRequestSize, ackWaitMillis, useTransactions,
                transactionIdSupplier, headerNamePattern, headerCharset, publishStrategy, recordKeyWriterFactory);
    }

    /**
     * Closes the publisher pool, if one was created, and clears the reference so that the next call to
     * {@code getPublisherPool} builds a fresh pool. Invoked through the {@code @OnStopped} lifecycle annotation.
     */
    @OnStopped
    public void closePool() {
        final PublisherPool pool = this.publisherPool;
        if (pool != null) {
            pool.close();
        }
        this.publisherPool = null;
    }

    /**
     * Publishes the records of a batch of FlowFiles to Kafka.
     * <p>
     * For every polled FlowFile the topic, message key field and partitioner are resolved, the content is
     * parsed with the configured Record Reader and handed to a {@code PublisherLease}. Once all FlowFiles
     * have been submitted the lease is completed: on failure the whole batch is handed to the configured
     * failure strategy, otherwise each FlowFile receives the {@code msg.count} attribute, a provenance SEND
     * event, and is transferred to {@code success}.
     * <p>
     * If the processor is unscheduled while iterating, a transactional run rolls back both the session and
     * the lease and returns; a non-transactional run re-queues the FlowFiles that were not yet submitted.
     * <p>
     * {@code ProducerFencedException}, {@code OutOfOrderSequenceException} and {@code AuthorizationException}
     * are handled for the whole publish cycle, not just {@code beginTransaction()}: the lease is poisoned,
     * the batch is routed through the failure strategy, the processor yields, and the method returns
     * without using the lease again.
     *
     * @param processContext the context providing the configured property values
     * @param processSession the session used to read, update and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final List<FlowFile> flowFiles = PublishKafkaUtil.pollFlowFiles(processSession);
        if (flowFiles.isEmpty()) {
            return;
        }

        final PublisherPool pool = getPublisherPool(processContext);
        if (pool == null) {
            processContext.yield();
            return;
        }

        final String securityProtocol = processContext.getProperty(SECURITY_PROTOCOL).getValue();
        final String bootstrapServers = processContext.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final boolean useTransactions = processContext.getProperty(USE_TRANSACTIONS).asBoolean().booleanValue();
        final PublishFailureStrategy failureStrategy = getFailureStrategy(processContext);
        final PublishMetadataStrategy metadataStrategy =
                RECORD_METADATA_FROM_RECORD.getValue().equalsIgnoreCase(processContext.getProperty(RECORD_METADATA_STRATEGY).getValue())
                        ? PublishMetadataStrategy.USE_RECORD_METADATA
                        : PublishMetadataStrategy.USE_CONFIGURED_VALUES;

        final long startNanos = System.nanoTime();

        // try-with-resources closes the lease on every exit path (normal return, early return, exception).
        try (PublisherLease lease = obtainPublisher(processContext, pool)) {
            try {
                if (useTransactions) {
                    lease.beginTransaction();
                }

                final Iterator<FlowFile> iterator = flowFiles.iterator();
                while (iterator.hasNext()) {
                    final FlowFile flowFile = iterator.next();

                    if (!isScheduled()) {
                        // Processor was stopped mid-batch: do not send anything further.
                        if (useTransactions) {
                            processSession.rollback();
                            lease.rollback();
                            return;
                        }
                        // Without transactions, hand the FlowFile back and drop it from the batch so it
                        // is not touched again by the result handling below.
                        processSession.transfer(flowFile);
                        iterator.remove();
                        continue;
                    }

                    final String topic = processContext.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                    final String messageKeyField = processContext.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
                    final Function<Record, Integer> partitioner = getPartitioner(processContext, flowFile);

                    try {
                        processSession.read(flowFile, inputStream -> {
                            try {
                                final RecordSet recordSet = readerFactory.createRecordReader(flowFile, inputStream, getLogger()).createRecordSet();
                                lease.publish(flowFile, recordSet, writerFactory,
                                        writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema()),
                                        messageKeyField, topic, partitioner, metadataStrategy);
                            } catch (SchemaNotFoundException | MalformedRecordException readFailure) {
                                throw new ProcessException(readFailure);
                            }
                        });

                        if (useTransactions && "false".equals(flowFile.getAttribute("kafka.consumer.offsets.committed"))) {
                            PublishKafkaUtil.addConsumerOffsets(lease, flowFile, getLogger());
                        }
                    } catch (Exception publishFailure) {
                        // Record the failure on the lease; it is surfaced through complete().isFailure().
                        lease.fail(flowFile, publishFailure);
                    }
                }

                final PublishResult publishResult = lease.complete();
                if (publishResult.isFailure()) {
                    getLogger().info("Failed to send FlowFile to kafka; transferring to specified failure strategy");
                    failureStrategy.routeFlowFiles(processSession, flowFiles);
                    return;
                }

                final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                for (FlowFile sentFlowFile : flowFiles) {
                    final String topic = processContext.getProperty(TOPIC).evaluateAttributeExpressions(sentFlowFile).getValue();
                    final int messageCount = publishResult.getSuccessfulMessageCount(sentFlowFile);
                    final FlowFile updated = processSession.putAttribute(sentFlowFile, MSG_COUNT, String.valueOf(messageCount));
                    processSession.adjustCounter("Messages Sent", messageCount, true);
                    processSession.getProvenanceReporter().send(updated,
                            StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, topic),
                            "Sent " + messageCount + " messages", transmissionMillis);
                    processSession.transfer(updated, REL_SUCCESS);
                }
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException fatal) {
                // These Kafka errors are handled as unrecoverable for this producer: poison the lease and
                // stop. Handling them here, rather than only around beginTransaction(), guarantees the
                // lease is not used after being poisoned and FlowFiles are not routed a second time.
                lease.poison();
                getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to specified failure strategy", fatal);
                failureStrategy.routeFlowFiles(processSession, flowFiles);
                processContext.yield();
            }
        }
    }

    /**
     * Obtains a publisher lease from the pool.
     *
     * @param processContext the context to yield when no lease can be obtained
     * @param publisherPool the pool to obtain the lease from
     * @return the lease returned by the pool
     * @throws KafkaException rethrown unchanged after logging the failure and yielding the processor
     */
    private PublisherLease obtainPublisher(ProcessContext processContext, PublisherPool publisherPool) {
        final PublisherLease lease;
        try {
            lease = publisherPool.obtainPublisher();
        } catch (KafkaException producerFailure) {
            getLogger().error("Failed to obtain Kafka Producer", producerFailure);
            processContext.yield();
            throw producerFailure;
        }
        return lease;
    }

    /**
     * Resolves the per-record partition function for a FlowFile based on the configured partitioner class.
     * <ul>
     *   <li>RecordPath partitioner: the Partition property (evaluated against the FlowFile) is compiled
     *       through the RecordPath cache and evaluated against each record.</li>
     *   <li>Expression Language partitioner: the Partition property is evaluated once against the FlowFile
     *       and the hash code of the result is returned for every record.</li>
     *   <li>Any other partitioner: {@code null} is returned.</li>
     * </ul>
     *
     * @param processContext the context providing the configured property values
     * @param flowFile the FlowFile whose attributes are used for expression evaluation
     * @return a function from record to partition hash, or {@code null} when neither partitioner above is selected
     */
    private Function<Record, Integer> getPartitioner(ProcessContext processContext, FlowFile flowFile) {
        final String partitionerClass = processContext.getProperty(PARTITION_CLASS).getValue();

        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionerClass)) {
            final String recordPathText = processContext.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
            final RecordPath recordPath = this.recordPathCache.getCompiled(recordPathText);
            return currentRecord -> evaluateRecordPath(recordPath, currentRecord);
        }

        if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionerClass)) {
            final String partitionText = processContext.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
            // Computed once, outside the lambda: every record of this FlowFile gets the same value.
            final int partitionHash = Objects.hashCode(partitionText);
            return ignoredRecord -> Integer.valueOf(partitionHash);
        }

        return null;
    }

    /**
     * Evaluates a RecordPath against a record and reduces the selected field values to a single number.
     * The hash codes of all selected values ({@code 0} for a {@code null} value) are summed as a
     * {@code long}, and the sum is narrowed to an {@code int}.
     *
     * @param recordPath the compiled RecordPath to evaluate
     * @param record the record to evaluate it against
     * @return the narrowed sum of the selected values' hash codes
     */
    private Integer evaluateRecordPath(RecordPath recordPath, Record record) {
        final LongAccumulator hashSum = new LongAccumulator(Long::sum, 0L);
        final RecordPathResult pathResult = recordPath.evaluate(record);
        pathResult.getSelectedFields().forEach(selected -> hashSum.accumulate(Objects.hashCode(selected.getValue())));
        final int narrowedSum = hashSum.intValue();
        return Integer.valueOf(narrowedSum);
    }

    /**
     * Selects how FlowFiles are handled when publishing fails, based on the Failure Strategy property.
     *
     * @param processContext the context providing the configured property values
     * @return a strategy that rolls the session back when the property equals the ROLLBACK value,
     *         otherwise one that transfers the FlowFiles to the failure relationship
     */
    private PublishFailureStrategy getFailureStrategy(ProcessContext processContext) {
        final String configuredStrategy = processContext.getProperty(FAILURE_STRATEGY).getValue();
        if (FailureStrategy.ROLLBACK.getValue().equals(configuredStrategy)) {
            return (session, flowFiles) -> session.rollback();
        }
        return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
    }

    /**
     * Verifies the processor configuration against Kafka using a temporary publisher pool.
     * The pool is created only for this call and is closed exactly once before the method returns or
     * throws; if both the verification and the close fail, the close failure is attached to the primary
     * exception as a suppressed exception instead of replacing it.
     *
     * @param processContext the context providing the configured property values
     * @param componentLog the logger supplied for verification (not used by this implementation)
     * @param map the attributes used to evaluate Expression Language in the Topic Name property
     * @return the verification results reported by the pool for the evaluated topic
     */
    @Override
    public List<ConfigVerificationResult> verify(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map) {
        final String topic = processContext.getProperty(TOPIC).evaluateAttributeExpressions(map).getValue();
        try (PublisherPool pool = createPublisherPool(processContext)) {
            return pool.verifyConfiguration(topic);
        }
    }

    // Builds the immutable property list and relationship set once at class-load time.
    // The insertion order of the descriptors below is the order returned by getSupportedPropertyDescriptors().
    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(BOOTSTRAP_SERVERS);
        descriptors.add(TOPIC);
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        descriptors.add(USE_TRANSACTIONS);
        descriptors.add(TRANSACTIONAL_ID_PREFIX);
        descriptors.add(FAILURE_STRATEGY);
        descriptors.add(DELIVERY_GUARANTEE);
        descriptors.add(PUBLISH_STRATEGY);
        descriptors.add(RECORD_KEY_WRITER);
        descriptors.add(RECORD_METADATA_STRATEGY);
        descriptors.add(ATTRIBUTE_NAME_REGEX);
        descriptors.add(MESSAGE_HEADER_ENCODING);
        descriptors.add(SECURITY_PROTOCOL);
        descriptors.add(SASL_MECHANISM);
        descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
        descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
        descriptors.add(KERBEROS_SERVICE_NAME);
        descriptors.add(KERBEROS_PRINCIPAL);
        descriptors.add(KERBEROS_KEYTAB);
        descriptors.add(SASL_USERNAME);
        descriptors.add(SASL_PASSWORD);
        descriptors.add(TOKEN_AUTHENTICATION);
        descriptors.add(AWS_PROFILE_NAME);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(MESSAGE_KEY_FIELD);
        descriptors.add(MAX_REQUEST_SIZE);
        descriptors.add(ACK_WAIT_TIME);
        descriptors.add(METADATA_WAIT_TIME);
        descriptors.add(PARTITION_CLASS);
        descriptors.add(PARTITION);
        descriptors.add(COMPRESSION_CODEC);
        PROPERTIES = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }
}
