package org.apache.hudi.utilities.sources.debezium;

import java.util.UUID;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.class */
public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
    private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
    private final HoodieIngestionMetrics metrics = (HoodieIngestionMetrics) Mockito.mock(HoodieIngestionMetrics.class);
    private static KafkaTestUtils testUtils;

    /* loaded from: input_file:org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource$MockSchemaRegistryProvider.class */
    private static class MockSchemaRegistryProvider extends SchemaRegistryProvider {
        private final String schema;

        public MockSchemaRegistryProvider(TypedProperties typedProperties, JavaSparkContext javaSparkContext, TestAbstractDebeziumSource testAbstractDebeziumSource) {
            super(typedProperties, javaSparkContext);
            this.schema = testAbstractDebeziumSource.getSchema();
        }

        public String fetchSchemaFromRegistry(String str) {
            return this.schema;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource$Operation.class */
    public enum Operation {
        INSERT("c"),
        UPDATE("u"),
        DELETE("d");

        public final String op;

        Operation(String str) {
            this.op = str;
        }
    }

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices();
        testUtils = new KafkaTestUtils();
        testUtils.setup();
    }

    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanUpUtilitiesTestServices();
        testUtils.teardown();
    }

    private TypedProperties createPropsForJsonSource() {
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.setProperty("hoodie.deltastreamer.source.kafka.topic", this.testTopicName);
        typedProperties.setProperty("bootstrap.servers", testUtils.brokerAddress());
        typedProperties.setProperty("auto.offset.reset", "earliest");
        typedProperties.setProperty("enable.auto.commit", "false");
        typedProperties.setProperty("hoodie.deltastreamer.schemaprovider.registry.url", "localhost");
        typedProperties.setProperty("hoodie.deltastreamer.source.kafka.value.deserializer.class", StringDeserializer.class.getName());
        typedProperties.setProperty("group.id", UUID.randomUUID().toString());
        return typedProperties;
    }

    protected abstract String getIndexName();

    protected abstract String getSourceClass();

    protected abstract String getSchema();

    protected abstract GenericRecord generateMetaFields(GenericRecord genericRecord);

    protected abstract void validateMetaFields(Dataset<Row> dataset);

    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testDebeziumEvents(Operation operation) throws Exception {
        String sourceClass = getSourceClass();
        testUtils.createTopic(this.testTopicName, 2);
        TypedProperties createPropsForJsonSource = createPropsForJsonSource();
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(sourceClass, createPropsForJsonSource, jsc, sparkSession, new MockSchemaRegistryProvider(createPropsForJsonSource, jsc, this), this.metrics));
        testUtils.sendMessages(this.testTopicName, new String[]{generateDebeziumEvent(operation).toString()});
        InputBatch fetchNewDataInRowFormat = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), 10L);
        Assertions.assertEquals(1L, ((Dataset) fetchNewDataInRowFormat.getBatch().get()).count());
        String str = operation.equals(Operation.DELETE) ? "before_" : "after_";
        Assertions.assertTrue(((Dataset) fetchNewDataInRowFormat.getBatch().get()).select("type", new String[0]).collectAsList().stream().allMatch(row -> {
            return row.getString(0).startsWith(str);
        }));
        Assertions.assertTrue(((Dataset) fetchNewDataInRowFormat.getBatch().get()).select("type", new String[0]).collectAsList().stream().allMatch(row2 -> {
            return row2.getString(0).startsWith(str);
        }));
        validateMetaFields((Dataset) fetchNewDataInRowFormat.getBatch().get());
    }

    private GenericRecord generateDebeziumEvent(Operation operation) {
        Schema parse = new Schema.Parser().parse(getSchema());
        String concat = getIndexName().concat(".ghschema.gharchive.Value");
        GenericData.Record record = new GenericData.Record(parse);
        record.put("op", operation.op);
        record.put("ts_ms", 100L);
        Schema.Field field = parse.getField("before");
        GenericData.Record record2 = new GenericData.Record((Schema) field.schema().getTypes().get(field.schema().getIndexNamed(concat).intValue()));
        record2.put("id", 1);
        record2.put("date", "1/1/2020");
        record2.put("type", "before_type");
        record2.put("payload", "before_payload");
        record2.put("timestamp", 1000L);
        record.put("before", record2);
        Schema.Field field2 = parse.getField("after");
        GenericData.Record record3 = new GenericData.Record((Schema) field2.schema().getTypes().get(field2.schema().getIndexNamed(concat).intValue()));
        record3.put("id", 1);
        record3.put("date", "1/1/2021");
        record3.put("type", "after_type");
        record3.put("payload", "after_payload");
        record3.put("timestamp", 3000L);
        record.put("after", record3);
        return generateMetaFields(record);
    }

    private static Stream<Arguments> testArguments() {
        return Stream.of((Object[]) new Arguments[]{Arguments.arguments(new Object[]{Operation.INSERT}), Arguments.arguments(new Object[]{Operation.UPDATE}), Arguments.arguments(new Object[]{Operation.DELETE})});
    }
}
