package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/PublisherLease.class */
/**
 * Wraps a Kafka {@link Producer} and publishes the content of FlowFiles through it,
 * recording per-FlowFile progress in an {@link InFlightMessageTracker}.
 *
 * <p>A lease becomes "poisoned" when an unexpected error occurs while publishing or when
 * Kafka reports a failure through the send callback; callers can query this with
 * {@link #isPoisoned()}.</p>
 */
public class PublisherLease implements Closeable {
    private final ComponentLog logger;
    private final Producer<byte[], byte[]> producer;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    private final boolean useTransactions;
    private final Pattern attributeNameRegex;
    private final Charset headerCharacterSet;
    private volatile boolean poisoned = false;
    private final AtomicLong messagesSent = new AtomicLong(0);
    private volatile boolean transactionsInitialized = false;
    private volatile boolean activeTransaction = false;
    private InFlightMessageTracker tracker;

    /**
     * Creates a lease around the given producer.
     *
     * @param producer the Kafka producer used to send every message
     * @param maxMessageSize the largest message, in bytes, that will be published
     * @param maxAckWaitMillis how long, in milliseconds, to wait for acknowledgements in
     *                         {@link #complete()} and for the producer to close in {@link #close()}
     * @param logger the logger handed to the tracker and record writers
     * @param useTransactions whether {@link #beginTransaction()} should start a Kafka transaction
     * @param attributeNameRegex pattern selecting the attributes copied into record headers;
     *                           when {@code null} no headers are added
     * @param headerCharacterSet the character set used to encode header values
     */
    public PublisherLease(Producer<byte[], byte[]> producer, int maxMessageSize, long maxAckWaitMillis, ComponentLog logger,
                          boolean useTransactions, Pattern attributeNameRegex, Charset headerCharacterSet) {
        this.producer = producer;
        this.maxMessageSize = maxMessageSize;
        this.logger = logger;
        this.maxAckWaitMillis = maxAckWaitMillis;
        this.useTransactions = useTransactions;
        this.attributeNameRegex = attributeNameRegex;
        this.headerCharacterSet = headerCharacterSet;
    }

    /**
     * Marks this lease as poisoned.
     */
    protected void poison() {
        this.poisoned = true;
    }

    /**
     * @return {@code true} once {@link #poison()} has been called on this lease
     */
    public boolean isPoisoned() {
        return this.poisoned;
    }

    /**
     * Begins a Kafka transaction when this lease was created with transactions enabled;
     * otherwise does nothing. The producer's transactions are initialized on the first call only.
     */
    public void beginTransaction() {
        if (!useTransactions) {
            return;
        }

        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }

        producer.beginTransaction();
        activeTransaction = true;
    }

    /**
     * Aborts the current Kafka transaction, if transactions are enabled and one is active.
     */
    public void rollback() {
        if (useTransactions && activeTransaction) {
            producer.abortTransaction();
            activeTransaction = false;
        }
    }

    /**
     * Records the given FlowFile as failed in the tracker and rolls back any active transaction.
     *
     * @param flowFile the FlowFile that failed
     * @param cause the reason for the failure
     */
    public void fail(FlowFile flowFile, Exception cause) {
        getTracker().fail(flowFile, cause);
        rollback();
    }

    /**
     * Publishes the content of a FlowFile, either as a single message or, when a demarcator
     * is supplied, as one message per demarcated token.
     *
     * @param flowFile the FlowFile whose content is being published
     * @param flowFileContent stream over the FlowFile's content
     * @param messageKey the key to send with every message; may be {@code null}
     * @param demarcatorBytes the token separator; when {@code null} or empty the whole content
     *                        is sent as one message
     * @param topic the destination topic
     * @throws IOException if reading the content fails; the FlowFile is marked as failed and
     *                     the lease is poisoned before the exception is rethrown
     */
    public void publish(FlowFile flowFile, InputStream flowFileContent, byte[] messageKey, byte[] demarcatorBytes, String topic) throws IOException {
        final InFlightMessageTracker messageTracker = getTracker();

        try {
            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
                if (flowFile.getSize() > maxMessageSize) {
                    messageTracker.fail(flowFile, new TokenTooLargeException(
                            "A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
                    return;
                }

                // The buffer is sized from the FlowFile itself, so a zero-byte FlowFile
                // still results in one (empty) message being sent.
                final byte[] wholeContent = new byte[(int) flowFile.getSize()];
                StreamUtils.fillBuffer(flowFileContent, wholeContent);
                publish(flowFile, messageKey, wholeContent, topic, messageTracker);
                return;
            }

            try (StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                byte[] token;
                while ((token = demarcator.nextToken()) != null) {
                    publish(flowFile, messageKey, token, topic, messageTracker);

                    if (messageTracker.isFailed(flowFile)) {
                        // Once the FlowFile has failed there is no point sending the rest of it.
                        return;
                    }
                }

                // Reached only when the stream was exhausted without a failure, so a FlowFile
                // that produced no tokens at all is still registered with the tracker.
                // NOTE(review): assumes trackEmpty is harmless when the FlowFile already has counts - confirm.
                messageTracker.trackEmpty(flowFile);
            } catch (TokenTooLargeException e) {
                // An oversized token fails this FlowFile only; the lease itself stays usable.
                messageTracker.fail(flowFile, e);
            }
        } catch (Exception e) {
            messageTracker.fail(flowFile, e);
            poison();
            throw e;
        }
    }

    /**
     * Publishes every record of the given record set as its own message, serializing each
     * record with a writer obtained from the supplied factory.
     *
     * @param flowFile the FlowFile the records came from
     * @param recordSet the records to publish
     * @param writerFactory factory for the writer that serializes each record
     * @param schema the schema passed to the writer factory
     * @param messageKeyField name of the record field whose string value, encoded as UTF-8,
     *                        becomes the message key; when {@code null}, or when the field has
     *                        no value, the key is {@code null}
     * @param topic the destination topic
     * @throws IOException if reading or writing a record fails, or wrapping a
     *                     {@link SchemaNotFoundException} raised while creating the writer
     */
    public void publish(FlowFile flowFile, RecordSet recordSet, RecordSetWriterFactory writerFactory, RecordSchema schema,
                        String messageKeyField, String topic) throws IOException {
        final InFlightMessageTracker messageTracker = getTracker();
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
        int recordCount = 0;

        try {
            Record record;
            while ((record = recordSet.next()) != null) {
                recordCount++;
                buffer.reset();

                final Map<String, String> additionalAttributes;
                try (RecordSetWriter writer = writerFactory.createWriter(logger, schema, buffer)) {
                    additionalAttributes = writer.write(record).getAttributes();
                    writer.flush();
                }

                // The writer is closed before the buffer is read so the bytes are complete.
                final byte[] messageContent = buffer.toByteArray();
                final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
                final byte[] messageKey = key == null ? null : key.getBytes(StandardCharsets.UTF_8);

                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, messageTracker);

                if (messageTracker.isFailed(flowFile)) {
                    // Once the FlowFile has failed there is no point sending the rest of it.
                    return;
                }
            }

            if (recordCount == 0) {
                messageTracker.trackEmpty(flowFile);
            }
        } catch (TokenTooLargeException e) {
            // Must be caught ahead of the generic handler: it fails the FlowFile without poisoning the lease.
            messageTracker.fail(flowFile, e);
        } catch (SchemaNotFoundException e) {
            throw new IOException(e);
        } catch (Exception e) {
            messageTracker.fail(flowFile, e);
            poison();
            throw e;
        }
    }

    /**
     * Adds a header to the record for every FlowFile attribute and every additional attribute
     * whose name matches the configured pattern. Does nothing when no pattern is configured.
     */
    private void addHeaders(FlowFile flowFile, Map<String, String> additionalAttributes, ProducerRecord<?, ?> record) {
        if (attributeNameRegex == null) {
            return;
        }

        final Headers headers = record.headers();
        addMatchingHeaders(flowFile.getAttributes(), headers);
        addMatchingHeaders(additionalAttributes, headers);
    }

    /**
     * Copies each attribute whose name matches the configured pattern into the given headers,
     * encoding the value with the configured header character set.
     */
    private void addMatchingHeaders(Map<String, String> attributes, Headers headers) {
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            if (attributeNameRegex.matcher(attribute.getKey()).matches()) {
                headers.add(attribute.getKey(), attribute.getValue().getBytes(headerCharacterSet));
            }
        }
    }

    /**
     * Sends a single message with no additional attributes.
     *
     * @param flowFile the FlowFile the message belongs to
     * @param messageKey the message key; may be {@code null}
     * @param messageContent the message payload
     * @param topic the destination topic
     * @param tracker the tracker that records the send and its outcome
     */
    protected void publish(FlowFile flowFile, byte[] messageKey, byte[] messageContent, String topic, InFlightMessageTracker tracker) {
        publish(flowFile, Collections.<String, String>emptyMap(), messageKey, messageContent, topic, tracker);
    }

    /**
     * Sends a single message through the producer and registers it with the tracker. The send
     * callback either counts an acknowledgement or fails the FlowFile and poisons this lease.
     *
     * @param flowFile the FlowFile the message belongs to
     * @param additionalAttributes extra attributes considered for headers alongside the FlowFile's own
     * @param messageKey the message key; may be {@code null}
     * @param messageContent the message payload
     * @param topic the destination topic
     * @param tracker the tracker that records the send and its outcome
     */
    protected void publish(final FlowFile flowFile, Map<String, String> additionalAttributes, byte[] messageKey, byte[] messageContent,
                           String topic, final InFlightMessageTracker tracker) {
        // No explicit partition is given; the cast only selects the (topic, partition, key, value) constructor.
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, (Integer) null, messageKey, messageContent);
        addHeaders(flowFile, additionalAttributes, record);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    tracker.incrementAcknowledgedCount(flowFile);
                } else {
                    tracker.fail(flowFile, exception);
                    poison();
                }
            }
        });

        messagesSent.incrementAndGet();
        tracker.incrementSentCount(flowFile);
    }

    /**
     * Flushes the producer, commits any active transaction and waits up to the configured
     * acknowledgement timeout for outstanding messages. The tracker is discarded afterwards.
     *
     * @return the publish result built by the tracker; {@link PublishResult#EMPTY} when there is
     *         no tracker and nothing has been sent; or the result of failing all outstanding
     *         messages when the wait is interrupted or times out
     * @throws IllegalStateException if there is no tracker although messages have been sent
     */
    public PublishResult complete() {
        if (tracker == null) {
            if (messagesSent.get() == 0) {
                return PublishResult.EMPTY;
            }

            rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }

        producer.flush();

        if (activeTransaction) {
            producer.commitTransaction();
            activeTransaction = false;
        }

        try {
            tracker.awaitCompletion(maxAckWaitMillis);
            return tracker.createPublishResult();
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
            return tracker.failOutstanding(e);
        } catch (TimeoutException e) {
            logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            return tracker.failOutstanding(e);
        } finally {
            tracker = null;
        }
    }

    /**
     * Closes the underlying producer, waiting up to the configured acknowledgement timeout,
     * and discards the tracker.
     */
    @Override
    public void close() {
        producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
        tracker = null;
    }

    /**
     * @return the current tracker, creating one first if none exists
     */
    public InFlightMessageTracker getTracker() {
        if (tracker == null) {
            tracker = new InFlightMessageTracker(logger);
        }
        return tracker;
    }
}
