package org.apache.flink.statefun.flink.io.kafka;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.statefun.flink.common.ResourceLocator;
import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * A {@link KafkaIngressDeserializer} that turns the value bytes of a Kafka record into a Protobuf
 * {@link Message}.
 *
 * <p>The message type is not known at compile time. Instead, it is looked up by name in a
 * Protobuf descriptor set resource, and the record value is parsed as a {@link DynamicMessage} of
 * that type.
 *
 * <p>Only the descriptor set path and the message type name are kept as regular fields; the
 * parser is {@code transient} and is built lazily on first use.
 */
final class ProtobufKafkaIngressDeserializer implements KafkaIngressDeserializer<Message> {
    private static final long serialVersionUID = 1;

    /** Name of the resource holding the Protobuf descriptor set, as resolved by {@link ResourceLocator}. */
    private final String descriptorSetPath;

    /** Name of the Protobuf message type to look up in the descriptor set. */
    private final String messageType;

    /** Lazily created parser for {@link #messageType}; {@code null} until first needed. */
    private transient Parser<? extends Message> parser;

    /**
     * Creates a deserializer for the given message type.
     *
     * @param descriptorSetPath name of the descriptor set resource; must not be {@code null}
     * @param messageType name of the message type inside the descriptor set; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public ProtobufKafkaIngressDeserializer(String descriptorSetPath, String messageType) {
        this.descriptorSetPath = Objects.requireNonNull(descriptorSetPath);
        this.messageType = Objects.requireNonNull(messageType);
    }

    /**
     * Parses the value of the given Kafka record as a message of the configured type.
     *
     * @param consumerRecord the record whose value holds the serialized Protobuf message
     * @return the parsed message
     * @throws IllegalStateException if the value is not a valid Protobuf message, or if the parser
     *     could not be created (descriptor set unreadable, message type missing or not a message)
     */
    @Override
    public Message deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
        try {
            return parser().parseFrom(consumerRecord.value());
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Returns the parser for the configured message type, creating and caching it on first call.
     *
     * @throws IllegalStateException if the descriptor set cannot be read, does not contain the
     *     configured message type, or the name refers to something other than a message
     */
    private Parser<? extends Message> parser() {
        if (this.parser != null) {
            return this.parser;
        }
        Optional<Descriptors.GenericDescriptor> maybeDescriptor =
                protobufDescriptorMap(this.descriptorSetPath).getDescriptorByName(this.messageType);
        if (!maybeDescriptor.isPresent()) {
            // The descriptor set itself was read successfully at this point; what failed is the
            // lookup of the message type by name, so report exactly that.
            throw new IllegalStateException(
                    "Unable to locate a message descriptor named "
                            + this.messageType
                            + " in the descriptor set at "
                            + this.descriptorSetPath);
        }
        Descriptors.GenericDescriptor descriptor = maybeDescriptor.get();
        if (!(descriptor instanceof Descriptors.Descriptor)) {
            // Guard the downcast: fail with a descriptive error instead of a bare
            // ClassCastException when the name resolves to a non-message descriptor.
            throw new IllegalStateException(
                    "The descriptor named "
                            + this.messageType
                            + " in the descriptor set at "
                            + this.descriptorSetPath
                            + " is not a message descriptor");
        }
        Parser<DynamicMessage> created =
                DynamicMessage.getDefaultInstance((Descriptors.Descriptor) descriptor)
                        .getParserForType();
        this.parser = created;
        return created;
    }

    /**
     * Loads the descriptor map from the named descriptor set resource.
     *
     * @param descriptorSetPath name of the descriptor set resource
     * @throws IllegalStateException wrapping the {@link IOException} if the resource cannot be read
     */
    private static ProtobufDescriptorMap protobufDescriptorMap(String descriptorSetPath) {
        try {
            return ProtobufDescriptorMap.from(ResourceLocator.findNamedResource(descriptorSetPath));
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Error while processing an ingress definition. Unable to read the descriptor set at  "
                            + descriptorSetPath,
                    e);
        }
    }
}
