package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;

/**
 * NiFi processor that consumes messages from one or more Kafka topics using the
 * Kafka consumer API and emits them as FlowFiles on the {@code success} relationship.
 *
 * <p>Records are polled through a {@link ConsumerLease} obtained from a lazily created
 * {@link ConsumerPool}. When a message demarcator is configured, all records gathered
 * for a topic/partition are written into a single FlowFile separated by the demarcator;
 * otherwise every record becomes its own FlowFile.</p>
 *
 * <p>The NiFi session is committed before the Kafka offsets are committed, so a failure
 * between the two steps leads to re-delivery rather than to lost messages.</p>
 */
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API.  Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the mean time it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
@WritesAttributes({
    @WritesAttribute(attribute = "kafka.count", description = "The number of messages written if more than one"),
    @WritesAttribute(attribute = "kafka.key", description = "The key of message if present and if single message. How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
    @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message in the partition of the topic."),
    @WritesAttribute(attribute = "kafka.partition", description = "The partition of the topic the message or message bundle is from"),
    @WritesAttribute(attribute = "kafka.topic", description = "The topic the message or message bundle is from")
})
public class ConsumeKafka extends AbstractProcessor {

    /** Size threshold (2 MiB of record values) after which a gather loop stops polling. */
    private static final long TWO_MB = 2097152;

    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");

    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");

    static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
            .name("topic")
            .displayName("Topic Name(s)")
            .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma seperated.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
            .name("group.id")
            .displayName("Group ID")
            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(false)
            .build();

    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
            .name("auto.offset.reset")
            .displayName("Offset Reset")
            .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
            .required(true)
            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
            .defaultValue(OFFSET_LATEST.getValue())
            .build();

    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
            .name("key-attribute-encoding")
            .displayName("Key Attribute Encoding")
            .description("FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.")
            .required(true)
            .defaultValue(UTF8_ENCODING.getValue())
            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
            .build();

    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
            .name("message-demarcator")
            .displayName("Message Demarcator")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received will result in a single FlowFile which  time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
            .build();

    static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
            .name("max.poll.records")
            .displayName("Max Poll Records")
            .description("Specifies the maximum number of records Kafka should return in a single poll.")
            .required(false)
            .defaultValue("10000")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
            .build();

    /** All supported property descriptors: the common Kafka ones followed by the processor-specific ones. */
    static final List<PropertyDescriptor> DESCRIPTORS;

    /** The relationships of this processor; only {@link #REL_SUCCESS}. */
    static final Set<Relationship> RELATIONSHIPS;

    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
        descriptors.add(TOPICS);
        descriptors.add(GROUP_ID);
        descriptors.add(AUTO_OFFSET_RESET);
        descriptors.add(KEY_ATTRIBUTE_ENCODING);
        descriptors.add(MESSAGE_DEMARCATOR);
        descriptors.add(MAX_POLL_RECORDS);
        DESCRIPTORS = Collections.unmodifiableList(descriptors);
        RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
    }

    /** UTF-8 bytes of the configured demarcator, or {@code null} when none is set / the processor is stopped. */
    private volatile byte[] demarcatorBytes = null;

    /** Lazily created pool of consumers; reset to {@code null} when the processor is stopped. */
    private volatile ConsumerPool consumerPool = null;

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * Caches the configured message demarcator as UTF-8 bytes when the processor is scheduled.
     *
     * @param processContext the context supplying the processor's property values
     */
    @OnScheduled
    public void prepareProcessing(ProcessContext processContext) {
        this.demarcatorBytes = processContext.getProperty(MESSAGE_DEMARCATOR).isSet()
                ? processContext.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                : null;
    }

    /**
     * Clears the cached demarcator and closes the consumer pool, if one was created.
     */
    @OnStopped
    public void close() {
        this.demarcatorBytes = null;
        // Detach the pool from the field before closing it so a later trigger builds a fresh one.
        final ConsumerPool pool = this.consumerPool;
        this.consumerPool = null;
        if (pool != null) {
            pool.close();
        }
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder()
                .description("Specifies the value for '" + str + "' Kafka Configuration.")
                .name(str)
                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
                .dynamic(true)
                .build();
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return KafkaProcessorUtils.validateCommonProperties(validationContext);
    }

    /**
     * Returns the existing consumer pool or builds one from the current configuration.
     *
     * <p>Topic names are taken from the comma separated {@link #TOPICS} property; blank
     * entries are dropped. Auto-commit is disabled and both key and value are
     * deserialized as byte arrays.</p>
     *
     * @param processContext the context supplying property values and the concurrent task count
     * @return the pool used to obtain consumer leases
     */
    private synchronized ConsumerPool getConsumerPool(ProcessContext processContext) {
        final ConsumerPool existing = this.consumerPool;
        if (existing != null) {
            return existing;
        }

        final Map<String, String> props = new HashMap<>();
        KafkaProcessorUtils.buildCommonKafkaProperties(processContext, ConsumerConfig.class, props);

        final String topicListing = processContext.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
        final List<String> topics = new ArrayList<>();
        for (final String candidate : topicListing.split(",", 100)) {
            final String topic = candidate.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }

        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        final ConsumerPool created = createConsumerPool(processContext.getMaxConcurrentTasks(), topics, props, getLogger());
        this.consumerPool = created;
        return created;
    }

    /**
     * Factory hook for the consumer pool; protected so that subclasses can substitute another pool.
     *
     * @param i            maximum number of leases, taken from the processor's concurrent task count
     * @param list         the topics to subscribe to
     * @param map          the Kafka consumer properties
     * @param componentLog the logger handed to the pool
     * @return a new consumer pool
     */
    protected ConsumerPool createConsumerPool(int i, List<String> list, Map<String, String> map, ComponentLog componentLog) {
        return new ConsumerPool(i, list, map, componentLog);
    }

    /**
     * Polls Kafka once per trigger, writes the gathered records as FlowFiles, commits the
     * session and then commits the consumed offsets.
     *
     * <p>If no lease is available the processor yields; if no data was found the session
     * is rolled back. A {@link KafkaException} poisons the lease, is logged, and causes a
     * yield plus session rollback. The lease is always closed on exit.</p>
     *
     * @param processContext the processor context
     * @param processSession the session used to create and transfer FlowFiles
     * @throws ProcessException if processing fails
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final long startTimeNanos = System.nanoTime();
        final ConsumerPool pool = getConsumerPool(processContext);
        if (pool == null) {
            processContext.yield();
            return;
        }

        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();

        // try-with-resources skips close() for a null resource, so a null lease is safe here.
        try (ConsumerLease lease = pool.obtainConsumer()) {
            try {
                if (lease == null) {
                    processContext.yield();
                    return;
                }

                if (!gatherDataFromKafka(lease, partitionRecordMap, processContext)) {
                    processSession.rollback();
                    return;
                }

                writeSessionData(processContext, processSession, partitionRecordMap, startTimeNanos);
                // Commit the session first, then the offsets: reversing the order could drop
                // messages if the session commit failed after the offsets were advanced.
                processSession.commit();
                commitOffsets(lease, partitionRecordMap);
            } catch (KafkaException e) {
                // The lease is non-null here: the null case returns before any Kafka call is made.
                lease.poison();
                getLogger().error("Problem while accessing kafka consumer " + e, e);
                processContext.yield();
                processSession.rollback();
            }
        }
    }

    /**
     * Commits, for every partition that yielded records, the highest gathered offset plus one.
     *
     * @param consumerLease the lease through which offsets are committed
     * @param map           the gathered records keyed by topic/partition
     */
    private void commitOffsets(ConsumerLease consumerLease, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map) {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
            final List<ConsumerRecord<byte[], byte[]>> records = entry.getValue();
            if (records.isEmpty()) {
                continue;
            }
            final long maxOffset = records.stream().mapToLong(ConsumerRecord::offset).max().getAsLong();
            // The committed offset is the position of the next record to read.
            offsets.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
        }
        consumerLease.commitOffsets(offsets);
    }

    /**
     * Writes the gathered records to the session: one FlowFile per non-empty partition when a
     * demarcator is configured, otherwise one FlowFile per record.
     *
     * @param processContext the processor context
     * @param processSession the session receiving the FlowFiles
     * @param map            the gathered records keyed by topic/partition
     * @param j              {@link System#nanoTime()} value captured when the trigger started
     */
    private void writeSessionData(ProcessContext processContext, ProcessSession processSession, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, long j) {
        final boolean bundlePerPartition = this.demarcatorBytes != null;
        for (final List<ConsumerRecord<byte[], byte[]>> records : map.values()) {
            if (records.isEmpty()) {
                continue;
            }
            if (bundlePerPartition) {
                writeData(processContext, processSession, records, j);
            } else {
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    writeData(processContext, processSession, Collections.singletonList(record), j);
                }
            }
        }
    }

    /**
     * Encodes a Kafka message key for use as a FlowFile attribute.
     *
     * @param bArr the raw key, may be {@code null}
     * @param str  the encoding name; the value of {@link #HEX_ENCODING} or {@link #UTF8_ENCODING}
     * @return the encoded key, or {@code null} if the key is {@code null} or the encoding is unknown
     */
    private String encodeKafkaKey(byte[] bArr, String str) {
        if (bArr == null) {
            return null;
        }
        if (HEX_ENCODING.getValue().equals(str)) {
            return DatatypeConverter.printHexBinary(bArr);
        }
        if (UTF8_ENCODING.getValue().equals(str)) {
            return new String(bArr, StandardCharsets.UTF_8);
        }
        return null;
    }

    /**
     * Creates one FlowFile holding the values of the given records (separated by the demarcator
     * when there is more than one), sets the {@code kafka.*} attributes from the first record,
     * reports a provenance receive event and transfers the FlowFile to {@link #REL_SUCCESS}.
     *
     * @param processContext the processor context
     * @param processSession the session in which the FlowFile is created
     * @param list           the records to write; must contain at least one element
     * @param j              {@link System#nanoTime()} value captured when the trigger started
     */
    private void writeData(ProcessContext processContext, ProcessSession processSession, List<ConsumerRecord<byte[], byte[]>> list, long j) {
        final ConsumerRecord<byte[], byte[]> firstRecord = list.get(0);
        final String offset = String.valueOf(firstRecord.offset());
        final String encodedKey = encodeKafkaKey(firstRecord.key(), processContext.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
        final String topic = firstRecord.topic();
        final String partition = String.valueOf(firstRecord.partition());

        FlowFile flowFile = processSession.create();
        flowFile = processSession.write(flowFile, outputStream -> {
            boolean needsDemarcator = false;
            for (final ConsumerRecord<byte[], byte[]> record : list) {
                if (needsDemarcator) {
                    outputStream.write(this.demarcatorBytes);
                }
                // A Kafka record may carry a null value (tombstone); write nothing for it
                // instead of failing with a NullPointerException.
                final byte[] value = record.value();
                if (value != null) {
                    outputStream.write(value);
                }
                needsDemarcator = true;
            }
        });

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("kafka.offset", offset);
        // The key attribute is only meaningful when the FlowFile represents a single message.
        if (encodedKey != null && list.size() == 1) {
            attributes.put("kafka.key", encodedKey);
        }
        attributes.put("kafka.partition", partition);
        attributes.put("kafka.topic", topic);
        if (list.size() > 1) {
            attributes.put("kafka.count", String.valueOf(list.size()));
        }
        flowFile = processSession.putAllAttributes(flowFile, attributes);

        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j);
        final String transitUri = KafkaProcessorUtils.buildTransitURI(
                processContext.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(),
                processContext.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
                topic);
        processSession.getProvenanceReporter().receive(flowFile, transitUri, millis);
        getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
                new Object[]{flowFile, list.size(), topic, partition, offset, millis});
        processSession.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Polls the lease repeatedly, accumulating records per topic/partition, until a poll
     * returns nothing or {@link #checkIfGatheredEnoughData} reports that enough was gathered.
     *
     * @param consumerLease  the lease to poll
     * @param map            the map that receives the records, keyed by topic/partition
     * @param processContext the context supplying the {@link #MAX_POLL_RECORDS} value
     * @return {@code true} if at least one record was gathered
     */
    private boolean gatherDataFromKafka(ConsumerLease consumerLease, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, ProcessContext processContext) {
        final long startTimeNanos = System.nanoTime();
        final int maxRecords = processContext.getProperty(MAX_POLL_RECORDS).asInteger().intValue();
        boolean foundData = false;
        ConsumerRecords<byte[], byte[]> polled;
        do {
            polled = consumerLease.poll();
            for (final TopicPartition topicPartition : polled.partitions()) {
                final List<ConsumerRecord<byte[], byte[]>> gathered =
                        map.computeIfAbsent(topicPartition, ignored -> new ArrayList<>());
                gathered.addAll(polled.records(topicPartition));
                if (!gathered.isEmpty()) {
                    foundData = true;
                }
            }
        } while (!polled.isEmpty() && !checkIfGatheredEnoughData(map, maxRecords, startTimeNanos));
        return foundData;
    }

    /**
     * Decides whether the gather loop should stop polling.
     *
     * <p>Stops when more than 500 ms have elapsed. Otherwise, with a non-empty demarcator,
     * stops once more than 50 partitions hold records; without one, stops once the summed
     * value size exceeds {@link #TWO_MB} or the record count exceeds {@code j}.</p>
     *
     * @param map the records gathered so far, keyed by topic/partition
     * @param j   the maximum number of records to gather
     * @param j2  {@link System#nanoTime()} value captured when gathering started
     * @return {@code true} if enough data has been gathered
     */
    private boolean checkIfGatheredEnoughData(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, long j, long j2) {
        if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j2) > 500) {
            return true;
        }

        int partitionsFilled = 0;
        int totalRecords = 0;
        long totalRecordSize = 0;
        for (final List<ConsumerRecord<byte[], byte[]>> records : map.values()) {
            if (!records.isEmpty()) {
                partitionsFilled++;
            }
            totalRecords += records.size();
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                // Null values (tombstones) contribute no bytes.
                final byte[] value = record.value();
                if (value != null) {
                    totalRecordSize += value.length;
                }
            }
        }

        final byte[] demarcator = this.demarcatorBytes;
        if (demarcator != null && demarcator.length > 0) {
            return partitionsFilled > 50;
        }
        return totalRecordSize > TWO_MB || totalRecords > j;
    }
}
