package org.apache.flink.connector.pulsar.source.reader.deserializer;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.common.naming.TopicName;

/**
 * A {@link PulsarDeserializationSchema} that decodes the raw bytes of a Pulsar message with an
 * {@link AutoConsumeSchema} and hands the decoded record to a user supplied
 * {@link GenericRecordDeserializer}, which turns it into the element emitted downstream.
 *
 * <p>One {@link AutoConsumeSchema} is created lazily per topic name and cached for reuse. The
 * cache and the Pulsar client are transient and are only populated by
 * {@link #open(PulsarDeserializationSchema.PulsarInitializationContext, SourceConfiguration)},
 * so {@code open} must have been called before {@link #deserialize(Message, Collector)}.
 *
 * <p>The cache is a plain {@link HashMap}; this class adds no synchronization around it.
 *
 * @param <T> the type of the elements produced by the wrapped deserializer
 */
@Internal
public class GenericRecordDeserializationSchema<T> implements PulsarDeserializationSchema<T> {
    private static final long serialVersionUID = 1133225716807307498L;

    /** Pulsar client taken from the initialization context in {@code open}. */
    private transient PulsarClientImpl client;

    /** Per-topic schema cache, keyed by the topic name reported by the message. */
    private transient Map<String, AutoConsumeSchema> schemaMap;

    /** Converts the decoded record into the produced element and supplies its type information. */
    private final GenericRecordDeserializer<T> deserializer;

    /**
     * Creates a schema that delegates record conversion to the given deserializer.
     *
     * @param genericRecordDeserializer the deserializer applied to every decoded record
     */
    public GenericRecordDeserializationSchema(GenericRecordDeserializer<T> genericRecordDeserializer) {
        this.deserializer = genericRecordDeserializer;
    }

    /**
     * Decodes the message payload with the schema cached for the message's topic, using the
     * schema version carried by the message, converts the result with the wrapped deserializer
     * and emits it through the collector.
     *
     * @param message the Pulsar message whose payload should be decoded
     * @param collector the collector receiving the converted element
     * @throws Exception if decoding or conversion fails
     */
    @Override
    public void deserialize(Message<byte[]> message, Collector<T> collector) throws Exception {
        AutoConsumeSchema schema = getSchema(message);
        collector.collect(
                this.deserializer.deserialize(
                        schema.decode(message.getData(), message.getSchemaVersion())));
    }

    /**
     * Returns the type information reported by the wrapped deserializer.
     *
     * @return the type information of the produced elements
     */
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    /**
     * Captures the Pulsar client from the initialization context and resets the schema cache.
     * The source configuration is not used by this implementation.
     *
     * @param pulsarInitializationContext the context providing the Pulsar client
     * @param sourceConfiguration the source configuration (unused)
     * @throws Exception declared by the interface; not thrown directly by this implementation
     */
    @Override
    public void open(PulsarDeserializationSchema.PulsarInitializationContext pulsarInitializationContext, SourceConfiguration sourceConfiguration) throws Exception {
        this.client = pulsarInitializationContext.getPulsarClient();
        this.schemaMap = new HashMap<>();
    }

    /**
     * Returns the cached schema for the message's topic, creating and caching it on first use.
     *
     * @param message the message whose topic name selects the schema
     * @return the schema associated with the message's topic
     */
    private AutoConsumeSchema getSchema(Message<byte[]> message) {
        return this.schemaMap.computeIfAbsent(message.getTopicName(), this::createSchema);
    }

    /**
     * Builds a new {@link AutoConsumeSchema} whose schema info provider is bound to the given
     * topic and to the client captured in {@code open}.
     *
     * @param topicName the topic name the schema info provider is created for
     * @return the newly created schema
     */
    private AutoConsumeSchema createSchema(String topicName) {
        AutoConsumeSchema schema = new AutoConsumeSchema();
        schema.setSchemaInfoProvider(new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this.client));
        return schema;
    }
}
