package org.apache.kafka.clients.producer;

import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.apache.pulsar.shade.org.tukaani.xz.common.Util;

/* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducer.class */
public class KafkaProducer<K, V> implements Producer<K, V> {
    private final PulsarClient client;
    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final ConcurrentMap<String, CompletableFuture<MessageId>> lastSendFuture;

    public KafkaProducer(Map<String, Object> map) {
        this(map, (Serializer) null, (Serializer) null);
    }

    public KafkaProducer(Map<String, Object> map, Serializer<K> serializer, Serializer<V> serializer2) {
        this(map, new Properties(), serializer, serializer2);
    }

    public KafkaProducer(Properties properties) {
        this(properties, (Serializer) null, (Serializer) null);
    }

    public KafkaProducer(Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        this(new HashMap(), properties, serializer, serializer2);
    }

    private KafkaProducer(Map<String, Object> map, Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        this.producers = new ConcurrentHashMap();
        this.lastSendFuture = new ConcurrentHashMap();
        properties.forEach((obj, obj2) -> {
            map.put((String) obj, obj2);
        });
        ProducerConfig producerConfig = new ProducerConfig(map);
        if (serializer == null) {
            this.keySerializer = (Serializer) producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.keySerializer.configure(producerConfig.originals(), true);
        } else {
            this.keySerializer = serializer;
            producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        }
        if (serializer2 == null) {
            this.valueSerializer = (Serializer) producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
            this.valueSerializer.configure(producerConfig.originals(), true);
        } else {
            this.valueSerializer = serializer2;
            producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        }
        try {
            this.client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(producerConfig.getList("bootstrap.servers").get(0)).build();
            this.pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(this.client, properties);
            this.pulsarProducerBuilder.batchingMaxPublishDelay(Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1")), TimeUnit.MILLISECONDS);
            String property = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
            if ("gzip".equals(property)) {
                this.pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
            } else if ("lz4".equals(property)) {
                this.pulsarProducerBuilder.compressionType(CompressionType.LZ4);
            }
            int parseInt = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"));
            this.pulsarProducerBuilder.sendTimeout(parseInt, TimeUnit.MILLISECONDS);
            this.pulsarProducerBuilder.blockIfQueueFull(parseInt > 0 || Boolean.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")));
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return send(producerRecord, null);
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        try {
            TypedMessageBuilder<byte[]> newMessage = this.producers.computeIfAbsent(producerRecord.topic(), str -> {
                return createNewProducer(str);
            }).newMessage();
            int buildMessage = buildMessage(newMessage, producerRecord);
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture<MessageId> sendAsync = newMessage.sendAsync();
            this.lastSendFuture.put(producerRecord.topic(), sendAsync);
            sendAsync.thenAccept(messageId -> {
                completableFuture.complete(getRecordMetadata(producerRecord.topic(), newMessage, messageId, buildMessage));
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(th);
                return null;
            });
            completableFuture.handle((recordMetadata, th2) -> {
                if (callback == null) {
                    return null;
                }
                callback.onCompletion(recordMetadata, th2 != null ? new Exception(th2) : null);
                return null;
            });
            return completableFuture;
        } catch (Exception e) {
            if (callback != null) {
                callback.onCompletion(null, e);
            }
            CompletableFuture completableFuture2 = new CompletableFuture();
            completableFuture2.completeExceptionally(e);
            return completableFuture2;
        }
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public void flush() {
        this.lastSendFuture.forEach((str, completableFuture) -> {
            try {
                completableFuture.get();
                this.lastSendFuture.remove(str, completableFuture);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public List<PartitionInfo> partitionsFor(String str) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.emptyMap();
    }

    @Override // org.apache.kafka.clients.producer.Producer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(Util.VLI_MAX, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.kafka.clients.producer.Producer
    public void close(long j, TimeUnit timeUnit) {
        try {
            this.client.closeAsync().get(j, timeUnit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String str) {
        try {
            return this.pulsarProducerBuilder.m591clone().topic(str).create();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    private int buildMessage(TypedMessageBuilder<byte[]> typedMessageBuilder, ProducerRecord<K, V> producerRecord) {
        if (producerRecord.partition() != null) {
            throw new UnsupportedOperationException("");
        }
        if (producerRecord.key() != null) {
            typedMessageBuilder.key(getKey(producerRecord.topic(), producerRecord.key()));
        }
        if (producerRecord.timestamp() != null) {
            typedMessageBuilder.eventTime(producerRecord.timestamp().longValue());
        }
        byte[] serialize = this.valueSerializer.serialize(producerRecord.topic(), producerRecord.value());
        typedMessageBuilder.value(serialize);
        return serialize.length;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private String getKey(String str, K k) {
        if (this.keySerializer instanceof StringSerializer) {
            return (String) k;
        }
        return Base64.getEncoder().encodeToString(this.keySerializer.serialize(str, k));
    }

    private RecordMetadata getRecordMetadata(String str, TypedMessageBuilder<byte[]> typedMessageBuilder, MessageId messageId, int i) {
        MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
        long offset = MessageIdUtils.getOffset(messageIdImpl);
        TopicPartition topicPartition = new TopicPartition(str, messageIdImpl.getPartitionIndex());
        TypedMessageBuilderImpl typedMessageBuilderImpl = (TypedMessageBuilderImpl) typedMessageBuilder;
        return new RecordMetadata(topicPartition, offset, 0L, typedMessageBuilderImpl.getPublishTime(), 0L, typedMessageBuilderImpl.hasKey() ? typedMessageBuilderImpl.getKey().length() : 0, i);
    }
}
