package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordCollectorImpl.class */
public class RecordCollectorImpl implements RecordCollector {
    private static final int MAX_SEND_ATTEMPTS = 3;
    private static final long SEND_RETRY_BACKOFF = 100;
    private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
    private final Producer<byte[], byte[]> producer;
    private final Map<TopicPartition, Long> offsets = new HashMap();
    private final String logPrefix;
    private volatile Exception sendException;

    public RecordCollectorImpl(Producer<byte[], byte[]> producer, String str) {
        this.producer = producer;
        this.logPrefix = String.format("task [%s]", str);
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(String str, K k, V v, Integer num, Long l, Serializer<K> serializer, Serializer<V> serializer2) {
        send(str, k, v, num, l, serializer, serializer2, null);
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(final String str, K k, V v, Integer num, Long l, Serializer<K> serializer, Serializer<V> serializer2, StreamPartitioner<? super K, ? super V> streamPartitioner) {
        List partitionsFor;
        checkForException();
        byte[] serialize = serializer.serialize(str, k);
        byte[] serialize2 = serializer2.serialize(str, v);
        if (num == null && streamPartitioner != null && (partitionsFor = this.producer.partitionsFor(str)) != null && partitionsFor.size() > 0) {
            num = streamPartitioner.partition(k, v, partitionsFor.size());
        }
        ProducerRecord producerRecord = new ProducerRecord(str, num, l, serialize, serialize2);
        for (int i = 1; i <= MAX_SEND_ATTEMPTS; i++) {
            try {
                this.producer.send(producerRecord, new Callback() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorImpl.1
                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc != null) {
                            RecordCollectorImpl.this.sendException = exc;
                            RecordCollectorImpl.log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", new Object[]{RecordCollectorImpl.this.logPrefix, str, exc});
                        } else {
                            if (RecordCollectorImpl.this.sendException != null) {
                                return;
                            }
                            RecordCollectorImpl.this.offsets.put(new TopicPartition(recordMetadata.topic(), recordMetadata.partition()), Long.valueOf(recordMetadata.offset()));
                        }
                    }
                });
                return;
            } catch (TimeoutException e) {
                if (i == MAX_SEND_ATTEMPTS) {
                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", this.logPrefix, str, Integer.valueOf(i)));
                }
                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", new Object[]{this.logPrefix, str, Integer.valueOf(i)});
                Utils.sleep(SEND_RETRY_BACKOFF);
            }
        }
    }

    private void checkForException() {
        if (this.sendException != null) {
            throw new StreamsException(String.format("%s exception caught when producing", this.logPrefix), this.sendException);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void flush() {
        log.debug("{} Flushing producer", this.logPrefix);
        this.producer.flush();
        checkForException();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void close() {
        this.producer.close();
        checkForException();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public Map<TopicPartition, Long> offsets() {
        return this.offsets;
    }
}
