package org.apache.nifi.processors.kafka.pubsub;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes messages from Apache Kafka,specifically built against the Kafka 0.9.x Consumer API. The complementary NiFi processor for sending messages is PublishKafka.")
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.class */
public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]>> {
    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder().name("group.id").displayName("Group ID").description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(false).build();
    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder().name("auto.offset.reset").displayName("Offset Reset").description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.").required(true).allowableValues(new AllowableValue[]{OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE}).defaultValue(OFFSET_LATEST.getValue()).build();
    static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER.description("Since KafkaConsumer receives messages in batches, you have an option to output a single FlowFile which contains all Kafka messages in a single batch and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received in a batch will result in a single FlowFile which essentially means that this processor may output multiple FlowFiles for each time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS").build();
    static final List<PropertyDescriptor> DESCRIPTORS;
    static final Set<Relationship> RELATIONSHIPS;
    private volatile byte[] demarcatorBytes;
    private volatile String topic;
    private volatile String brokers;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override // org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessor
    @OnStopped
    public void close() {
        if (this.kafkaResource != 0) {
            try {
                this.kafkaResource.unsubscribe();
            } finally {
                super.close();
            }
        }
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @Override // org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessor
    protected boolean rendezvousWithKafka(ProcessContext processContext, ProcessSession processSession) {
        ConsumerRecords poll = this.kafkaResource.poll(100L);
        if (poll != null && !poll.isEmpty()) {
            long nanoTime = System.nanoTime();
            FlowFile create = processSession.create();
            final AtomicInteger atomicInteger = new AtomicInteger();
            final HashMap hashMap = new HashMap();
            final Iterator it = poll.iterator();
            while (it.hasNext()) {
                create = processSession.putAllAttributes(processSession.append(create, new OutputStreamCallback() { // from class: org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.1
                    public void process(OutputStream outputStream) throws IOException {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        hashMap.put("kafka.offset", String.valueOf(consumerRecord.offset()));
                        if (consumerRecord.key() != null) {
                            hashMap.put("kafka.key", new String((byte[]) consumerRecord.key(), StandardCharsets.UTF_8));
                        }
                        hashMap.put("kafka.partition", String.valueOf(consumerRecord.partition()));
                        hashMap.put("kafka.topic", consumerRecord.topic());
                        if (atomicInteger.getAndIncrement() > 0 && ConsumeKafka.this.demarcatorBytes != null) {
                            outputStream.write(ConsumeKafka.this.demarcatorBytes);
                        }
                        outputStream.write((byte[]) consumerRecord.value());
                    }
                }), hashMap);
                if (it.hasNext() && this.demarcatorBytes == null) {
                    releaseFlowFile(create, processContext, processSession, nanoTime, atomicInteger.get());
                    create = processSession.create();
                    atomicInteger.set(0);
                }
            }
            releaseFlowFile(create, processContext, processSession, nanoTime, atomicInteger.get());
        }
        return (poll == null || poll.isEmpty()) ? false : true;
    }

    @Override // org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessor
    protected void postCommit(ProcessContext processContext) {
        this.kafkaResource.commitSync();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessor
    public Consumer<byte[], byte[]> buildKafkaResource(ProcessContext processContext, ProcessSession processSession) {
        this.demarcatorBytes = processContext.getProperty(MESSAGE_DEMARCATOR).isSet() ? processContext.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) : null;
        this.topic = processContext.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
        this.brokers = processContext.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        Properties buildKafkaProperties = buildKafkaProperties(processContext);
        if (!"false".equals(buildKafkaProperties.get("check.connection"))) {
            checkIfInitialConnectionPossible();
        }
        buildKafkaProperties.put("key.deserializer", ByteArrayDeserializer.class.getName());
        buildKafkaProperties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        KafkaConsumer kafkaConsumer = new KafkaConsumer(buildKafkaProperties);
        kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        return kafkaConsumer;
    }

    private void checkIfInitialConnectionPossible() {
        String[] split = this.brokers.split(",");
        boolean z = false;
        for (int i = 0; i < split.length && !z; i++) {
            String str = split[i];
            String[] split2 = str.split(":");
            Socket socket = null;
            try {
                try {
                    socket = new Socket();
                    socket.connect(new InetSocketAddress(split2[0].trim(), Integer.parseInt(split2[1].trim())), 10000);
                    z = true;
                    try {
                        socket.close();
                    } catch (IOException e) {
                    }
                } catch (Throwable th) {
                    try {
                        socket.close();
                    } catch (IOException e2) {
                    }
                    throw th;
                }
            } catch (Exception e3) {
                this.logger.error("Connection to '" + str + "' is not possible", e3);
                try {
                    socket.close();
                } catch (IOException e4) {
                }
            }
        }
        if (!z) {
            throw new ProcessException("Connection to " + this.brokers + " is not possible. See logs for more details");
        }
    }

    private void releaseFlowFile(FlowFile flowFile, ProcessContext processContext, ProcessSession processSession, long j, int i) {
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j);
        processSession.getProvenanceReporter().receive(flowFile, buildTransitURI(processContext.getProperty(SECURITY_PROTOCOL).getValue(), this.brokers, this.topic), "Received " + i + " Kafka messages", millis);
        getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, Integer.valueOf(i), Long.valueOf(millis)});
        processSession.transfer(flowFile, REL_SUCCESS);
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(SHARED_DESCRIPTORS);
        arrayList.add(GROUP_ID);
        arrayList.add(AUTO_OFFSET_RESET);
        arrayList.add(MESSAGE_DEMARCATOR);
        DESCRIPTORS = Collections.unmodifiableList(arrayList);
        RELATIONSHIPS = Collections.unmodifiableSet(SHARED_RELATIONSHIPS);
    }
}
