package org.apache.hudi.utilities.sources.debezium;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:org/apache/hudi/utilities/sources/debezium/DebeziumSource.class */
public abstract class DebeziumSource extends RowSource {
    private static final Logger LOG = LogManager.getLogger(DebeziumSource.class);
    private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
    private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
    private static final String OVERRIDE_CHECKPOINT_STRING = "hoodie.debezium.override.initial.checkpoint.key";
    private static final String CONNECT_NAME_KEY = "connect.name";
    private static final String DATE_CONNECT_NAME = "custom.debezium.DateString";
    private final KafkaOffsetGen offsetGen;
    private final HoodieDeltaStreamerMetrics metrics;
    private final SchemaRegistryProvider schemaRegistryProvider;
    private final String deserializerClassName;

    public DebeziumSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics hoodieDeltaStreamerMetrics) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        typedProperties.put("key.deserializer", StringDeserializer.class.getName());
        this.deserializerClassName = typedProperties.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(), (String) DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
        try {
            typedProperties.put("value.deserializer", Class.forName(this.deserializerClassName).getName());
            if (schemaProvider == null || !(schemaProvider instanceof SchemaRegistryProvider)) {
                this.schemaRegistryProvider = new SchemaRegistryProvider(typedProperties, javaSparkContext);
            } else {
                this.schemaRegistryProvider = (SchemaRegistryProvider) schemaProvider;
            }
            this.offsetGen = new KafkaOffsetGen(typedProperties);
            this.metrics = hoodieDeltaStreamerMetrics;
        } catch (ClassNotFoundException e) {
            String str = "Could not load custom avro kafka deserializer: " + this.deserializerClassName;
            LOG.error(str);
            throw new HoodieException(str, e);
        }
    }

    @Override // org.apache.hudi.utilities.sources.RowSource
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> option, long j) {
        String string = this.props.getString(OVERRIDE_CHECKPOINT_STRING, "");
        OffsetRange[] nextOffsetRanges = this.offsetGen.getNextOffsetRanges(option, j, this.metrics);
        long j2 = KafkaOffsetGen.CheckpointUtils.totalNewMessages(nextOffsetRanges);
        LOG.info("About to read " + j2 + " from Kafka for topic :" + this.offsetGen.getTopicName());
        if (j2 == 0) {
            return Pair.of(Option.of(this.sparkSession.emptyDataFrame()), string.isEmpty() ? KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges) : string);
        }
        try {
            Dataset<Row> dataset = toDataset(nextOffsetRanges, this.offsetGen, this.schemaRegistryProvider.fetchSchemaFromRegistry(this.props.getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP)));
            LOG.info(String.format("Spark schema of Kafka Payload for topic %s:\n%s", this.offsetGen.getTopicName(), dataset.schema().treeString()));
            LOG.info(String.format("New checkpoint string: %s", KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges)));
            return Pair.of(Option.of(dataset), string.isEmpty() ? KafkaOffsetGen.CheckpointUtils.offsetsToStr(nextOffsetRanges) : string);
        } catch (IOException e) {
            LOG.error("Fatal error reading and parsing incoming debezium event", e);
            throw new HoodieException("Fatal error reading and parsing incoming debezium event", e);
        }
    }

    protected abstract Dataset<Row> processDataset(Dataset<Row> dataset);

    private Dataset<Row> toDataset(OffsetRange[] offsetRangeArr, KafkaOffsetGen kafkaOffsetGen, String str) {
        AvroConvertor avroConvertor = new AvroConvertor(str);
        return convertArrayColumnsToString(convertColumnToNullable(this.sparkSession, convertDateColumns(processDataset(this.deserializerClassName.equals(StringDeserializer.class.getName()) ? AvroConversionUtils.createDataFrame(KafkaUtils.createRDD(this.sparkContext, kafkaOffsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent()).map(consumerRecord -> {
            return avroConvertor.fromJson((String) consumerRecord.value());
        }).rdd(), str, this.sparkSession) : AvroConversionUtils.createDataFrame(KafkaUtils.createRDD(this.sparkContext, kafkaOffsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent()).map(consumerRecord2 -> {
            return (GenericRecord) consumerRecord2.value();
        }).rdd(), str, this.sparkSession)), new Schema.Parser().parse(str))));
    }

    public static Dataset<Row> convertDateColumns(Dataset<Row> dataset, Schema schema) {
        if (schema.getField(DebeziumConstants.INCOMING_BEFORE_FIELD) != null) {
            List<String> list = (List) ((Schema) schema.getField(DebeziumConstants.INCOMING_BEFORE_FIELD).schema().getTypes().get(1)).getFields().stream().filter(field -> {
                return field.schema().getType() == Schema.Type.UNION ? field.schema().getTypes().stream().anyMatch(schema2 -> {
                    return DATE_CONNECT_NAME.equals(schema2.getProp(CONNECT_NAME_KEY));
                }) : DATE_CONNECT_NAME.equals(field.schema().getProp(CONNECT_NAME_KEY));
            }).map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
            LOG.info("Date fields: " + list.toString());
            for (String str : list) {
                dataset = dataset.withColumn(str, functions.col(str).cast(DataTypes.DateType));
            }
        }
        return dataset;
    }

    private static Dataset<Row> convertColumnToNullable(SparkSession sparkSession, Dataset<Row> dataset) {
        List asList = Arrays.asList(dataset.columns());
        return sparkSession.createDataFrame(dataset.rdd(), new StructType((StructField[]) Arrays.stream(dataset.schema().fields()).map(structField -> {
            return asList.contains(structField.name()) ? new StructField(structField.name(), structField.dataType(), true, structField.metadata()) : structField;
        }).toArray(i -> {
            return new StructField[i];
        })));
    }

    private static Dataset<Row> convertArrayColumnsToString(Dataset<Row> dataset) {
        for (String str : (List) Arrays.stream(dataset.schema().fields()).filter(structField -> {
            return structField.dataType().typeName().toLowerCase().startsWith("array");
        }).map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList())) {
            dataset = dataset.withColumn(str, functions.col(str).cast(DataTypes.StringType));
        }
        return dataset;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 608725776:
                if (implMethodName.equals("lambda$toDataset$32802ec8$1")) {
                    z = true;
                    break;
                }
                break;
            case 1762535422:
                if (implMethodName.equals("lambda$toDataset$d7624e8e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/sources/debezium/DebeziumSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/utilities/sources/helpers/AvroConvertor;Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lorg/apache/avro/generic/GenericRecord;")) {
                    AvroConvertor avroConvertor = (AvroConvertor) serializedLambda.getCapturedArg(0);
                    return consumerRecord -> {
                        return avroConvertor.fromJson((String) consumerRecord.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/sources/debezium/DebeziumSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lorg/apache/avro/generic/GenericRecord;")) {
                    return consumerRecord2 -> {
                        return (GenericRecord) consumerRecord2.value();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
