package org.apache.hudi.utilities.sources;

import java.lang.invoke.SerializedLambda;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/AvroKafkaSource.class */
/**
 * Kafka source that yields Avro {@link GenericRecord}s as a {@link JavaRDD}.
 *
 * <p>The value deserializer class is read from
 * {@link KafkaSourceConfig#KAFKA_AVRO_VALUE_DESERIALIZER_CLASS}. Two deserializers get special
 * handling in this class:
 * <ul>
 *   <li>{@link KafkaAvroSchemaDeserializer}: the source schema of the schema provider is pushed
 *       into the Kafka properties before offsets are generated;</li>
 *   <li>{@link ByteArrayDeserializer}: raw bytes are decoded here with an {@link AvroConvertor}
 *       built from the original (unwrapped) schema provider.</li>
 * </ul>
 *
 * <p>NOTE(review): this file was reconstructed from bytecode. The compiler-synthesized
 * {@code $deserializeLambda$} method that the decompiler emitted is intentionally not present:
 * javac generates it for the serializable lambdas below, and the decompiled rendering was not
 * valid Java.
 */
public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> {
    private static final Logger LOG = LoggerFactory.getLogger(AvroKafkaSource.class);

    /** Property prefix for settings that target the Kafka Avro value deserializer. */
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.streamer.source.kafka.value.deserializer.";

    /**
     * Legacy "deltastreamer" spelling of the value-deserializer property prefix.
     *
     * @deprecated presumably superseded by {@link #KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX} — confirm.
     */
    @Deprecated
    public static final String OLD_KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";

    /**
     * Legacy property key for the value-deserializer schema.
     *
     * @deprecated this class itself writes the schema under
     *     {@link KafkaSourceConfig#KAFKA_VALUE_DESERIALIZER_SCHEMA}; presumably that is the replacement — confirm.
     */
    @Deprecated
    public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema";

    /** Fully qualified name of the configured Kafka value deserializer class. */
    private final String deserializerClassName;

    /** Schema provider exactly as supplied by the caller's {@link StreamContext}. */
    protected final SchemaProvider originalSchemaProvider;

    /**
     * Convenience constructor that wraps {@code schemaProvider} in a {@link DefaultStreamContext}
     * with no source profile supplier and delegates to the main constructor.
     *
     * @param typedProperties        source and Kafka configuration
     * @param javaSparkContext       Spark context used to build RDDs
     * @param sparkSession           Spark session passed to the parent source
     * @param schemaProvider         schema provider for the incoming records
     * @param hoodieIngestionMetrics metrics sink passed to the parent source
     */
    public AvroKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics hoodieIngestionMetrics) {
        this(typedProperties, javaSparkContext, sparkSession, hoodieIngestionMetrics, new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Creates the source, registers key/value deserializers in the Kafka properties and builds
     * the initial {@link KafkaOffsetGen}.
     *
     * @param typedProperties        source and Kafka configuration
     * @param javaSparkContext       Spark context used to build RDDs
     * @param sparkSession           Spark session passed to the parent source
     * @param hoodieIngestionMetrics metrics sink passed to the parent source
     * @param streamContext          supplies the schema provider and the source profile supplier
     * @throws HoodieReadFromSourceException if the configured deserializer class cannot be loaded,
     *     or if {@link KafkaAvroSchemaDeserializer} is configured without a schema provider
     */
    public AvroKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, HoodieIngestionMetrics hoodieIngestionMetrics, StreamContext streamContext) {
        super(typedProperties, javaSparkContext, sparkSession, Source.SourceType.AVRO, hoodieIngestionMetrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), typedProperties, javaSparkContext), streamContext.getSourceProfileSupplier()));
        this.originalSchemaProvider = streamContext.getSchemaProvider();
        this.props.put("key.deserializer", StringDeserializer.class.getName());
        this.deserializerClassName = ConfigUtils.getStringWithAltKeys(this.props, KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS, true);
        try {
            // Class.forName doubles as validation that the deserializer is on the classpath.
            this.props.put("value.deserializer", Class.forName(this.deserializerClassName).getName());
            if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
                configureSchemaDeserializer();
            }
            this.offsetGen = new KafkaOffsetGen(this.props);
        } catch (ClassNotFoundException e) {
            String message = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            LOG.error(message);
            throw new HoodieReadFromSourceException(message, e);
        }
    }

    /**
     * Fetches the next batch via the parent implementation. When
     * {@link KafkaAvroSchemaDeserializer} is configured, the schema property is re-applied and the
     * offset generator rebuilt first, so each fetch uses whatever source schema the provider
     * returns at that moment.
     *
     * <p>NOTE(review): the decompiler reported that this method's access was changed from
     * {@code protected}; it is kept {@code public} here to match the decompiled signature.
     *
     * @param option last checkpoint string handed to the parent implementation
     * @param j      limit handed to the parent implementation
     * @return the batch produced by the parent implementation
     */
    @Override
    public InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> option, long j) {
        if (this.deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
            configureSchemaDeserializer();
            this.offsetGen = new KafkaOffsetGen(this.props);
        }
        return super.fetchNewData(option, j);
    }

    /**
     * Reads the given offset ranges from Kafka and converts them to Avro records.
     *
     * <p>With {@link ByteArrayDeserializer} the record values are raw bytes and are decoded with
     * an {@link AvroConvertor} built from {@link #originalSchemaProvider}; with any other
     * deserializer the values are used as delivered. Records with a {@code null} value are
     * dropped in both cases.
     *
     * <p>NOTE(review): the decompiler reported that this method's access was changed from
     * {@code protected}; it is kept {@code public} here to match the decompiled signature.
     *
     * @param offsetRangeArr Kafka offset ranges to read
     * @return the records of those ranges, after {@link #maybeAppendKafkaOffsets(JavaRDD)}
     * @throws HoodieReadFromSourceException if {@link ByteArrayDeserializer} is configured but no
     *     schema provider is available
     */
    @Override
    public JavaRDD<GenericRecord> toBatch(OffsetRange[] offsetRangeArr) {
        JavaRDD<ConsumerRecord<Object, Object>> kafkaRecords;
        if (this.deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
            // The convertor below is built from originalSchemaProvider, so that reference must be
            // checked as well; guarding only schemaProvider would leave an NPE path open.
            if (this.schemaProvider == null || this.originalSchemaProvider == null) {
                throw new HoodieReadFromSourceException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
            }
            AvroConvertor avroConvertor = new AvroConvertor(this.originalSchemaProvider.getSourceSchema());
            kafkaRecords = KafkaUtils.<Object, Object>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent())
                    // Null values cannot be decoded, so drop them before conversion.
                    .filter(rawRecord -> rawRecord.value() != null)
                    .map(rawRecord -> new ConsumerRecord<Object, Object>(
                            rawRecord.topic(),
                            rawRecord.partition(),
                            rawRecord.offset(),
                            rawRecord.key(),
                            avroConvertor.fromAvroBinary((byte[]) rawRecord.value())));
        } else {
            kafkaRecords = KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent());
        }
        return maybeAppendKafkaOffsets(kafkaRecords.filter(kafkaRecord -> kafkaRecord.value() != null));
    }

    /**
     * Maps Kafka consumer records to Avro records.
     *
     * <p>When the inherited {@code shouldAddOffsets} flag is set, each record goes through
     * {@link AvroConvertor#withKafkaFieldsAppended} using a convertor built from the source schema
     * of {@code schemaProvider}; otherwise the record value is cast to {@link GenericRecord}.
     *
     * @param javaRDD consumer records whose values are expected to be {@link GenericRecord}s
     * @return the resulting Avro records
     */
    protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> javaRDD) {
        if (this.shouldAddOffsets) {
            AvroConvertor avroConvertor = new AvroConvertor(this.schemaProvider.getSourceSchema());
            return javaRDD.map(avroConvertor::withKafkaFieldsAppended);
        }
        return javaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
    }

    /**
     * Stores the current source schema (as a string) in the Kafka properties under
     * {@link KafkaSourceConfig#KAFKA_VALUE_DESERIALIZER_SCHEMA}.
     *
     * @throws HoodieReadFromSourceException if no schema provider is set
     */
    private void configureSchemaDeserializer() {
        if (this.schemaProvider == null) {
            throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
        }
        this.props.put(KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), this.schemaProvider.getSourceSchema().toString());
    }
}
