package org.apache.pulsar.client.kafka.compat;

import java.util.Base64;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerInterceptor;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.commons.lang3.reflect.FieldUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.class */
/**
 * Adapts a Kafka {@code ProducerInterceptor<K, V>} so that it can be installed as a Pulsar
 * {@code ProducerInterceptor<byte[]>}.
 *
 * <p>Each outgoing Pulsar message is converted to a Kafka {@link ProducerRecord}, handed to the
 * wrapped interceptor, and the (possibly modified) record is converted back to a Pulsar message.
 *
 * <p>NOTE: {@link #beforeSend} stashes per-message state (schema, event time, partition id) in
 * instance fields between its two conversion steps, so concurrent {@code beforeSend} calls on the
 * same wrapper instance would interfere with each other.
 *
 * @param <K> Kafka record key type
 * @param <V> Kafka record value type
 */
public class KafkaProducerInterceptorWrapper<K, V> implements ProducerInterceptor<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerInterceptorWrapper.class);
    private final org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
    private final Schema<K> keySchema;
    private final Schema<V> valueSchema;
    private final String topic;
    // The three fields below carry state from toKafkaRecord() to toPulsarMessage() within one beforeSend() call.
    private Schema<byte[]> scheme;
    private long eventTime;
    private String partitionID;

    /**
     * @param producerInterceptor the Kafka interceptor to delegate to
     * @param schema schema used to encode/decode record keys
     * @param schema2 schema used to encode/decode record values
     * @param str topic name reported to the Kafka interceptor and to the (de)serializers
     */
    public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> producerInterceptor, Schema<K> schema, Schema<V> schema2, String str) {
        this.kafkaProducerInterceptor = producerInterceptor;
        this.keySchema = schema;
        this.valueSchema = schema2;
        this.topic = str;
    }

    /** Closes the wrapped Kafka interceptor. */
    public void close() {
        this.kafkaProducerInterceptor.close();
    }

    /**
     * Runs the wrapped interceptor's {@code onSend} on the Kafka view of {@code message} and returns
     * the result converted back to a Pulsar message.
     */
    public Message<byte[]> beforeSend(Producer<byte[]> producer, Message<byte[]> message) {
        return toPulsarMessage(this.kafkaProducerInterceptor.onSend(toKafkaRecord(message)));
    }

    /**
     * Forwards a Pulsar send acknowledgement to the wrapped interceptor's {@code onAcknowledgement}.
     *
     * <p>Offsets and checksum are reported as {@code -1}; the timestamp is the message event time.
     *
     * @param th the send failure, or {@code null} when the send succeeded; a {@code null} is passed
     *     through as {@code null} so the Kafka interceptor can tell success from failure
     * @throws RuntimeException if the partition id stored on the message is not an integer
     */
    public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId messageId, Throwable th) {
        try {
            PulsarApi.MessageMetadata.Builder messageBuilder = ((MessageImpl<byte[]>) message).getMessageBuilder();
            // Kept local on purpose: the partitionID field belongs to the beforeSend() round trip and
            // must not be overwritten from the acknowledgement callback.
            String ackPartitionId = getPartitionID(messageBuilder);
            TopicPartition topicPartition = new TopicPartition(this.topic, Integer.parseInt(ackPartitionId));
            RecordMetadata metadata = new RecordMetadata(topicPartition, -1L, -1L, messageBuilder.getEventTime(), -1L, message.getKeyBytes().length, message.getValue().length);
            // Only wrap when there actually is a failure; wrapping null would report every success as an error.
            Exception failure = th == null ? null : new Exception(th);
            this.kafkaProducerInterceptor.onAcknowledgement(metadata, failure);
        } catch (NumberFormatException e) {
            String str = "Unable to convert partitionID to integer: " + e.getMessage();
            log.error(str, e);
            throw new RuntimeException(str, e);
        }
    }

    /**
     * Builds a Pulsar message from a Kafka record, reusing the schema, event time and partition id
     * captured by the preceding {@link #toKafkaRecord} call.
     */
    private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
        TypedMessageBuilderImpl<byte[]> builder = new TypedMessageBuilderImpl<>(null, this.scheme);
        builder.key(serializeKey(this.topic, producerRecord.key()));
        if (this.valueSchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema) this.valueSchema).setTopic(this.topic);
        }
        builder.value(this.valueSchema.encode(producerRecord.value()));
        builder.eventTime(this.eventTime);
        builder.property(KafkaMessageRouter.PARTITION_ID, this.partitionID);
        return builder.getMessage();
    }

    /**
     * Converts a Pulsar message into a Kafka record and remembers the message's schema, partition id
     * and event time for {@link #toPulsarMessage}.
     *
     * <p>If the stored partition id is not an integer, a record without partition and timestamp is
     * returned instead.
     *
     * @throws RuntimeException if the message's {@code schema} field cannot be read reflectively
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
        // Decode the value first, outside the try block, so decoding failures propagate unchanged.
        V value;
        if (this.valueSchema instanceof PulsarKafkaSchema) {
            Serializer kafkaSerializer = ((PulsarKafkaSchema) this.valueSchema).getKafkaSerializer();
            value = (V) getDeserializer(kafkaSerializer).deserialize(this.topic, message.getValue());
        } else {
            value = this.valueSchema.decode(message.getValue());
        }
        try {
            this.scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
            this.partitionID = getPartitionID(((MessageImpl<byte[]>) message).getMessageBuilder());
            this.eventTime = message.getEventTime();
            return new ProducerRecord<>(this.topic, Integer.valueOf(Integer.parseInt(this.partitionID)), Long.valueOf(this.eventTime), deserializeKey(this.topic, message.getKey()), value);
        } catch (IllegalAccessException e) {
            String str = "Unable to get the schema of message due to " + e.getMessage();
            log.error(str, e);
            throw new RuntimeException(str, e);
        } catch (NumberFormatException e2) {
            // Partition id is not numeric: fall back to a record that carries only topic, key and value.
            return new ProducerRecord<>(this.topic, deserializeKey(this.topic, message.getKey()), value);
        }
    }

    /**
     * Returns the Pulsar string key for a Kafka key: {@code String} keys are used verbatim, any other
     * key is encoded with the key schema and Base64-encoded.
     */
    private String serializeKey(String str, K k) {
        if (k instanceof String) {
            return (String) k;
        }
        if (this.keySchema instanceof PulsarKafkaSchema) {
            ((PulsarKafkaSchema) this.keySchema).setTopic(str);
        }
        return Base64.getEncoder().encodeToString(this.keySchema.encode(k));
    }

    /**
     * Inverse of {@link #serializeKey}: returns the key verbatim when the key schema wraps a
     * {@link StringSerializer}, otherwise Base64-decodes it and deserializes the bytes.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private K deserializeKey(String str, String str2) {
        if (!(this.keySchema instanceof PulsarKafkaSchema)) {
            return this.keySchema.decode(Base64.getDecoder().decode(str2));
        }
        Serializer kafkaSerializer = ((PulsarKafkaSchema) this.keySchema).getKafkaSerializer();
        if (kafkaSerializer instanceof StringSerializer) {
            // With a StringSerializer the key type is String, so the raw key is already the K value.
            return (K) str2;
        }
        return (K) getDeserializer(kafkaSerializer).deserialize(str, Base64.getDecoder().decode(str2));
    }

    /**
     * Reads the value of the {@code KafkaMessageRouter.PARTITION_ID} property from message metadata.
     *
     * @throws java.util.NoSuchElementException if the metadata has no such property
     */
    private String getPartitionID(PulsarApi.MessageMetadata.Builder builder) {
        return builder.getPropertiesList().stream()
                .filter(keyValue -> keyValue.getKey().equals(KafkaMessageRouter.PARTITION_ID))
                .findFirst()
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "Message metadata has no '" + KafkaMessageRouter.PARTITION_ID + "' property"))
                .getValue();
    }

    /**
     * Returns a new Kafka deserializer matching the given built-in Kafka serializer.
     *
     * @throws IllegalArgumentException if the serializer is not one of the supported built-in types
     */
    @SuppressWarnings("rawtypes")
    static Deserializer getDeserializer(Serializer serializer) {
        if (serializer instanceof StringSerializer) {
            return new StringDeserializer();
        }
        if (serializer instanceof LongSerializer) {
            return new LongDeserializer();
        }
        if (serializer instanceof IntegerSerializer) {
            return new IntegerDeserializer();
        }
        if (serializer instanceof DoubleSerializer) {
            return new DoubleDeserializer();
        }
        if (serializer instanceof BytesSerializer) {
            return new BytesDeserializer();
        }
        if (serializer instanceof ByteBufferSerializer) {
            return new ByteBufferDeserializer();
        }
        if (serializer instanceof ByteArraySerializer) {
            return new ByteArrayDeserializer();
        }
        throw new IllegalArgumentException(serializer.getClass().getName() + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
    }
}
