package org.apache.flink.statefun.flink.io.kafka.binders.egress.v1;

import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.statefun.flink.common.types.TypedValueUtil;
import org.apache.flink.statefun.sdk.egress.generated.KafkaProducerRecord;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/binders/egress/v1/GenericKafkaEgressSerializer.class */
public final class GenericKafkaEgressSerializer implements KafkaEgressSerializer<TypedValue> {
    private static final long serialVersionUID = 1;

    @Override // org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer
    public ProducerRecord<byte[], byte[]> serialize(TypedValue typedValue) {
        return toProducerRecord(asKafkaProducerRecord(typedValue));
    }

    private static KafkaProducerRecord asKafkaProducerRecord(TypedValue typedValue) {
        if (!TypedValueUtil.isProtobufTypeOf(typedValue, KafkaProducerRecord.getDescriptor())) {
            throw new IllegalStateException("The generic Kafka egress expects only messages of type " + KafkaProducerRecord.class.getName());
        }
        try {
            return KafkaProducerRecord.parseFrom(typedValue.getValue());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Unable to unpack message as a " + KafkaProducerRecord.class.getName(), e);
        }
    }

    private static ProducerRecord<byte[], byte[]> toProducerRecord(KafkaProducerRecord kafkaProducerRecord) {
        String key = kafkaProducerRecord.getKey();
        String topic = kafkaProducerRecord.getTopic();
        byte[] byteArray = kafkaProducerRecord.getValueBytes().toByteArray();
        return (key == null || key.isEmpty()) ? new ProducerRecord<>(topic, byteArray) : new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), byteArray);
    }
}
