package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/PublisherLease.class */
public class PublisherLease implements Closeable {
    private final ComponentLog logger;
    private final Producer<byte[], byte[]> producer;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    private final boolean useTransactions;
    private final Pattern attributeNameRegex;
    private final Charset headerCharacterSet;
    private final PublishStrategy publishStrategy;
    private final RecordSetWriterFactory recordKeyWriterFactory;
    private volatile boolean poisoned = false;
    private final AtomicLong messagesSent = new AtomicLong(0);
    private volatile boolean transactionsInitialized = false;
    private volatile boolean activeTransaction = false;
    private InFlightMessageTracker tracker;
    private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
    private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
    private static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(Arrays.asList(FIELD_TOPIC, FIELD_PARTITION, FIELD_TIMESTAMP));
    private static final RecordField FIELD_METADATA = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
    private static final RecordField FIELD_HEADERS = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));

    public PublisherLease(Producer<byte[], byte[]> producer, int i, long j, ComponentLog componentLog, boolean z, Pattern pattern, Charset charset, PublishStrategy publishStrategy, RecordSetWriterFactory recordSetWriterFactory) {
        this.producer = producer;
        this.maxMessageSize = i;
        this.logger = componentLog;
        this.maxAckWaitMillis = j;
        this.useTransactions = z;
        this.attributeNameRegex = pattern;
        this.headerCharacterSet = charset;
        this.publishStrategy = publishStrategy;
        this.recordKeyWriterFactory = recordSetWriterFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void poison() {
        this.poisoned = true;
    }

    public boolean isPoisoned() {
        return this.poisoned;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void beginTransaction() {
        if (this.useTransactions) {
            try {
                if (!this.transactionsInitialized) {
                    this.producer.initTransactions();
                    this.transactionsInitialized = true;
                }
                this.producer.beginTransaction();
                this.activeTransaction = true;
            } catch (Exception e) {
                poison();
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void rollback() {
        if (this.useTransactions && this.activeTransaction) {
            try {
                this.producer.abortTransaction();
                this.activeTransaction = false;
            } catch (Exception e) {
                poison();
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void fail(FlowFile flowFile, Exception exc) {
        getTracker().fail(flowFile, exc);
        rollback();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(FlowFile flowFile, InputStream inputStream, byte[] bArr, byte[] bArr2, String str, Integer num) throws IOException {
        StreamDemarcator streamDemarcator;
        Throwable th;
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        if (bArr2 != null) {
            try {
                if (bArr2.length != 0) {
                    try {
                        streamDemarcator = new StreamDemarcator(inputStream, bArr2, this.maxMessageSize);
                        th = null;
                    } catch (TokenTooLargeException e) {
                        this.tracker.fail(flowFile, e);
                    }
                    do {
                        try {
                            try {
                                byte[] nextToken = streamDemarcator.nextToken();
                                if (nextToken == null) {
                                    if (streamDemarcator != null) {
                                        if (0 != 0) {
                                            try {
                                                streamDemarcator.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            streamDemarcator.close();
                                        }
                                    }
                                    return;
                                }
                                publish(flowFile, bArr, nextToken, str, this.tracker, num);
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (streamDemarcator != null) {
                                if (th != null) {
                                    try {
                                        streamDemarcator.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    streamDemarcator.close();
                                }
                            }
                            throw th4;
                        }
                    } while (!this.tracker.isFailed(flowFile));
                    if (streamDemarcator != null) {
                        if (0 == 0) {
                            streamDemarcator.close();
                            return;
                        }
                        try {
                            streamDemarcator.close();
                            return;
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                            return;
                        }
                    }
                    return;
                }
            } catch (Exception e2) {
                this.tracker.fail(flowFile, e2);
                poison();
                throw e2;
            }
        }
        if (flowFile.getSize() > this.maxMessageSize) {
            this.tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxMessageSize + " bytes."));
            return;
        }
        byte[] bArr3 = new byte[(int) flowFile.getSize()];
        StreamUtils.fillBuffer(inputStream, bArr3);
        publish(flowFile, bArr, bArr3, str, this.tracker, num);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(FlowFile flowFile, RecordSet recordSet, RecordSetWriterFactory recordSetWriterFactory, RecordSchema recordSchema, String str, String str2, Function<Record, Integer> function, PublishMetadataStrategy publishMetadataStrategy) throws IOException {
        List<Header> headers;
        byte[] byteArray;
        byte[] messageKey;
        String str3;
        Integer apply;
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024);
        int i = 0;
        do {
            try {
                try {
                    Record next = recordSet.next();
                    if (next == null) {
                        if (i == 0) {
                            this.tracker.trackEmpty(flowFile);
                        }
                        return;
                    }
                    i++;
                    byteArrayOutputStream.reset();
                    if (PublishStrategy.USE_WRAPPER.equals(this.publishStrategy)) {
                        headers = toHeadersWrapper(next.getValue("headers"));
                        Object value = next.getValue("key");
                        byteArray = toByteArray("value", next.getValue("value"), recordSetWriterFactory, flowFile);
                        messageKey = toByteArray("key", value, this.recordKeyWriterFactory, flowFile);
                        if (publishMetadataStrategy == PublishMetadataStrategy.USE_RECORD_METADATA) {
                            Object value2 = next.getValue("metadata");
                            if (value2 instanceof Record) {
                                Record record = (Record) value2;
                                String asString = record.getAsString("topic");
                                str3 = asString == null ? str2 : asString;
                                try {
                                    apply = record.getAsInt("partition");
                                } catch (Exception e) {
                                    this.logger.warn("Encountered invalid partition for record in {}; will use configured partitioner for Record", new Object[]{flowFile});
                                    apply = function == null ? null : function.apply(next);
                                }
                            } else {
                                str3 = str2;
                                apply = function == null ? null : function.apply(next);
                            }
                        } else {
                            str3 = str2;
                            apply = function == null ? null : function.apply(next);
                        }
                    } else {
                        RecordSetWriter createWriter = recordSetWriterFactory.createWriter(this.logger, recordSchema, byteArrayOutputStream, flowFile);
                        Throwable th = null;
                        try {
                            try {
                                Map<String, ?> attributes = createWriter.write(next).getAttributes();
                                createWriter.flush();
                                if (createWriter != null) {
                                    if (0 != 0) {
                                        try {
                                            createWriter.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        createWriter.close();
                                    }
                                }
                                headers = toHeaders(flowFile, attributes);
                                byteArray = byteArrayOutputStream.toByteArray();
                                messageKey = getMessageKey(flowFile, recordSetWriterFactory, next.getValue(str));
                                str3 = str2;
                                apply = function == null ? null : function.apply(next);
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (createWriter != null) {
                                if (th != null) {
                                    try {
                                        createWriter.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    createWriter.close();
                                }
                            }
                            throw th4;
                        }
                    }
                    publish(flowFile, headers, messageKey, byteArray, str3, this.tracker, apply);
                } catch (SchemaNotFoundException | MalformedRecordException e2) {
                    throw new IOException((Throwable) e2);
                }
            } catch (TokenTooLargeException e3) {
                this.tracker.fail(flowFile, e3);
                return;
            } catch (Exception e4) {
                this.tracker.fail(flowFile, e4);
                poison();
                throw e4;
            }
        } while (!this.tracker.isFailed(flowFile));
    }

    private List<Header> toHeadersWrapper(Object obj) {
        ArrayList arrayList = new ArrayList();
        if (obj instanceof Record) {
            Record record = (Record) obj;
            for (String str : record.getRawFieldNames()) {
                arrayList.add(new RecordHeader(str, record.getAsString(str).getBytes(StandardCharsets.UTF_8)));
            }
        }
        return arrayList;
    }

    private Record toWrapperRecord(Record record, List<Header> list, String str, String str2) {
        Record record2 = (Record) record.getValue(str);
        SimpleRecordSchema simpleRecordSchema = new SimpleRecordSchema(Arrays.asList(FIELD_METADATA, FIELD_HEADERS, new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record2 == null ? null : record2.getSchema())), new RecordField("value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()))));
        HashMap hashMap = new HashMap();
        hashMap.put("topic", str2);
        hashMap.put("timestamp", Long.valueOf(getTimestamp()));
        MapRecord mapRecord = new MapRecord(SCHEMA_METADATA, hashMap);
        HashMap hashMap2 = new HashMap();
        for (Header header : list) {
            hashMap2.put(header.key(), new String(header.value(), this.headerCharacterSet));
        }
        HashMap hashMap3 = new HashMap();
        hashMap3.put("metadata", mapRecord);
        hashMap3.put("headers", hashMap2);
        hashMap3.put("key", record.getValue(str));
        hashMap3.put("value", record);
        return new MapRecord(simpleRecordSchema, hashMap3);
    }

    protected long getTimestamp() {
        return System.currentTimeMillis();
    }

    private List<Header> toHeaders(FlowFile flowFile, Map<String, ?> map) {
        Object value;
        if (this.attributeNameRegex == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : flowFile.getAttributes().entrySet()) {
            if (this.attributeNameRegex.matcher((CharSequence) entry.getKey()).matches()) {
                arrayList.add(new RecordHeader((String) entry.getKey(), ((String) entry.getValue()).getBytes(this.headerCharacterSet)));
            }
        }
        for (Map.Entry<String, ?> entry2 : map.entrySet()) {
            if (this.attributeNameRegex.matcher(entry2.getKey()).matches() && (value = entry2.getValue()) != null) {
                arrayList.add(new RecordHeader(entry2.getKey(), value.toString().getBytes(this.headerCharacterSet)));
            }
        }
        return arrayList;
    }

    private byte[] toByteArray(String str, Object obj, RecordSetWriterFactory recordSetWriterFactory, FlowFile flowFile) throws IOException, SchemaNotFoundException, MalformedRecordException {
        if (obj == null) {
            return null;
        }
        if (!(obj instanceof Record)) {
            if (!(obj instanceof Byte[])) {
                if (obj instanceof String) {
                    return ((String) obj).getBytes(StandardCharsets.UTF_8);
                }
                throw new MalformedRecordException(String.format("Couldn't convert %s record data to byte array.", str));
            }
            Byte[] bArr = (Byte[]) obj;
            byte[] bArr2 = new byte[bArr.length];
            for (int i = 0; i < bArr.length; i++) {
                bArr2[i] = bArr[i].byteValue();
            }
            return bArr2;
        }
        if (recordSetWriterFactory == null) {
            throw new MalformedRecordException("Record has a key that is itself a record, but the 'Record Key Writer' of the processor was not configured. If Records are expected to have a Record as the key, the 'Record Key Writer' property must be set.");
        }
        Record record = (Record) obj;
        RecordSchema schema = record.getSchema();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            RecordSetWriter createWriter = recordSetWriterFactory.createWriter(this.logger, schema, byteArrayOutputStream, flowFile);
            Throwable th2 = null;
            try {
                try {
                    createWriter.write(record);
                    createWriter.flush();
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    if (createWriter != null) {
                        if (0 != 0) {
                            try {
                                createWriter.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createWriter.close();
                        }
                    }
                    return byteArray;
                } finally {
                }
            } catch (Throwable th4) {
                if (createWriter != null) {
                    if (th2 != null) {
                        try {
                            createWriter.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createWriter.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
        }
    }

    private byte[] getMessageKey(FlowFile flowFile, RecordSetWriterFactory recordSetWriterFactory, Object obj) throws IOException, SchemaNotFoundException {
        byte[] bytes;
        if (obj == null) {
            bytes = null;
        } else if (obj instanceof byte[]) {
            bytes = (byte[]) obj;
        } else if (obj instanceof Byte[]) {
            Byte[] bArr = (Byte[]) obj;
            byte[] bArr2 = new byte[bArr.length];
            for (int i = 0; i < bArr.length; i++) {
                bArr2[i] = bArr[i].byteValue();
            }
            bytes = bArr2;
        } else if (obj instanceof Record) {
            Record record = (Record) obj;
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024);
            Throwable th = null;
            try {
                RecordSetWriter createWriter = recordSetWriterFactory.createWriter(this.logger, record.getSchema(), byteArrayOutputStream, flowFile);
                Throwable th2 = null;
                try {
                    createWriter.write(record);
                    createWriter.flush();
                    if (createWriter != null) {
                        if (0 != 0) {
                            try {
                                createWriter.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createWriter.close();
                        }
                    }
                    bytes = byteArrayOutputStream.toByteArray();
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                } catch (Throwable th5) {
                    if (createWriter != null) {
                        if (0 != 0) {
                            try {
                                createWriter.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            createWriter.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                if (byteArrayOutputStream != null) {
                    if (0 != 0) {
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th8) {
                            th.addSuppressed(th8);
                        }
                    } else {
                        byteArrayOutputStream.close();
                    }
                }
                throw th7;
            }
        } else {
            bytes = obj.toString().getBytes(StandardCharsets.UTF_8);
        }
        return bytes;
    }

    private void addHeaders(FlowFile flowFile, Map<String, String> map, ProducerRecord<?, ?> producerRecord) {
        if (this.attributeNameRegex == null) {
            return;
        }
        Headers headers = producerRecord.headers();
        for (Map.Entry entry : flowFile.getAttributes().entrySet()) {
            if (this.attributeNameRegex.matcher((CharSequence) entry.getKey()).matches()) {
                headers.add((String) entry.getKey(), ((String) entry.getValue()).getBytes(this.headerCharacterSet));
            }
        }
        for (Map.Entry<String, String> entry2 : map.entrySet()) {
            if (this.attributeNameRegex.matcher(entry2.getKey()).matches()) {
                headers.add(entry2.getKey(), entry2.getValue().getBytes(this.headerCharacterSet));
            }
        }
    }

    protected void publish(FlowFile flowFile, byte[] bArr, byte[] bArr2, String str, InFlightMessageTracker inFlightMessageTracker, Integer num) {
        publish(flowFile, Collections.emptyList(), bArr, bArr2, str, inFlightMessageTracker, num);
    }

    protected void publish(final FlowFile flowFile, List<Header> list, byte[] bArr, byte[] bArr2, String str, final InFlightMessageTracker inFlightMessageTracker, Integer num) {
        this.producer.send(new ProducerRecord(str, num == null ? null : Integer.valueOf(Math.abs(num.intValue()) % this.producer.partitionsFor(str).size()), bArr, bArr2, list), new Callback() { // from class: org.apache.nifi.processors.kafka.pubsub.PublisherLease.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    inFlightMessageTracker.incrementAcknowledgedCount(flowFile);
                } else {
                    inFlightMessageTracker.fail(flowFile, exc);
                    PublisherLease.this.poison();
                }
            }
        });
        this.messagesSent.incrementAndGet();
        inFlightMessageTracker.incrementSentCount(flowFile);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ackConsumerOffsets(String str, int i, long j, Integer num, String str2) {
        Map singletonMap = Collections.singletonMap(new TopicPartition(str, i), new OffsetAndMetadata(j + 1, Optional.ofNullable(num), (String) null));
        this.logger.debug("Acknowledging Consumer Offsets for topic={}, partition={}, offset={}, consumerGroup={}, leaderEpoch={}", new Object[]{str, Integer.valueOf(i), Long.valueOf(j), str2, num});
        this.producer.sendOffsetsToTransaction(singletonMap, str2);
    }

    public PublishResult complete() {
        if (this.tracker == null) {
            if (this.messagesSent.get() == 0) {
                return PublishResult.EMPTY;
            }
            rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        try {
            this.producer.flush();
            if (this.activeTransaction) {
                this.producer.commitTransaction();
                this.activeTransaction = false;
            }
            try {
                this.tracker.awaitCompletion(this.maxAckWaitMillis);
                return this.tracker.createPublishResult();
            } catch (InterruptedException e) {
                this.logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
                Thread.currentThread().interrupt();
                return this.tracker.failOutstanding(e);
            } catch (TimeoutException e2) {
                this.logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
                return this.tracker.failOutstanding(e2);
            } finally {
                this.tracker = null;
            }
        } catch (ProducerFencedException | FencedInstanceIdException e3) {
            throw e3;
        } catch (Exception e4) {
            poison();
            throw e4;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.producer.close(this.maxAckWaitMillis, TimeUnit.MILLISECONDS);
        this.tracker = null;
    }

    public InFlightMessageTracker getTracker() {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        return this.tracker;
    }

    public List<ConfigVerificationResult> verifyConfiguration(String str) {
        ArrayList arrayList = new ArrayList();
        try {
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Determine Topic Partitions").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Determined that there are " + this.producer.partitionsFor(str).size() + " partitions for topic " + str).build());
        } catch (Exception e) {
            this.logger.error("Failed to determine Partition Information for Topic {} in order to verify configuration", new Object[]{str, e});
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Determine Topic Partitions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation("Could not fetch Partition Information: " + e).build());
        }
        return arrayList;
    }
}
