package org.apache.nifi.processors.kafka;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.producer.Partitioner;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;

/* loaded from: input_file:org/apache/nifi/processors/kafka/KafkaPublisher.class */
class KafkaPublisher implements Closeable {
    private final Producer<byte[], byte[]> kafkaProducer;
    private long ackWaitTime;
    private final ComponentLog componentLog;
    private final Partitioner partitioner;
    private final int ackCheckSize;

    /* loaded from: input_file:org/apache/nifi/processors/kafka/KafkaPublisher$KafkaPublisherResult.class */
    static class KafkaPublisherResult {
        private final int messagesSent;
        private final int lastMessageAcked;

        KafkaPublisherResult(int i, int i2) {
            this.messagesSent = i;
            this.lastMessageAcked = i2;
        }

        public int getMessagesSent() {
            return this.messagesSent;
        }

        public int getLastMessageAcked() {
            return this.lastMessageAcked;
        }

        public boolean isAllAcked() {
            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
        }

        public String toString() {
            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaPublisher(Properties properties, ComponentLog componentLog) {
        this(properties, 100, componentLog);
    }

    KafkaPublisher(Properties properties, int i, ComponentLog componentLog) {
        this.ackWaitTime = 30000L;
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        this.kafkaProducer = new KafkaProducer(properties);
        this.ackCheckSize = i;
        try {
            if (properties.containsKey("partitioner.class")) {
                this.partitioner = (Partitioner) Class.forName(properties.getProperty("partitioner.class")).newInstance();
            } else {
                this.partitioner = null;
            }
            this.componentLog = componentLog;
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create partitioner", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaPublisherResult publish(PublishingContext publishingContext) throws IOException {
        byte[] nextToken;
        StreamDemarcator streamDemarcator = new StreamDemarcator(publishingContext.getContentStream(), publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
        int lastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
        ArrayList arrayList = new ArrayList();
        int i = 0;
        boolean z = true;
        KafkaPublisherResult kafkaPublisherResult = null;
        while (z && (nextToken = streamDemarcator.nextToken()) != null) {
            if (lastAckedMessageIndex < i) {
                if (publishingContext.getPartitionId() == null && publishingContext.getKeyBytes() != null) {
                    Integer.valueOf(getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic()));
                }
                arrayList.add(this.kafkaProducer.send(new ProducerRecord(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), nextToken)));
                if (i % this.ackCheckSize == 0) {
                    int processAcks = processAcks(arrayList, lastAckedMessageIndex);
                    arrayList.clear();
                    if (processAcks % this.ackCheckSize != 0) {
                        z = false;
                        kafkaPublisherResult = new KafkaPublisherResult(i, processAcks);
                    }
                    lastAckedMessageIndex = processAcks;
                }
            }
            i++;
        }
        if (kafkaPublisherResult == null) {
            int processAcks2 = processAcks(arrayList, lastAckedMessageIndex);
            arrayList.clear();
            kafkaPublisherResult = new KafkaPublisherResult(i, processAcks2);
        }
        return kafkaPublisherResult;
    }

    void setAckWaitTime(long j) {
        this.ackWaitTime = j;
    }

    private int processAcks(List<Future<RecordMetadata>> list, int i) {
        boolean z = false;
        for (int i2 = 0; i2 < list.size() && !z; i2++) {
            try {
                list.get(i2).get(this.ackWaitTime, TimeUnit.MILLISECONDS);
                i++;
            } catch (InterruptedException e) {
                z = true;
                Thread.currentThread().interrupt();
                warnOrError("Interrupted while waiting for acks from Kafka", null);
            } catch (ExecutionException e2) {
                z = true;
                warnOrError("Failed while waiting for acks from Kafka", e2);
            } catch (TimeoutException e3) {
                z = true;
                warnOrError("Timed out while waiting for acks from Kafka", null);
            }
        }
        return i;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.kafkaProducer.close();
    }

    private void warnOrError(String str, Exception exc) {
        if (exc == null) {
            this.componentLog.warn(str);
        } else {
            this.componentLog.error(str);
        }
    }

    private int getPartition(Object obj, String str) {
        if (this.partitioner == null) {
            return 0;
        }
        return this.partitioner.partition(obj, this.kafkaProducer.partitionsFor(str).size());
    }
}
