package org.apache.hudi.utilities.sources;

import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:org/apache/hudi/utilities/sources/ProtoKafkaSource.class */
/**
 * Kafka source that produces batches of protobuf {@link Message} records.
 *
 * <p>Two value deserializers are accepted (any other configured class is rejected by the
 * constructor):
 * <ul>
 *   <li>{@link ByteArrayDeserializer}: Kafka hands over raw bytes and each record is parsed
 *       reflectively via the {@code parseFrom(byte[])} method of the class named by
 *       {@link ProtoClassBasedSchemaProviderConfig#PROTO_SCHEMA_CLASS_NAME};</li>
 *   <li>{@link KafkaProtobufDeserializer}: the Kafka consumer already yields {@link Message}
 *       values, which are passed through unchanged.</li>
 * </ul>
 */
public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
    /** Protobuf class name; present only when the byte-array deserializer is configured. */
    private final Option<String> className;
    /** Fully qualified class name of the configured Kafka value deserializer. */
    private final String deserializerName;

    /**
     * Serializable helper that turns raw bytes into a protobuf {@link Message} by reflectively
     * calling {@code parseFrom(byte[])} on the configured class.
     *
     * <p>Only the class name is serialized; the resolved {@link Class} and {@link Method} are
     * {@code transient} and are looked up again on first use after deserialization.
     */
    public static class ProtoDeserializer implements Serializable {
        private final String className;
        private transient Class<?> protoClass;
        private transient Method parseMethod;

        /**
         * @param str fully qualified name of the protobuf class whose {@code parseFrom(byte[])}
         *            method is used for parsing
         */
        public ProtoDeserializer(String str) {
            this.className = str;
        }

        /**
         * Parses one serialized protobuf payload.
         *
         * @param bArr serialized message bytes
         * @return the parsed message
         * @throws HoodieReadFromSourceException if the parse method cannot be resolved, cannot be
         *                                       accessed, or throws while parsing
         */
        public Message parse(byte[] bArr) {
            try {
                // parseFrom is invoked as a static method, so no receiver is supplied. The
                // (Object) cast makes the byte[] the single reflective argument explicitly.
                return (Message) getParseMethod().invoke(null, (Object) bArr);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new HoodieReadFromSourceException("Failed to parse proto message from kafka", e);
            }
        }

        /** Resolves the protobuf class on first use and caches it. */
        private Class<?> getProtoClass() {
            if (this.protoClass == null) {
                this.protoClass = ReflectionUtils.getClass(this.className);
            }
            return this.protoClass;
        }

        /** Resolves {@code parseFrom(byte[])} on the protobuf class on first use and caches it. */
        private Method getParseMethod() {
            if (this.parseMethod == null) {
                try {
                    this.parseMethod = getProtoClass().getMethod("parseFrom", byte[].class);
                } catch (NoSuchMethodException e) {
                    throw new HoodieReadFromSourceException("Unable to get proto parsing method from specified class: " + this.className, e);
                }
            }
            return this.parseMethod;
        }
    }

    /**
     * Convenience constructor that wraps {@code schemaProvider} in a {@link DefaultStreamContext}
     * with no source profile supplier and delegates to the stream-context constructor.
     */
    public ProtoKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics hoodieIngestionMetrics) {
        this(typedProperties, javaSparkContext, sparkSession, hoodieIngestionMetrics, new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Creates the source, validates the configured value deserializer, and prepares the Kafka
     * consumer properties and offset generator.
     *
     * @throws HoodieReadFromSourceException if the configured value deserializer is neither
     *                                       {@link ByteArrayDeserializer} nor
     *                                       {@link KafkaProtobufDeserializer}, or if appending
     *                                       Kafka offsets was requested
     */
    public ProtoKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, HoodieIngestionMetrics hoodieIngestionMetrics, StreamContext streamContext) {
        super(typedProperties, javaSparkContext, sparkSession, Source.SourceType.PROTO, hoodieIngestionMetrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), typedProperties, javaSparkContext), streamContext.getSourceProfileSupplier()));
        this.deserializerName = ConfigUtils.getStringWithAltKeys(this.props, KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true);
        boolean rawBytes = this.deserializerName.equals(ByteArrayDeserializer.class.getName());
        if (!rawBytes && !this.deserializerName.equals(KafkaProtobufDeserializer.class.getName())) {
            throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and KafkaProtobufDeserializer are supported for ProtoKafkaSource");
        }
        if (rawBytes) {
            // Raw bytes can only be parsed when the target protobuf class is configured.
            ConfigUtils.checkRequiredConfigProperties(this.props, Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
            this.className = Option.of(ConfigUtils.getStringWithAltKeys(this.props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
        } else {
            this.className = Option.empty();
        }
        this.props.put("key.deserializer", StringDeserializer.class.getName());
        this.props.put("value.deserializer", this.deserializerName);
        this.offsetGen = new KafkaOffsetGen(this.props);
        if (this.shouldAddOffsets) {
            throw new HoodieReadFromSourceException("Appending kafka offsets to ProtoKafkaSource is not supported");
        }
    }

    /**
     * Builds the RDD of protobuf messages for the given Kafka offset ranges.
     *
     * <p>With any deserializer other than {@link ByteArrayDeserializer} the record values are
     * returned as-is; with {@link ByteArrayDeserializer} each value is parsed through a
     * {@link ProtoDeserializer} for the configured protobuf class.
     *
     * @param offsetRangeArr Kafka offset ranges to read
     * @return the messages contained in those ranges
     */
    @Override // org.apache.hudi.utilities.sources.KafkaSource
    public JavaRDD<Message> toBatch(OffsetRange[] offsetRangeArr) {
        if (!this.deserializerName.equals(ByteArrayDeserializer.class.getName())) {
            // Explicit type arguments: the value type cannot be inferred from the arguments.
            return KafkaUtils.<String, Message>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent())
                    .map(consumerRecord -> consumerRecord.value());
        }
        ValidationUtils.checkArgument(this.className.isPresent(), ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present.");
        ProtoDeserializer protoDeserializer = new ProtoDeserializer(this.className.get());
        return KafkaUtils.<String, byte[]>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent())
                .map(consumerRecord -> protoDeserializer.parse(consumerRecord.value()));
    }
}
