package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerLease.class */
public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
    /** Upper bound, in milliseconds, on the elapsed lease time tolerated by {@link #continuePolling()}. */
    private final long maxWaitMillis;
    /** Kafka consumer used for polling, committing offsets, seeking and wakeup. */
    private final Consumer<byte[], byte[]> kafkaConsumer;
    /** Component logger for diagnostics and failure reporting. */
    private final ComponentLog logger;
    /** When non-null, messages are concatenated into bundles separated by these bytes. */
    private final byte[] demarcatorBytes;
    /** Encoding name handed to {@link #encodeKafkaKey(byte[], String)} when rendering message keys. */
    private final String keyEncoding;
    /** Used together with {@link #bootstrapServers} to build the provenance transit URI. */
    private final String securityProtocol;
    /** Used together with {@link #securityProtocol} to build the provenance transit URI. */
    private final String bootstrapServers;
    /** Record writer factory; the record-oriented path is only taken when this and {@link #readerFactory} are both non-null. */
    private final RecordSetWriterFactory writerFactory;
    /** Record reader factory; see {@link #writerFactory}. */
    private final RecordReaderFactory readerFactory;
    /** Character set used to decode Kafka header values into attribute strings. */
    private final Charset headerCharacterSet;
    /** Headers whose key fully matches this pattern become attributes; {@code null} disables header copying. */
    private final Pattern headerNamePattern;
    /** Set once poll or commit fails unexpectedly; exposed through {@link #isPoisoned()}. */
    private boolean poisoned = false;
    /** Open bundles (one FlowFile each) keyed by partition, schema and header attributes. */
    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap();
    /** Per partition, the next offset to commit (highest offset seen plus one). */
    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap();
    /** {@link System#nanoTime()} stamp taken by {@link #continuePolling()}; negative while unset. */
    private long leaseStartNanos = -1;
    /** Whether the most recent poll returned zero records. */
    private boolean lastPollEmpty = false;
    /** Number of Kafka messages processed since the internal state was last reset. */
    private int totalMessages = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerLease$BundleInformation.class */
    /**
     * Map key identifying a bundle: messages that share the same topic partition,
     * record schema and header-derived attributes are written to the same FlowFile.
     */
    public static class BundleInformation {
        private final TopicPartition topicPartition;
        private final RecordSchema schema;
        private final Map<String, String> attributes;

        /**
         * @param topicPartition partition the bundled messages came from
         * @param recordSchema   schema of the bundled records, or {@code null} when not record-oriented
         * @param map            attributes shared by every message in the bundle, may be {@code null}
         */
        public BundleInformation(TopicPartition topicPartition, RecordSchema recordSchema, Map<String, String> map) {
            this.topicPartition = topicPartition;
            this.schema = recordSchema;
            this.attributes = map;
        }

        /** Combines the hash codes of all three components; null schema/attributes contribute zero. */
        @Override
        public int hashCode() {
            int hash = 41 + (13 * this.topicPartition.hashCode());
            if (this.schema != null) {
                hash += 13 * this.schema.hashCode();
            }
            if (this.attributes != null) {
                hash += 13 * this.attributes.hashCode();
            }
            return hash;
        }

        /** Two instances are equal when partition, schema and attributes are all equal (null-safe). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // instanceof is false for null, so no separate null check is required.
            if (!(obj instanceof BundleInformation)) {
                return false;
            }
            final BundleInformation other = (BundleInformation) obj;
            if (!Objects.equals(this.topicPartition, other.topicPartition)) {
                return false;
            }
            if (!Objects.equals(this.schema, other.schema)) {
                return false;
            }
            return Objects.equals(this.attributes, other.attributes);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerLease$BundleTracker.class */
    /**
     * Mutable bookkeeping for one bundle: where it started (topic, partition, offset, key),
     * the writer used for record-oriented bundles, the FlowFile being built and the
     * number of records added so far.
     */
    public static class BundleTracker {
        final long initialOffset;
        final int partition;
        final String topic;
        final String key;
        final RecordSetWriter recordWriter;
        FlowFile flowFile;
        long totalRecords = 0L;

        /** Creates a tracker without a record writer (raw or demarcated data). */
        private BundleTracker(ConsumerRecord<byte[], byte[]> consumerRecord, TopicPartition topicPartition, String str) {
            this(consumerRecord, topicPartition, str, (RecordSetWriter) null);
        }

        /**
         * @param consumerRecord  first message of the bundle; supplies the initial offset and the key
         * @param topicPartition  source topic and partition
         * @param str             key encoding passed to {@code encodeKafkaKey}
         * @param recordSetWriter writer for record-oriented bundles, or {@code null}
         */
        private BundleTracker(ConsumerRecord<byte[], byte[]> consumerRecord, TopicPartition topicPartition, String str, RecordSetWriter recordSetWriter) {
            this.recordWriter = recordSetWriter;
            this.topic = topicPartition.topic();
            this.partition = topicPartition.partition();
            this.initialOffset = consumerRecord.offset();
            this.key = ConsumerLease.encodeKafkaKey(consumerRecord.key(), str);
        }

        /** Adds {@code j} to the running record count of this bundle. */
        public void incrementRecordCount(long j) {
            this.totalRecords = this.totalRecords + j;
        }

        /** Replaces the tracked FlowFile reference with the given one. */
        public void updateFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a lease around the given Kafka consumer.
     *
     * @param j                      maximum lease duration in milliseconds (stored as {@code maxWaitMillis})
     * @param consumer               Kafka consumer this lease polls and commits through
     * @param bArr                   demarcator bytes, or {@code null} when messages are not demarcated
     * @param str                    key encoding
     * @param str2                   security protocol (used for the provenance transit URI)
     * @param str3                   bootstrap servers (used for the provenance transit URI)
     * @param recordReaderFactory    record reader factory, may be {@code null}
     * @param recordSetWriterFactory record writer factory, may be {@code null}
     * @param componentLog           logger
     * @param charset                character set for decoding header values
     * @param pattern                header name pattern, or {@code null} to ignore headers
     */
    public ConsumerLease(long j, Consumer<byte[], byte[]> consumer, byte[] bArr, String str, String str2, String str3, RecordReaderFactory recordReaderFactory, RecordSetWriterFactory recordSetWriterFactory, ComponentLog componentLog, Charset charset, Pattern pattern) {
        // Kafka client and lease limits
        this.kafkaConsumer = consumer;
        this.maxWaitMillis = j;
        this.logger = componentLog;

        // Connection details reported in provenance events
        this.bootstrapServers = str3;
        this.securityProtocol = str2;

        // Message and key handling
        this.keyEncoding = str;
        this.demarcatorBytes = bArr;
        this.writerFactory = recordSetWriterFactory;
        this.readerFactory = recordReaderFactory;

        // Header-to-attribute mapping
        this.headerNamePattern = pattern;
        this.headerCharacterSet = charset;
    }

    /** Returns the lease to its pristine state: no bundles, no pending offsets, counters and timers cleared. */
    private void resetInternalState() {
        this.totalMessages = 0;
        this.lastPollEmpty = false;
        this.leaseStartNanos = -1L;
        this.uncommittedOffsetsMap.clear();
        this.bundleMap.clear();
    }

    /**
     * Rebalance callback: logs the revoked partitions and commits whatever has been
     * processed so far.
     *
     * @param collection partitions being taken away from this consumer
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        final Object[] logArguments = new Object[]{collection, this, this.kafkaConsumer};
        this.logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", logArguments);
        commit();
    }

    /**
     * Rebalance callback: logs the newly assigned partitions; no other action is taken.
     *
     * @param collection partitions now assigned to this consumer
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        final Object[] logArguments = new Object[]{collection, this, this.kafkaConsumer};
        this.logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", logArguments);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Performs one poll against Kafka (10 ms timeout), remembers whether it came back
     * empty and processes the returned records.
     * <p>
     * A {@link ProcessException} is rethrown as-is; any other failure poisons the lease
     * before being rethrown.
     */
    public void poll() {
        try {
            final ConsumerRecords<byte[], byte[]> polled = this.kafkaConsumer.poll(10L);
            final boolean nothingReturned = polled.count() == 0;
            this.lastPollEmpty = nothingReturned;
            processRecords(polled);
        } catch (final Throwable failure) {
            // ProcessException does not poison the lease; everything else does.
            if (!(failure instanceof ProcessException)) {
                poison();
            }
            throw failure;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Finalizes all open bundles, commits the process session and then commits the
     * pending offsets to Kafka.
     * <p>
     * The session is committed before the Kafka offsets, so a failure between the two
     * steps results in re-delivery rather than loss. Any failure poisons the lease.
     *
     * @return {@code true} if offsets were committed, {@code false} if there was nothing to commit
     * @throws ProcessException if a bundle could not be finished because of an I/O error
     */
    public boolean commit() {
        final boolean nothingPending = this.uncommittedOffsetsMap.isEmpty();
        if (nothingPending) {
            resetInternalState();
            return false;
        }

        try {
            final Collection<FlowFile> finishedBundles = getBundles();
            if (!finishedBundles.isEmpty()) {
                getProcessSession().transfer(finishedBundles, ConsumeKafkaRecord_0_11.REL_SUCCESS);
            }
            // Session first, Kafka offsets second.
            getProcessSession().commit();
            this.kafkaConsumer.commitSync(this.uncommittedOffsetsMap);
            resetInternalState();
            return true;
        } catch (final IOException ioFailure) {
            poison();
            this.logger.error("Failed to finish writing out FlowFile bundle", ioFailure);
            throw new ProcessException(ioFailure);
        } catch (final KafkaException kafkaFailure) {
            poison();
            this.logger.warn("Duplicates are likely as we were able to commit the process session but received an exception from Kafka while committing offsets.");
            throw kafkaFailure;
        } catch (final Throwable unexpected) {
            poison();
            throw unexpected;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decides whether the caller should poll again under this lease.
     * <p>
     * Polling stops when the last poll was empty, when more than {@code maxWaitMillis}
     * has elapsed since the first call, when more than 200 bundles are open, or when
     * 1000 or more messages have been processed. The first call that gets past the
     * empty-poll check stamps the lease start time.
     *
     * @return {@code true} if another poll is permitted
     */
    public boolean continuePolling() {
        if (this.lastPollEmpty) {
            return false;
        }

        if (this.leaseStartNanos < 0) {
            this.leaseStartNanos = System.nanoTime();
        }

        final long elapsedNanos = System.nanoTime() - this.leaseStartNanos;
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        if (elapsedMillis > this.maxWaitMillis) {
            return false;
        }

        if (this.bundleMap.size() > 200) {
            return false;
        }

        return this.totalMessages < 1000;
    }

    /** Marks this lease as poisoned; nothing in this class ever clears the flag again. */
    private void poison() {
        this.poisoned = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} once an unexpected failure during poll or commit has poisoned this lease
     */
    public boolean isPoisoned() {
        return this.poisoned;
    }

    /** Delegates to the underlying Kafka consumer's {@code wakeup()}. */
    public void wakeup() {
        this.kafkaConsumer.wakeup();
    }

    /**
     * Clears the lease's internal state (bundles, pending offsets, counters).
     * The Kafka consumer itself is not closed here.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        resetInternalState();
    }

    /**
     * @return the process session in which this lease creates, writes, transfers and commits FlowFiles
     */
    public abstract ProcessSession getProcessSession();

    /**
     * Invoked by this lease when record reading or writer setup fails and offsets are rolled back.
     * NOTE(review): presumably yields the owning processor's context; confirm against the subclass.
     */
    public abstract void yield();

    /**
     * Writes the polled messages out partition by partition and records, per partition,
     * the next offset to commit (highest offset seen plus one).
     * <p>
     * Output mode: demarcated bundles when a demarcator is configured; otherwise
     * record-oriented bundles when both reader and writer factories are present;
     * otherwise one FlowFile per message.
     *
     * @param consumerRecords the result of a single poll
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        for (final TopicPartition partition : consumerRecords.partitions()) {
            final List<ConsumerRecord<byte[], byte[]>> messages = consumerRecords.records(partition);
            if (messages.isEmpty()) {
                continue;
            }

            // Highest offset in this batch, determined before anything is written.
            long highestOffset = messages.get(0).offset();
            for (final ConsumerRecord<byte[], byte[]> message : messages) {
                highestOffset = Math.max(highestOffset, message.offset());
            }

            if (this.demarcatorBytes != null) {
                writeDemarcatedData(getProcessSession(), messages, partition);
            } else if (this.readerFactory != null && this.writerFactory != null) {
                writeRecordData(getProcessSession(), messages, partition);
            } else {
                for (final ConsumerRecord<byte[], byte[]> message : messages) {
                    writeData(getProcessSession(), message, partition);
                }
            }

            this.totalMessages += messages.size();
            this.uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(highestOffset + 1));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Renders a Kafka message key as a string.
     *
     * @param bArr raw key bytes, may be {@code null}
     * @param str  encoding name; compared against the hex and UTF-8 encoding values of {@code KafkaProcessorUtils}
     * @return the hex or UTF-8 representation of the key, or {@code null} when the key
     *         is {@code null} or the encoding is neither of the two
     */
    public static String encodeKafkaKey(byte[] bArr, String str) {
        String encoded = null;
        if (bArr != null) {
            if (KafkaProcessorUtils.HEX_ENCODING.getValue().equals(str)) {
                encoded = DatatypeConverter.printHexBinary(bArr);
            } else if (KafkaProcessorUtils.UTF8_ENCODING.getValue().equals(str)) {
                encoded = new String(bArr, StandardCharsets.UTF_8);
            }
        }
        return encoded;
    }

    /**
     * Finalizes every tracked bundle and collects the FlowFiles that should be transferred.
     *
     * @return FlowFiles of all bundles for which {@code processBundle} returned {@code true}
     * @throws IOException if finishing a bundle's record set fails
     */
    private Collection<FlowFile> getBundles() throws IOException {
        final List<FlowFile> readyFlowFiles = new ArrayList<>();
        for (final BundleTracker tracker : this.bundleMap.values()) {
            final boolean keep = processBundle(tracker);
            if (keep) {
                readyFlowFiles.add(tracker.flowFile);
            }
        }
        return readyFlowFiles;
    }

    /**
     * Finalizes a single bundle so that its FlowFile can be transferred.
     * <p>
     * For record-oriented bundles the record set is finished and the writer is closed
     * exactly once, whether or not finishing succeeds. A bundle that ended up with zero
     * records has its FlowFile removed from the session. Otherwise the writer's result
     * attributes and MIME type are applied, followed by the standard Kafka attributes.
     *
     * @param bundleTracker the bundle to finalize
     * @return {@code true} if the bundle's FlowFile should be transferred, {@code false} if it was removed
     * @throws IOException if finishing the record set or closing the writer fails
     */
    private boolean processBundle(BundleTracker bundleTracker) throws IOException {
        final RecordSetWriter recordSetWriter = bundleTracker.recordWriter;
        if (recordSetWriter != null) {
            final WriteResult writeResult;
            try {
                writeResult = recordSetWriter.finishRecordSet();
            } finally {
                // Close only here: a catch-all around the whole body would close the
                // writer a second time whenever a later session call throws.
                recordSetWriter.close();
            }

            if (writeResult.getRecordCount() == 0) {
                getProcessSession().remove(bundleTracker.flowFile);
                return false;
            }

            final Map<String, String> writerAttributes = new HashMap<>();
            writerAttributes.putAll(writeResult.getAttributes());
            writerAttributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
            bundleTracker.flowFile = getProcessSession().putAllAttributes(bundleTracker.flowFile, writerAttributes);
        }
        populateAttributes(bundleTracker);
        return true;
    }

    /**
     * Emits a single Kafka message as its own FlowFile and transfers it to success.
     * The message value (if any) becomes the content; matching headers and the
     * standard Kafka attributes are attached.
     *
     * @param processSession session used to create and transfer the FlowFile
     * @param consumerRecord the message to write
     * @param topicPartition partition the message was read from
     */
    private void writeData(ProcessSession processSession, ConsumerRecord<byte[], byte[]> consumerRecord, TopicPartition topicPartition) {
        FlowFile flowFile = processSession.create();
        final BundleTracker tracker = new BundleTracker(consumerRecord, topicPartition, this.keyEncoding);
        tracker.incrementRecordCount(1L);

        final byte[] payload = consumerRecord.value();
        if (payload != null) {
            flowFile = processSession.write(flowFile, out -> out.write(payload));
        }

        flowFile = processSession.putAllAttributes(flowFile, getAttributes(consumerRecord));
        tracker.updateFlowFile(flowFile);
        populateAttributes(tracker);
        processSession.transfer(tracker.flowFile, ConsumeKafkaRecord_0_11.REL_SUCCESS);
    }

    /**
     * Appends the given messages to demarcated bundles. Messages are grouped by their
     * header-derived attributes; each group is appended to the bundle already tracked
     * for that (partition, attributes) key, or to a newly created one.
     * <p>
     * The demarcator is written between messages. It is also written before the first
     * message of a group when the bundle already existed before this call.
     *
     * @param processSession session used to create and append to FlowFiles
     * @param list           messages of one partition from a single poll
     * @param topicPartition partition the messages were read from
     */
    private void writeDemarcatedData(ProcessSession processSession, List<ConsumerRecord<byte[], byte[]>> list, TopicPartition topicPartition) {
        final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> groups = list.stream()
                .collect(Collectors.groupingBy(message -> new BundleInformation(topicPartition, null, getAttributes(message))));

        for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> group : groups.entrySet()) {
            final BundleInformation bundleInfo = group.getKey();
            final List<ConsumerRecord<byte[], byte[]>> messages = group.getValue();

            final BundleTracker existing = this.bundleMap.get(bundleInfo);
            final boolean bundleAlreadyHasContent = existing != null;
            final BundleTracker tracker;
            if (bundleAlreadyHasContent) {
                tracker = existing;
            } else {
                tracker = new BundleTracker(messages.get(0), topicPartition, this.keyEncoding);
                tracker.updateFlowFile(processSession.putAllAttributes(processSession.create(), bundleInfo.attributes));
            }

            final FlowFile target = tracker.flowFile;
            tracker.incrementRecordCount(messages.size());
            final FlowFile appended = processSession.append(target, out -> {
                boolean demarcatorNeeded = bundleAlreadyHasContent;
                for (final ConsumerRecord<byte[], byte[]> message : messages) {
                    if (demarcatorNeeded) {
                        out.write(this.demarcatorBytes);
                    }
                    if (message.value() != null) {
                        out.write(message.value());
                    }
                    demarcatorNeeded = true;
                }
            });
            tracker.updateFlowFile(appended);
            this.bundleMap.put(bundleInfo, tracker);
        }
    }

    /**
     * Routes the message to parse failure using the default "failed to parse with the
     * configured Record Reader" log message; delegates to the four-argument overload.
     *
     * @param consumerRecord the message that could not be parsed
     * @param processSession session used to create and transfer the failure FlowFile
     * @param exc            the parse failure, passed through to the overload
     */
    private void handleParseFailure(ConsumerRecord<byte[], byte[]> consumerRecord, ProcessSession processSession, Exception exc) {
        handleParseFailure(consumerRecord, processSession, exc, "Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship");
    }

    /**
     * Writes the raw message into its own FlowFile, tags it with header attributes plus
     * Kafka offset/partition/topic, emits a provenance RECEIVE event, transfers it to the
     * parse-failure relationship, logs the problem and bumps the "Parse Failures" counter.
     *
     * @param consumerRecord the offending message
     * @param processSession session used to create and transfer the failure FlowFile
     * @param exc            cause to log, or {@code null} to log the message alone
     * @param str            error message to log
     */
    private void handleParseFailure(ConsumerRecord<byte[], byte[]> consumerRecord, ProcessSession processSession, Exception exc, String str) {
        final Map<String, String> failureAttributes = getAttributes(consumerRecord);
        failureAttributes.put("kafka.offset", String.valueOf(consumerRecord.offset()));
        failureAttributes.put("kafka.partition", String.valueOf(consumerRecord.partition()));
        failureAttributes.put("kafka.topic", consumerRecord.topic());

        FlowFile failureFlowFile = processSession.create();
        final byte[] payload = consumerRecord.value();
        if (payload != null) {
            failureFlowFile = processSession.write(failureFlowFile, out -> out.write(payload));
        }
        failureFlowFile = processSession.putAllAttributes(failureFlowFile, failureAttributes);

        final String transitUri = KafkaProcessorUtils.buildTransitURI(this.securityProtocol, this.bootstrapServers, consumerRecord.topic());
        processSession.getProvenanceReporter().receive(failureFlowFile, transitUri);
        processSession.transfer(failureFlowFile, ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE);

        if (exc != null) {
            this.logger.error(str, exc);
        } else {
            this.logger.error(str);
        }
        processSession.adjustCounter("Parse Failures", 1L, false);
    }

    /**
     * Builds FlowFile attributes from the Kafka headers of a message.
     * <p>
     * A header is copied when its key fully matches the configured header name pattern
     * and its value is non-null; the value is decoded with the configured header
     * character set. Headers with a null value are skipped, because decoding them would
     * throw a {@link NullPointerException}.
     *
     * @param consumerRecord the message whose headers are inspected
     * @return a new, mutable map of matching header names to decoded values; empty when
     *         no header name pattern is configured
     */
    private Map<String, String> getAttributes(ConsumerRecord<?, ?> consumerRecord) {
        final Map<String, String> attributes = new HashMap<>();
        if (this.headerNamePattern == null) {
            return attributes;
        }
        for (final Header header : consumerRecord.headers()) {
            final String headerName = header.key();
            final byte[] headerValue = header.value();
            if (headerValue != null && this.headerNamePattern.matcher(headerName).matches()) {
                attributes.put(headerName, new String(headerValue, this.headerCharacterSet));
            }
        }
        return attributes;
    }

    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    private void writeRecordData(ProcessSession processSession, List<ConsumerRecord<byte[], byte[]>> list, TopicPartition topicPartition) {
        RecordSetWriter recordSetWriter = null;
        try {
            for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
                Map<String, String> attributes = getAttributes(consumerRecord);
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(consumerRecord.value() == null ? new byte[0] : (byte[]) consumerRecord.value());
                Throwable th = null;
                try {
                    try {
                        try {
                            RecordReader createRecordReader = this.readerFactory.createRecordReader(attributes, byteArrayInputStream, this.logger);
                            while (true) {
                                Record nextRecord = createRecordReader.nextRecord();
                                if (nextRecord == null) {
                                    break;
                                }
                                RecordSchema schema = nextRecord.getSchema();
                                BundleInformation bundleInformation = new BundleInformation(topicPartition, schema, attributes);
                                BundleTracker bundleTracker = this.bundleMap.get(bundleInformation);
                                if (bundleTracker == null) {
                                    FlowFile putAllAttributes = processSession.putAllAttributes(processSession.create(), attributes);
                                    try {
                                        recordSetWriter = this.writerFactory.createWriter(this.logger, this.writerFactory.getSchema(putAllAttributes.getAttributes(), schema), processSession.write(putAllAttributes));
                                        recordSetWriter.beginRecordSet();
                                        bundleTracker = new BundleTracker(consumerRecord, topicPartition, this.keyEncoding, recordSetWriter);
                                        bundleTracker.updateFlowFile(putAllAttributes);
                                        this.bundleMap.put(bundleInformation, bundleTracker);
                                    } catch (Exception e) {
                                        this.logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
                                        rollback(topicPartition);
                                        yield();
                                        throw new ProcessException(e);
                                    }
                                } else {
                                    recordSetWriter = bundleTracker.recordWriter;
                                }
                                try {
                                    recordSetWriter.write(nextRecord);
                                    bundleTracker.incrementRecordCount(1L);
                                    processSession.adjustCounter("Records Received", 1L, false);
                                } catch (RuntimeException e2) {
                                    handleParseFailure(consumerRecord, processSession, e2, "Failed to write message from Kafka using the configured Record Writer. Will route message as its own FlowFile to the 'parse.failure' relationship");
                                }
                            }
                            if (byteArrayInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        byteArrayInputStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    byteArrayInputStream.close();
                                }
                            }
                        } catch (Throwable th3) {
                            if (byteArrayInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        byteArrayInputStream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    byteArrayInputStream.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (IOException e3) {
                        yield();
                        rollback(topicPartition);
                        handleParseFailure(consumerRecord, processSession, e3, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
                        closeWriter(recordSetWriter);
                        if (byteArrayInputStream != null) {
                            if (0 == 0) {
                                byteArrayInputStream.close();
                                return;
                            }
                            try {
                                byteArrayInputStream.close();
                                return;
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                                return;
                            }
                        }
                        return;
                    }
                } catch (Exception e4) {
                    handleParseFailure(consumerRecord, processSession, e4);
                    if (byteArrayInputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayInputStream.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        } else {
                            byteArrayInputStream.close();
                        }
                    }
                }
            }
        } catch (Exception e5) {
            this.logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e5);
            closeWriter(recordSetWriter);
            rollback(topicPartition);
            throw new ProcessException(e5);
        }
    }

    /**
     * Best-effort close of a record writer: a {@code null} writer is ignored and a
     * failure to close is logged as a warning instead of being propagated.
     *
     * @param recordSetWriter the writer to close, may be {@code null}
     */
    private void closeWriter(RecordSetWriter recordSetWriter) {
        if (recordSetWriter == null) {
            return;
        }
        try {
            recordSetWriter.close();
        } catch (final Exception closeFailure) {
            this.logger.warn("Failed to close Record Writer", closeFailure);
        }
    }

    /**
     * Repositions the consumer for the given partition. The target is the pending
     * uncommitted offset if one is recorded, otherwise the offset last committed to
     * Kafka, otherwise 0. Failures are logged as warnings and not propagated.
     *
     * @param topicPartition the partition to reposition
     */
    private void rollback(TopicPartition topicPartition) {
        try {
            final OffsetAndMetadata pending = this.uncommittedOffsetsMap.get(topicPartition);
            final OffsetAndMetadata resumePoint = (pending != null) ? pending : this.kafkaConsumer.committed(topicPartition);

            long seekOffset = 0L;
            if (resumePoint != null) {
                seekOffset = resumePoint.offset();
            }
            this.kafkaConsumer.seek(topicPartition, seekOffset);
        } catch (final Exception seekFailure) {
            this.logger.warn("Attempted to rollback Kafka message offset but was unable to do so", seekFailure);
        }
    }

    /**
     * Stamps the bundle's FlowFile with the standard Kafka attributes and reports a
     * provenance RECEIVE event for it.
     * <p>
     * {@code kafka.key} is only set for single-message bundles that have a key. For
     * multi-message bundles the count is stored under {@code kafka.count} (no record
     * writer) or {@code record.count} (record writer present).
     *
     * @param bundleTracker the bundle whose FlowFile is updated in place
     */
    private void populateAttributes(BundleTracker bundleTracker) {
        final long recordCount = bundleTracker.totalRecords;
        final Map<String, String> kafkaAttributes = new HashMap<>();
        kafkaAttributes.put("kafka.topic", bundleTracker.topic);
        kafkaAttributes.put("kafka.partition", String.valueOf(bundleTracker.partition));
        kafkaAttributes.put("kafka.offset", String.valueOf(bundleTracker.initialOffset));
        if (recordCount == 1 && bundleTracker.key != null) {
            kafkaAttributes.put("kafka.key", bundleTracker.key);
        }
        if (recordCount > 1) {
            final String countAttribute = (bundleTracker.recordWriter == null) ? "kafka.count" : "record.count";
            kafkaAttributes.put(countAttribute, String.valueOf(recordCount));
        }

        final FlowFile updated = getProcessSession().putAllAttributes(bundleTracker.flowFile, kafkaAttributes);
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.leaseStartNanos);
        final String transitUri = KafkaProcessorUtils.buildTransitURI(this.securityProtocol, this.bootstrapServers, bundleTracker.topic);
        getProcessSession().getProvenanceReporter().receive(updated, transitUri, elapsedMillis);
        bundleTracker.updateFlowFile(updated);
    }
}
