package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/PublisherLease.class */
public class PublisherLease implements Closeable {
    private final ComponentLog logger;
    private final Producer<byte[], byte[]> producer;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    private volatile boolean poisoned = false;
    private InFlightMessageTracker tracker;

    public PublisherLease(Producer<byte[], byte[]> producer, int i, long j, ComponentLog componentLog) {
        this.producer = producer;
        this.maxMessageSize = i;
        this.logger = componentLog;
        this.maxAckWaitMillis = j;
    }

    protected void poison() {
        this.poisoned = true;
    }

    public boolean isPoisoned() {
        return this.poisoned;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(FlowFile flowFile, InputStream inputStream, byte[] bArr, byte[] bArr2, String str) throws IOException {
        byte[] nextToken;
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker();
        }
        try {
            StreamDemarcator streamDemarcator = new StreamDemarcator(inputStream, bArr2, this.maxMessageSize);
            Throwable th = null;
            do {
                try {
                    try {
                        try {
                            nextToken = streamDemarcator.nextToken();
                        } catch (Throwable th2) {
                            th = th2;
                            throw th2;
                        }
                    } finally {
                    }
                } catch (TokenTooLargeException e) {
                    this.tracker.fail(flowFile, e);
                }
                if (nextToken == null) {
                    if (streamDemarcator != null) {
                        if (0 != 0) {
                            try {
                                streamDemarcator.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            streamDemarcator.close();
                        }
                    }
                    return;
                }
                publish(flowFile, bArr2 == null ? bArr : null, nextToken, str, this.tracker);
            } while (!this.tracker.isFailed(flowFile));
            if (streamDemarcator != null) {
                if (0 == 0) {
                    streamDemarcator.close();
                    return;
                }
                try {
                    streamDemarcator.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            }
        } catch (Exception e2) {
            this.tracker.fail(flowFile, e2);
            poison();
            throw e2;
        }
    }

    private void publish(final FlowFile flowFile, byte[] bArr, byte[] bArr2, String str, final InFlightMessageTracker inFlightMessageTracker) {
        this.producer.send(new ProducerRecord(str, (Integer) null, bArr, bArr2), new Callback() { // from class: org.apache.nifi.processors.kafka.pubsub.PublisherLease.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    inFlightMessageTracker.incrementAcknowledgedCount(flowFile);
                } else {
                    inFlightMessageTracker.fail(flowFile, exc);
                    PublisherLease.this.poison();
                }
            }
        });
        inFlightMessageTracker.incrementSentCount(flowFile);
    }

    public PublishResult complete() {
        if (this.tracker == null) {
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        this.producer.flush();
        try {
            this.tracker.awaitCompletion(this.maxAckWaitMillis);
            return this.tracker.createPublishResult();
        } catch (InterruptedException e) {
            this.logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            Thread.currentThread().interrupt();
            return this.tracker.failOutstanding(e);
        } catch (TimeoutException e2) {
            this.logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            return this.tracker.failOutstanding(e2);
        } finally {
            this.tracker = null;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.producer.close(this.maxAckWaitMillis, TimeUnit.MILLISECONDS);
        this.tracker = null;
    }
}
