package org.apache.nifi.processors.kafka;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Fetches messages from Apache Kafka")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be overriden with warning message describing the override. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SupportsBatching
@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
@WritesAttributes({@WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
/* loaded from: input_file:org/apache/nifi/processors/kafka/GetKafka.class */
public class GetKafka extends AbstractProcessor {
    private volatile ConsumerConnector consumer;
    private volatile long deadlockTimeout;
    private volatile ExecutorService executor;
    public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder().name("ZooKeeper Connection String").description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).build();
    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder().name("Topic Name").description("The Kafka Topic to pull messages from").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).build();
    public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder().name("Zookeeper Commit Frequency").description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(false).defaultValue("60 secs").build();
    public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder().name("ZooKeeper Communications Timeout").description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(false).defaultValue("30 secs").build();
    public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder().name("Kafka Communications Timeout").description("The amount of time to wait for a response from Kafka before determining that there is a communications error").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(false).defaultValue("30 secs").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be concatenated together with the <Message Demarcator> string placed between the content of each message. If the messages from Kafka should not be concatenated together, leave this value at 1.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(false).defaultValue("1").build();
    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder().name("Message Demarcator").description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, this value will be placed in between them.").required(true).addValidator(Validator.VALID).expressionLanguageSupported(false).defaultValue("\\n").build();
    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder().name("Client Name").description("Client Name to use when communicating with Kafka").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).build();
    public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder().name("Group ID").description("A Group ID is used to identify consumers that are within the same consumer group").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).build();
    public static final String SMALLEST = "smallest";
    public static final String LARGEST = "largest";
    public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder().name("Auto Offset Reset").description("Automatically reset the offset to the smallest or largest offset available on the broker").required(true).allowableValues(new String[]{SMALLEST, LARGEST}).defaultValue(LARGEST).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are created are routed to this relationship").build();
    private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue();
    private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        PropertyDescriptor build = new PropertyDescriptor.Builder().fromPropertyDescriptor(CLIENT_NAME).defaultValue("NiFi-" + getIdentifier()).build();
        PropertyDescriptor build2 = new PropertyDescriptor.Builder().fromPropertyDescriptor(GROUP_ID).defaultValue(getIdentifier()).build();
        ArrayList arrayList = new ArrayList();
        arrayList.add(ZOOKEEPER_CONNECTION_STRING);
        arrayList.add(TOPIC);
        arrayList.add(ZOOKEEPER_COMMIT_DELAY);
        arrayList.add(BATCH_SIZE);
        arrayList.add(MESSAGE_DEMARCATOR);
        arrayList.add(build);
        arrayList.add(build2);
        arrayList.add(KAFKA_TIMEOUT);
        arrayList.add(ZOOKEEPER_TIMEOUT);
        arrayList.add(AUTO_OFFSET_RESET);
        return arrayList;
    }

    public Set<Relationship> getRelationships() {
        HashSet hashSet = new HashSet(1);
        hashSet.add(REL_SUCCESS);
        return hashSet;
    }

    public void createConsumers(ProcessContext processContext) {
        String value = processContext.getProperty(TOPIC).getValue();
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", processContext.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
        properties.setProperty("group.id", processContext.getProperty(GROUP_ID).getValue());
        properties.setProperty("client.id", processContext.getProperty(CLIENT_NAME).getValue());
        properties.setProperty("auto.commit.interval.ms", String.valueOf(processContext.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
        properties.setProperty("auto.offset.reset", processContext.getProperty(AUTO_OFFSET_RESET).getValue());
        properties.setProperty("zookeeper.connection.timeout.ms", processContext.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
        properties.setProperty("socket.timeout.ms", processContext.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            PropertyDescriptor propertyDescriptor = (PropertyDescriptor) entry.getKey();
            if (propertyDescriptor.isDynamic()) {
                if (properties.containsKey(propertyDescriptor.getName())) {
                    getLogger().warn("Overriding existing property '" + propertyDescriptor.getName() + "' which had value of '" + properties.getProperty(propertyDescriptor.getName()) + "' with dynamically set value '" + ((String) entry.getValue()) + "'.");
                }
                properties.setProperty(propertyDescriptor.getName(), (String) entry.getValue());
            }
        }
        if (!properties.containsKey("consumer.timeout.ms")) {
            getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer block in the event when no events are present in Kafka topic. If you wish to change this value  set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk) set its value to -1.");
            properties.setProperty("consumer.timeout.ms", "1");
        }
        int retrievePartitionCountForTopic = KafkaUtils.retrievePartitionCountForTopic(processContext.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), processContext.getProperty(TOPIC).getValue());
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        HashMap hashMap = new HashMap(1);
        int maxConcurrentTasks = processContext.getMaxConcurrentTasks();
        if (processContext.getMaxConcurrentTasks() < retrievePartitionCountForTopic) {
            getLogger().warn("The amount of concurrent tasks '" + processContext.getMaxConcurrentTasks() + "' configured for this processor is less than the amount of partitions '" + retrievePartitionCountForTopic + "' for topic '" + processContext.getProperty(TOPIC).getValue() + "'. Consider making it equal to the amount of partition count for most efficient event consumption.");
        } else if (processContext.getMaxConcurrentTasks() > retrievePartitionCountForTopic) {
            maxConcurrentTasks = retrievePartitionCountForTopic;
            getLogger().warn("The amount of concurrent tasks '" + processContext.getMaxConcurrentTasks() + "' configured for this processor is greater than the amount of partitions '" + retrievePartitionCountForTopic + "' for topic '" + processContext.getProperty(TOPIC).getValue() + "'. Therefore those tasks would never see a message. To avoid that the '" + retrievePartitionCountForTopic + "'(partition count) will be used to consume events");
        }
        hashMap.put(value, Integer.valueOf(maxConcurrentTasks));
        List list = (List) this.consumer.createMessageStreams(hashMap).get(value);
        this.streamIterators.clear();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            this.streamIterators.add(((KafkaStream) it.next()).iterator());
        }
        this.consumerStreamsReady.set(true);
    }

    @OnStopped
    public void shutdownConsumer() {
        this.consumerStreamsReady.set(false);
        if (this.consumer != null) {
            try {
                this.consumer.commitOffsets();
            } finally {
                this.consumer.shutdown();
            }
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(30000L, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                    getLogger().warn("Executor did not stop in 30 sec. Terminated.");
                }
                this.executor = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().description("Specifies the value for '" + str + "' Kafka Configuration.").name(str).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).build();
    }

    @OnScheduled
    public void schedule(ProcessContext processContext) {
        this.deadlockTimeout = processContext.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue() * 2;
    }

    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
        synchronized (this.consumerStreamsReady) {
            if (this.executor == null || this.executor.isShutdown()) {
                this.executor = Executors.newCachedThreadPool();
            }
            if (!this.consumerStreamsReady.get()) {
                Future submit = this.executor.submit(new Callable<Void>() { // from class: org.apache.nifi.processors.kafka.GetKafka.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        GetKafka.this.createConsumers(processContext);
                        return null;
                    }
                });
                try {
                    submit.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    shutdownConsumer();
                    submit.cancel(true);
                    Thread.currentThread().interrupt();
                    getLogger().warn("Interrupted while waiting to get connection", e);
                } catch (ExecutionException e2) {
                    throw new IllegalStateException(e2);
                } catch (TimeoutException e3) {
                    shutdownConsumer();
                    submit.cancel(true);
                    getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e3);
                }
            }
        }
        if (this.consumerStreamsReady.get()) {
            Future submit2 = this.executor.submit(new Callable<Void>() { // from class: org.apache.nifi.processors.kafka.GetKafka.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    ConsumerIterator<byte[], byte[]> streamIterator = GetKafka.this.getStreamIterator();
                    if (streamIterator == null) {
                        return null;
                    }
                    GetKafka.this.consumeFromKafka(processContext, processSession, streamIterator);
                    return null;
                }
            });
            try {
                submit2.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e4) {
                shutdownConsumer();
                submit2.cancel(true);
                Thread.currentThread().interrupt();
                getLogger().warn("Interrupted while consuming messages", e4);
            } catch (ExecutionException e5) {
                throw new IllegalStateException(e5);
            } catch (TimeoutException e6) {
                shutdownConsumer();
                submit2.cancel(true);
                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e6);
            }
        }
    }

    protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
        return this.streamIterators.poll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void consumeFromKafka(ProcessContext processContext, ProcessSession processSession, ConsumerIterator<byte[], byte[]> consumerIterator) throws ProcessException {
        int intValue = processContext.getProperty(BATCH_SIZE).asInteger().intValue();
        final byte[] bytes = processContext.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t").getBytes(StandardCharsets.UTF_8);
        String value = processContext.getProperty(TOPIC).getValue();
        FlowFile create = processSession.create();
        HashMap hashMap = new HashMap();
        hashMap.put("kafka.topic", value);
        long nanoTime = System.nanoTime();
        int i = 0;
        while (i < intValue) {
            try {
                try {
                    try {
                        if (!consumerIterator.hasNext()) {
                            break;
                        }
                        final MessageAndMetadata next = consumerIterator.next();
                        if (intValue == 1) {
                            byte[] bArr = (byte[]) next.key();
                            if (bArr != null) {
                                hashMap.put("kafka.key", new String(bArr, StandardCharsets.UTF_8));
                            }
                            hashMap.put("kafka.offset", String.valueOf(next.offset()));
                            hashMap.put("kafka.partition", String.valueOf(next.partition()));
                        }
                        final boolean z = i == 0;
                        create = processSession.append(create, new OutputStreamCallback() { // from class: org.apache.nifi.processors.kafka.GetKafka.3
                            public void process(OutputStream outputStream) throws IOException {
                                if (!z) {
                                    outputStream.write(bytes);
                                }
                                outputStream.write((byte[]) next.message());
                            }
                        });
                        i++;
                    } catch (ConsumerTimeoutException e) {
                        releaseFlowFile(create, processSession, hashMap, nanoTime, value, i);
                        if (consumerIterator != null) {
                            this.streamIterators.offer(consumerIterator);
                            return;
                        }
                        return;
                    }
                } catch (Exception e2) {
                    shutdownConsumer();
                    getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e2});
                    if (create != null) {
                        processSession.remove(create);
                    }
                    if (consumerIterator != null) {
                        this.streamIterators.offer(consumerIterator);
                        return;
                    }
                    return;
                }
            } catch (Throwable th) {
                if (consumerIterator != null) {
                    this.streamIterators.offer(consumerIterator);
                }
                throw th;
            }
        }
        releaseFlowFile(create, processSession, hashMap, nanoTime, value, i);
        if (consumerIterator != null) {
            this.streamIterators.offer(consumerIterator);
        }
    }

    private void releaseFlowFile(FlowFile flowFile, ProcessSession processSession, Map<String, String> map, long j, String str, int i) {
        if (flowFile.getSize() == 0) {
            processSession.remove(flowFile);
            return;
        }
        FlowFile putAllAttributes = processSession.putAllAttributes(flowFile, map);
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j);
        processSession.getProvenanceReporter().receive(putAllAttributes, "kafka://" + str, "Received " + i + " Kafka messages", millis);
        getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{putAllAttributes, Integer.valueOf(i), Long.valueOf(millis)});
        processSession.transfer(putAllAttributes, REL_SUCCESS);
    }
}
