package org.apache.flink.statefun.flink.io.kafka;

import com.google.protobuf.Message;
import com.google.protobuf.MoreByteStrings;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.statefun.flink.io.generated.AutoRoutable;
import org.apache.flink.statefun.flink.io.generated.RoutingConfig;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/* loaded from: input_file:org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.class */
/**
 * A {@link KafkaIngressDeserializer} that wraps every consumed Kafka record into an
 * {@link AutoRoutable} message.
 *
 * <p>The record key, decoded as UTF-8, becomes the routable's id; the record value becomes its
 * payload; and the {@link RoutingConfig} attached to the routable is looked up by the topic the
 * record was consumed from.
 */
public final class RoutableProtobufKafkaIngressDeserializer implements KafkaIngressDeserializer<Message> {
    private static final long serialVersionUID = 1;

    /** Routing configuration keyed by Kafka topic name. */
    private final Map<String, RoutingConfig> routingConfigs;

    /**
     * Creates a deserializer backed by the given per-topic routing configuration.
     *
     * <p>The map is stored by reference (no defensive copy is taken).
     *
     * @param routingConfigs routing configuration keyed by topic name
     * @throws IllegalArgumentException if {@code routingConfigs} is {@code null} or empty
     */
    public RoutableProtobufKafkaIngressDeserializer(Map<String, RoutingConfig> routingConfigs) {
        if (routingConfigs == null || routingConfigs.isEmpty()) {
            throw new IllegalArgumentException("Routing config for routable Kafka ingress cannot be empty.");
        }
        this.routingConfigs = routingConfigs;
    }

    /**
     * Converts a consumed Kafka record into an {@link AutoRoutable}.
     *
     * @param record the consumed record; its key must be non-null
     * @return an {@link AutoRoutable} carrying the topic's routing config, the UTF-8 decoded key as
     *     id, and the record value as payload bytes
     * @throws IllegalStateException if the record has no key, or if no routing config is
     *     registered for the record's topic
     */
    @Override
    public Message deserialize(ConsumerRecord<byte[], byte[]> record) {
        final String topic = record.topic();
        final byte[] payload = record.value();

        // The key is validated before the routing lookup, so a record that has neither a key nor
        // a routing config reports the missing key.
        final String id = new String(requireNonNullKey(record.key()), StandardCharsets.UTF_8);

        final RoutingConfig routingConfig = routingConfigs.get(topic);
        if (routingConfig == null) {
            throw new IllegalStateException("Consumed a record from topic [" + topic + "], but no routing config was specified.");
        }

        // NOTE(review): the payload is handed to MoreByteStrings.wrap without a null check;
        // confirm that records with a null value cannot reach this ingress.
        return AutoRoutable.newBuilder()
                .setConfig(routingConfig)
                .setId(id)
                .setPayloadBytes(MoreByteStrings.wrap(payload))
                .build();
    }

    /**
     * Returns the given record key unchanged, rejecting {@code null}.
     *
     * @param key the raw record key
     * @return {@code key}, guaranteed non-null
     * @throws IllegalStateException if {@code key} is {@code null}
     */
    private static byte[] requireNonNullKey(byte[] key) {
        if (key == null) {
            final IngressType ingressType = ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE;
            throw new IllegalStateException("The " + ingressType.namespace() + "/" + ingressType.type() + " ingress requires a UTF-8 key set for each record.");
        }
        return key;
    }
}
