package org.apache.kafka.clients.producer;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/kafka-clients-0.8.2.2.jar:org/apache/kafka/clients/producer/KafkaProducer.class */
public class KafkaProducer<K, V> implements Producer<K, V> {
    private final Partitioner partitioner;
    private final int maxRequestSize;
    private final long metadataFetchTimeoutMs;
    private final long totalMemorySize;
    private final Metadata metadata;
    private final RecordAccumulator accumulator;
    private final Sender sender;
    private final Metrics metrics;
    private final Thread ioThread;
    private final CompressionType compressionType;
    private final Sensor errors;
    private final Time time;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final ProducerConfig producerConfig;
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private static final AtomicInteger producerAutoId = new AtomicInteger(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/kafka-clients-0.8.2.2.jar:org/apache/kafka/clients/producer/KafkaProducer$FutureFailure.class */
    public static class FutureFailure implements Future<RecordMetadata> {
        private final ExecutionException exception;

        public FutureFailure(Exception exc) {
            this.exception = new ExecutionException(exc);
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public RecordMetadata get(long j, TimeUnit timeUnit) throws ExecutionException {
            throw this.exception;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return true;
        }
    }

    public KafkaProducer(Map<String, Object> map) {
        this(new ProducerConfig(map), (Serializer) null, (Serializer) null);
    }

    public KafkaProducer(Map<String, Object> map, Serializer<K> serializer, Serializer<V> serializer2) {
        this(new ProducerConfig(addSerializerToConfig(map, (Serializer<?>) serializer, (Serializer<?>) serializer2)), serializer, serializer2);
    }

    private static Map<String, Object> addSerializerToConfig(Map<String, Object> map, Serializer<?> serializer, Serializer<?> serializer2) {
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        if (serializer != null) {
            hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer.getClass());
        }
        if (serializer2 != null) {
            hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer2.getClass());
        }
        return hashMap;
    }

    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), (Serializer) null, (Serializer) null);
    }

    public KafkaProducer(Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        this(new ProducerConfig(addSerializerToConfig(properties, (Serializer<?>) serializer, (Serializer<?>) serializer2)), serializer, serializer2);
    }

    private static Properties addSerializerToConfig(Properties properties, Serializer<?> serializer, Serializer<?> serializer2) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        if (serializer != null) {
            properties2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer.getClass().getName());
        }
        if (serializer2 != null) {
            properties2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer2.getClass().getName());
        }
        return properties2;
    }

    private KafkaProducer(ProducerConfig producerConfig, Serializer<K> serializer, Serializer<V> serializer2) {
        log.trace("Starting the Kafka producer");
        this.producerConfig = producerConfig;
        this.time = new SystemTime();
        MetricConfig timeWindow = new MetricConfig().samples(producerConfig.getInt("metrics.num.samples")).timeWindow(producerConfig.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        String string = producerConfig.getString("client.id");
        string = string.length() <= 0 ? "producer-" + producerAutoId.getAndIncrement() : string;
        List configuredInstances = producerConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        configuredInstances.add(new JmxReporter("kafka.producer"));
        this.metrics = new Metrics(timeWindow, configuredInstances, this.time);
        this.partitioner = new Partitioner();
        long j = producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        this.metadataFetchTimeoutMs = producerConfig.getLong("metadata.fetch.timeout.ms");
        this.metadata = new Metadata(j, producerConfig.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
        this.maxRequestSize = producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        this.totalMemorySize = producerConfig.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        this.compressionType = CompressionType.forName(producerConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("client-id", string);
        this.accumulator = new RecordAccumulator(producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, producerConfig.getLong(ProducerConfig.LINGER_MS_CONFIG), j, producerConfig.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), this.metrics, this.time, linkedHashMap);
        this.metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(producerConfig.getList("bootstrap.servers"))), this.time.milliseconds());
        this.sender = new Sender(new NetworkClient(new Selector(this.metrics, this.time, "producer", linkedHashMap), this.metadata, string, producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), producerConfig.getLong("reconnect.backoff.ms"), producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG), producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)), this.metadata, this.accumulator, producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(producerConfig.getString(ProducerConfig.ACKS_CONFIG)), producerConfig.getInt(ProducerConfig.RETRIES_CONFIG), producerConfig.getInt(ProducerConfig.TIMEOUT_CONFIG), this.metrics, new SystemTime(), string);
        this.ioThread = new KafkaThread("kafka-producer-network-thread" + (string.length() > 0 ? " | " + string : ""), this.sender, true);
        this.ioThread.start();
        this.errors = this.metrics.sensor("errors");
        if (serializer == null) {
            this.keySerializer = (Serializer) producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.keySerializer.configure(producerConfig.originals(), true);
        } else {
            this.keySerializer = serializer;
        }
        if (serializer2 == null) {
            this.valueSerializer = (Serializer) producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.valueSerializer.configure(producerConfig.originals(), false);
        } else {
            this.valueSerializer = serializer2;
        }
        producerConfig.logUnused();
        log.debug("Kafka producer started");
    }

    private static int parseAcks(String str) {
        try {
            if (str.trim().toLowerCase().equals(ChannelPipelineCoverage.ALL)) {
                return -1;
            }
            return Integer.parseInt(str.trim());
        } catch (NumberFormatException e) {
            throw new ConfigException("Invalid configuration value for 'acks': " + str);
        }
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return send(producerRecord, null);
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        try {
            waitOnMetadata(producerRecord.topic(), this.metadataFetchTimeoutMs);
            try {
                byte[] serialize = this.keySerializer.serialize(producerRecord.topic(), producerRecord.key());
                try {
                    byte[] serialize2 = this.valueSerializer.serialize(producerRecord.topic(), producerRecord.value());
                    int partition = this.partitioner.partition(new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), serialize, serialize2), this.metadata.fetch());
                    ensureValidRecordSize(12 + Record.recordSize(serialize, serialize2));
                    TopicPartition topicPartition = new TopicPartition(producerRecord.topic(), partition);
                    log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{producerRecord, callback, producerRecord.topic(), Integer.valueOf(partition)});
                    RecordAccumulator.RecordAppendResult append = this.accumulator.append(topicPartition, serialize, serialize2, this.compressionType, callback);
                    if (append.batchIsFull || append.newBatchCreated) {
                        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", producerRecord.topic(), Integer.valueOf(partition));
                        this.sender.wakeup();
                    }
                    return append.future;
                } catch (ClassCastException e) {
                    throw new SerializationException("Can't convert value of class " + producerRecord.value().getClass().getName() + " to class " + this.producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer");
                }
            } catch (ClassCastException e2) {
                throw new SerializationException("Can't convert key of class " + producerRecord.key().getClass().getName() + " to class " + this.producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer");
            }
        } catch (InterruptedException e3) {
            this.errors.record();
            throw new KafkaException(e3);
        } catch (ApiException e4) {
            log.debug("Exception occurred during message send:", e4);
            if (callback != null) {
                callback.onCompletion(null, e4);
            }
            this.errors.record();
            return new FutureFailure(e4);
        } catch (KafkaException e5) {
            this.errors.record();
            throw e5;
        }
    }

    private void waitOnMetadata(String str, long j) {
        if (this.metadata.fetch().partitionsForTopic(str) != null) {
            return;
        }
        long milliseconds = this.time.milliseconds();
        long j2 = j;
        while (true) {
            long j3 = j2;
            if (this.metadata.fetch().partitionsForTopic(str) != null) {
                return;
            }
            log.trace("Requesting metadata update for topic {}.", str);
            int requestUpdate = this.metadata.requestUpdate();
            this.metadata.add(str);
            this.sender.wakeup();
            this.metadata.awaitUpdate(requestUpdate, j3);
            long milliseconds2 = this.time.milliseconds() - milliseconds;
            if (milliseconds2 >= j) {
                throw new TimeoutException("Failed to update metadata after " + j + " ms.");
            }
            j2 = j - milliseconds2;
        }
    }

    private void ensureValidRecordSize(int i) {
        if (i > this.maxRequestSize) {
            throw new RecordTooLargeException("The message is " + i + " bytes when serialized which is larger than the maximum request size you have configured with the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
        }
        if (i > this.totalMemorySize) {
            throw new RecordTooLargeException("The message is " + i + " bytes when serialized which is larger than the total memory buffer you have configured with the " + ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.");
        }
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public List<PartitionInfo> partitionsFor(String str) {
        waitOnMetadata(str, this.metadataFetchTimeoutMs);
        return this.metadata.fetch().partitionsForTopic(str);
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    @Override // org.apache.kafka.clients.producer.Producer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.trace("Closing the Kafka producer.");
        this.sender.initiateClose();
        try {
            this.ioThread.join();
            this.metrics.close();
            this.keySerializer.close();
            this.valueSerializer.close();
            log.debug("The Kafka producer has closed.");
        } catch (InterruptedException e) {
            throw new KafkaException(e);
        }
    }
}
