package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;

@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10.x Consumer API.  Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the meantime it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi processor for sending messages is PublishKafka_0_10.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10.x"})
@WritesAttributes({
    @WritesAttribute(attribute = "kafka.count", description = "The number of messages written if more than one"),
    @WritesAttribute(attribute = "kafka.key", description = "The key of message if present and if single message. How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
    @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message in the partition of the topic."),
    @WritesAttribute(attribute = "kafka.partition", description = "The partition of the topic the message or message bundle is from"),
    @WritesAttribute(attribute = "kafka.topic", description = "The topic the message or message bundle is from")
})
/**
 * NiFi processor that obtains {@link ConsumerLease}s from a lazily created {@link ConsumerPool},
 * polls them while the processor is scheduled, and commits the lease when polling completes.
 *
 * <p>Leases that are currently being polled are tracked in a synchronized set so that
 * {@link #interruptActiveThreads()} can call {@code wakeup()} on them when the processor is unscheduled.
 */
public class ConsumeKafka_0_10 extends AbstractProcessor {

    /** Allowable values for the {@link #AUTO_OFFSET_RESET} property. */
    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");

    /** Comma-separated list of topics to consume; supports expression language. */
    static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
            .name("topic")
            .displayName("Topic Name(s)")
            .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    /** Kafka consumer group identifier. */
    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
            .name("group.id")
            .displayName("Group ID")
            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(false)
            .build();

    /** Offset reset policy; defaults to {@link #OFFSET_LATEST}. */
    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
            .name("auto.offset.reset")
            .displayName("Offset Reset")
            .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
            .required(true)
            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
            .defaultValue(OFFSET_LATEST.getValue())
            .build();

    /** Encoding used for the {@code kafka.key} attribute; defaults to UTF-8. */
    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
            .name("key-attribute-encoding")
            .displayName("Key Attribute Encoding")
            .description("FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.")
            .required(true)
            .defaultValue(KafkaProcessorUtils.UTF8_ENCODING.getValue())
            .allowableValues(KafkaProcessorUtils.UTF8_ENCODING, KafkaProcessorUtils.HEX_ENCODING)
            .build();

    /** Optional demarcator (read as UTF-8 bytes) handed to the consumer pool; supports expression language. */
    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
            .name("message-demarcator")
            .displayName("Message Demarcator")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received will result in a single FlowFile which  time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
            .build();

    /** Maximum number of records per poll; defaults to 10000. */
    static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
            .name("max.poll.records")
            .displayName("Max Poll Records")
            .description("Specifies the maximum number of records Kafka should return in a single poll.")
            .required(false)
            .defaultValue("10000")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** Maximum time offsets may stay uncommitted; defaults to "1 secs". */
    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
            .name("max-uncommit-offset-wait")
            .displayName("Max Uncommitted Time")
            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. This value impacts how often offsets will be committed.  Committing offsets less often increases throughput but also increases the window of potential data duplication in the event of a rebalance or JVM restart between commits.  This value is also related to maximum poll records and the use of a message demarcator.  When using a message demarcator we can have far more uncommitted messages than when we're not as there is much less for us to keep track of in memory.")
            .required(false)
            .defaultValue("1 secs")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /** The only relationship of this processor. */
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
            .build();

    /** Common Kafka descriptors followed by the descriptors declared in this class (unmodifiable). */
    static final List<PropertyDescriptor> DESCRIPTORS;

    /** Singleton set containing {@link #REL_SUCCESS}. */
    static final Set<Relationship> RELATIONSHIPS;

    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
        descriptors.add(TOPICS);
        descriptors.add(GROUP_ID);
        descriptors.add(AUTO_OFFSET_RESET);
        descriptors.add(KEY_ATTRIBUTE_ENCODING);
        descriptors.add(MESSAGE_DEMARCATOR);
        descriptors.add(MAX_POLL_RECORDS);
        descriptors.add(MAX_UNCOMMITTED_TIME);
        DESCRIPTORS = Collections.unmodifiableList(descriptors);
        RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
    }

    /** How long {@link #interruptActiveThreads()} waits for active leases before waking them up. */
    private static final long WAKEUP_WAIT_SECONDS = 5L;

    /** Lazily created by {@link #getConsumerPool(ProcessContext)}; reset to {@code null} by {@link #close()}. */
    private volatile ConsumerPool consumerPool = null;

    /** Leases currently being polled by {@link #onTrigger(ProcessContext, ProcessSession)}. */
    private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());

    /**
     * @return the fixed set of relationships, {@link #RELATIONSHIPS}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the fixed list of supported property descriptors, {@link #DESCRIPTORS}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * Detaches the current consumer pool from this processor and closes it, if one exists.
     */
    @OnStopped
    public void close() {
        // Clear the field before closing so a later getConsumerPool() call builds a fresh pool.
        final ConsumerPool pool = this.consumerPool;
        this.consumerPool = null;
        if (pool != null) {
            pool.close();
        }
    }

    /**
     * Builds a dynamic property descriptor for an arbitrary Kafka configuration entry.
     *
     * @param str the dynamic property name
     * @return a dynamic descriptor named {@code str}, validated by a
     *         {@code KafkaConfigValidator} constructed for {@link ConsumerConfig}
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder()
                .description("Specifies the value for '" + str + "' Kafka Configuration.")
                .name(str)
                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
                .dynamic(true)
                .build();
    }

    /**
     * Delegates validation to {@code KafkaProcessorUtils.validateCommonProperties}.
     *
     * @param validationContext the context to validate
     * @return the validation results produced by the shared validator
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return KafkaProcessorUtils.validateCommonProperties(validationContext);
    }

    /**
     * Returns the existing consumer pool, creating and caching one on first use.
     * The method is synchronized so concurrent callers do not each create a pool.
     *
     * @param processContext context used to build the pool when none exists yet
     * @return the cached or newly created pool
     */
    private synchronized ConsumerPool getConsumerPool(ProcessContext processContext) {
        ConsumerPool pool = this.consumerPool;
        if (pool == null) {
            pool = createConsumerPool(processContext, getLogger());
            this.consumerPool = pool;
        }
        return pool;
    }

    /**
     * Creates a {@link ConsumerPool} from the processor configuration.
     *
     * <p>Auto-commit is forced off and both deserializers are forced to
     * {@link ByteArrayDeserializer}, overriding anything placed in the map by
     * {@code KafkaProcessorUtils.buildCommonKafkaProperties}.
     *
     * @param processContext the context to read property values from
     * @param componentLog   the logger handed to the pool
     * @return a new consumer pool
     */
    protected ConsumerPool createConsumerPool(ProcessContext processContext, ComponentLog componentLog) {
        final int maxLeases = processContext.getMaxConcurrentTasks();
        final long maxUncommittedMillis = processContext.getProperty(MAX_UNCOMMITTED_TIME)
                .asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        final byte[] demarcator = processContext.getProperty(MESSAGE_DEMARCATOR).isSet()
                ? processContext.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                : null;

        // NOTE(review): left as a raw map because the value type expected by KafkaProcessorUtils and
        // ConsumerPool is not visible from this file — confirm and parameterize.
        final HashMap props = new HashMap();
        KafkaProcessorUtils.buildCommonKafkaProperties(processContext, ConsumerConfig.class, props);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        final String topicListing = processContext.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
        final List<String> topics = new ArrayList<>();
        // No split limit: a limit of 100 would leave every topic after the 99th comma glued together
        // as one bogus name. Blank entries (including trailing ones) are skipped below.
        for (final String candidate : topicListing.split(",")) {
            final String topic = candidate.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }

        final String keyEncoding = processContext.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
        final String securityProtocol = processContext.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
        final String bootstrapServers = processContext.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();

        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedMillis,
                keyEncoding, securityProtocol, bootstrapServers, componentLog);
    }

    /**
     * Waits up to {@link #WAKEUP_WAIT_SECONDS} seconds (checking every 100 ms) for the active leases
     * to finish, then calls {@code wakeup()} on every lease still active and clears the set.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and the method returns
     * immediately without waking or clearing anything.
     */
    @OnUnscheduled
    public void interruptActiveThreads() {
        final long nanosToWait = TimeUnit.SECONDS.toNanos(WAKEUP_WAIT_SECONDS);
        final long start = System.nanoTime();
        while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
            try {
                Thread.sleep(100L);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        // Iterating a Collections.synchronizedSet requires holding its monitor; take a snapshot under
        // the lock so wakeup() is not called while worker threads are blocked on add/remove.
        final List<ConsumerLease> stillActive;
        synchronized (activeLeases) {
            stillActive = new ArrayList<>(activeLeases);
        }

        if (!stillActive.isEmpty()) {
            int count = 0;
            for (final ConsumerLease lease : stillActive) {
                getLogger().info("Consumer {} has not finished after waiting {} seconds; will attempt to wake-up the lease",
                        new Object[]{lease, WAKEUP_WAIT_SECONDS});
                lease.wakeup();
                count++;
            }
            getLogger().info("Woke up {} consumers", new Object[]{count});
        }
        activeLeases.clear();
    }

    /**
     * Obtains a lease from the consumer pool, polls it while the processor is scheduled and the
     * lease wants to continue, then commits it. Yields when no pool or lease is available, or when
     * {@code commit()} returns {@code false}.
     *
     * <p>Any failure while polling or committing is logged, polling stops, and the lease is closed
     * by the try-with-resources statement. The lease is always removed from the active set.
     *
     * @param processContext the process context
     * @param processSession the session passed to the pool when obtaining a lease
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ConsumerPool pool = getConsumerPool(processContext);
        if (pool == null) {
            processContext.yield();
            return;
        }

        try (final ConsumerLease lease = pool.obtainConsumer(processSession)) {
            if (lease == null) {
                processContext.yield();
                return;
            }

            activeLeases.add(lease);
            try {
                while (isScheduled() && lease.continuePolling()) {
                    lease.poll();
                }
                if (isScheduled() && !lease.commit()) {
                    processContext.yield();
                }
            } catch (final WakeupException we) {
                // Must come before KafkaException: WakeupException is a subclass of it.
                getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. Will roll back session and discard any partially received data.", new Object[]{lease});
            } catch (final KafkaException kex) {
                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", new Object[]{lease, kex}, kex);
            } catch (final Throwable t) {
                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", new Object[]{lease, t}, t);
            } finally {
                activeLeases.remove(lease);
            }
        }
    }
}
