package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.class */
class KafkaPublisher implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
    private final Producer<byte[], byte[]> kafkaProducer;
    private volatile long ackWaitTime;
    private volatile ProcessorLog processLog;
    private final int ackCheckSize;

    /* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/KafkaPublisher$KafkaPublisherResult.class */
    static class KafkaPublisherResult {
        private final int messagesSent;
        private final int lastMessageAcked;

        KafkaPublisherResult(int i, int i2) {
            this.messagesSent = i;
            this.lastMessageAcked = i2;
        }

        public int getMessagesSent() {
            return this.messagesSent;
        }

        public int getLastMessageAcked() {
            return this.lastMessageAcked;
        }

        public boolean isAllAcked() {
            return this.messagesSent - 1 == this.lastMessageAcked;
        }

        public String toString() {
            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaPublisher(Properties properties) {
        this(properties, 100);
    }

    KafkaPublisher(Properties properties, int i) {
        this.ackWaitTime = 30000L;
        this.kafkaProducer = new KafkaProducer(properties);
        this.ackCheckSize = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaPublisherResult publish(PublishingContext publishingContext) {
        byte[] nextToken;
        StreamDemarcator streamDemarcator = new StreamDemarcator(publishingContext.getContentStream(), publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
        int lastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
        ArrayList arrayList = new ArrayList();
        int i = 0;
        boolean z = true;
        KafkaPublisherResult kafkaPublisherResult = null;
        while (z && (nextToken = streamDemarcator.nextToken()) != null) {
            if (lastAckedMessageIndex < i) {
                arrayList.add(this.kafkaProducer.send(new ProducerRecord(publishingContext.getTopic(), publishingContext.getKeyBytes(), nextToken)));
                if (i % this.ackCheckSize == 0) {
                    int processAcks = processAcks(arrayList, lastAckedMessageIndex);
                    arrayList.clear();
                    if (processAcks % this.ackCheckSize != 0) {
                        z = false;
                        kafkaPublisherResult = new KafkaPublisherResult(i, processAcks);
                    }
                    lastAckedMessageIndex = processAcks;
                }
            }
            i++;
        }
        if (kafkaPublisherResult == null) {
            int processAcks2 = processAcks(arrayList, lastAckedMessageIndex);
            arrayList.clear();
            kafkaPublisherResult = new KafkaPublisherResult(i, processAcks2);
        }
        return kafkaPublisherResult;
    }

    void setAckWaitTime(long j) {
        this.ackWaitTime = j;
    }

    private int processAcks(List<Future<RecordMetadata>> list, int i) {
        boolean z = false;
        for (int i2 = 0; i2 < list.size() && !z; i2++) {
            try {
                list.get(i2).get(this.ackWaitTime, TimeUnit.MILLISECONDS);
                i++;
            } catch (InterruptedException e) {
                z = true;
                Thread.currentThread().interrupt();
                warnOrError("Interrupted while waiting for acks from Kafka", null);
            } catch (ExecutionException e2) {
                z = true;
                warnOrError("Failed while waiting for acks from Kafka", e2);
            } catch (TimeoutException e3) {
                z = true;
                warnOrError("Timed out while waiting for acks from Kafka", null);
            }
        }
        return i;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
    }

    void setProcessLog(ProcessorLog processorLog) {
        this.processLog = processorLog;
    }

    private void warnOrError(String str, Exception exc) {
        if (exc == null) {
            logger.warn(str);
            if (this.processLog != null) {
                this.processLog.warn(str);
                return;
            }
            return;
        }
        logger.error(str, exc);
        if (this.processLog != null) {
            this.processLog.error(str, exc);
        }
    }
}
