package org.apache.hudi.utilities.sources;

import java.net.URL;
import java.util.Objects;
import java.util.UUID;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestJsonKafkaSource.class */
public class TestJsonKafkaSource extends BaseTestKafkaSource {
    static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");

    @BeforeEach
    public void init() throws Exception {
        String path = ((URL) Objects.requireNonNull(SCHEMA_FILE_URL)).toURI().getPath();
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.put("hoodie.deltastreamer.schemaprovider.source.schema.file", path);
        this.schemaProvider = new FilebasedSchemaProvider(typedProperties, jsc());
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    TypedProperties createPropsForKafkaSource(String str, Long l, String str2) {
        return createPropsForJsonKafkaSource(testUtils.brokerAddress(), str, l, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TypedProperties createPropsForJsonKafkaSource(String str, String str2, Long l, String str3) {
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.setProperty("hoodie.deltastreamer.source.kafka.topic", str2);
        typedProperties.setProperty("bootstrap.servers", str);
        typedProperties.setProperty("auto.offset.reset", str3);
        typedProperties.setProperty("enable.auto.commit", "false");
        typedProperties.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", l != null ? String.valueOf(l) : String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
        typedProperties.setProperty("group.id", UUID.randomUUID().toString());
        return typedProperties;
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    SourceFormatAdapter createSource(TypedProperties typedProperties) {
        return new SourceFormatAdapter(new JsonKafkaSource(typedProperties, jsc(), spark(), this.schemaProvider, this.metrics));
    }

    @Test
    public void testJsonKafkaSourceFilterNullMsg() {
        testUtils.createTopic("hoodie_test_testJsonKafkaSourceFilterNullMsg", 2);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(new JsonKafkaSource(createPropsForKafkaSource("hoodie_test_testJsonKafkaSourceFilterNullMsg", null, "earliest"), jsc(), spark(), this.schemaProvider, this.metrics));
        Assertions.assertEquals(Option.empty(), sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        testUtils.sendMessages("hoodie_test_testJsonKafkaSourceFilterNullMsg", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInserts("000", 1000)));
        testUtils.sendMessages("hoodie_test_testJsonKafkaSourceFilterNullMsg", new String[100]);
        Assertions.assertEquals(1000L, ((JavaRDD) sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch().get()).count());
    }

    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        testUtils.createTopic("hoodie_test_testJsonKafkaSourceWithDefaultUpperCap", 2);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(new JsonKafkaSource(createPropsForKafkaSource("hoodie_test_testJsonKafkaSourceWithDefaultUpperCap", Long.MAX_VALUE, "earliest"), jsc(), spark(), this.schemaProvider, this.metrics));
        testUtils.sendMessages("hoodie_test_testJsonKafkaSourceWithDefaultUpperCap", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInserts("000", 1000)));
        InputBatch fetchNewDataInAvroFormat = sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, ((JavaRDD) fetchNewDataInAvroFormat.getBatch().get()).count());
        testUtils.sendMessages("hoodie_test_testJsonKafkaSourceWithDefaultUpperCap", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInserts("001", 1000)));
        Assertions.assertEquals(1000L, ((Dataset) sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), 1500L).getBatch().get()).count());
    }

    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        testUtils.createTopic("hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap", 2);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(new JsonKafkaSource(createPropsForKafkaSource("hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap", 500L, "earliest"), jsc(), spark(), this.schemaProvider, this.metrics));
        testUtils.sendMessages("hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInserts("000", 1000)));
        InputBatch fetchNewDataInAvroFormat = sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, ((JavaRDD) fetchNewDataInAvroFormat.getBatch().get()).count());
        testUtils.sendMessages("hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInserts("001", 1000)));
        InputBatch fetchNewDataInRowFormat = sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, ((Dataset) fetchNewDataInRowFormat.getBatch().get()).count());
        Assertions.assertEquals(400L, ((JavaRDD) sourceFormatAdapter.fetchNewDataInAvroFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), 400L).getBatch().get()).count());
        InputBatch fetchNewDataInAvroFormat2 = sourceFormatAdapter.fetchNewDataInAvroFormat(Option.of(fetchNewDataInRowFormat.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, ((JavaRDD) fetchNewDataInAvroFormat2.getBatch().get()).count());
        InputBatch fetchNewDataInAvroFormat3 = sourceFormatAdapter.fetchNewDataInAvroFormat(Option.of(fetchNewDataInAvroFormat.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(((Dataset) fetchNewDataInRowFormat.getBatch().get()).count(), ((JavaRDD) fetchNewDataInAvroFormat3.getBatch().get()).count());
        Assertions.assertEquals(fetchNewDataInRowFormat.getCheckpointForNextBatch(), fetchNewDataInAvroFormat3.getCheckpointForNextBatch());
        Assertions.assertEquals(Option.empty(), sourceFormatAdapter.fetchNewDataInAvroFormat(Option.of(fetchNewDataInAvroFormat2.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    void sendMessagesToKafka(String str, int i, int i2) {
        testUtils.sendMessages(str, UtilitiesTestBase.Helpers.jsonifyRecords(new HoodieTestDataGenerator().generateInserts("000", Integer.valueOf(i))));
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testFailOnDataLoss() throws Exception {
        super.testFailOnDataLoss();
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testCommitOffsetToKafka() {
        super.testCommitOffsetToKafka();
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testProtoKafkaSourceInsertRecordsLessSourceLimit() {
        super.testProtoKafkaSourceInsertRecordsLessSourceLimit();
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testKafkaSourceResetStrategy() {
        super.testKafkaSourceResetStrategy();
    }

    @Override // org.apache.hudi.utilities.sources.BaseTestKafkaSource
    @Test
    public /* bridge */ /* synthetic */ void testKafkaSource() {
        super.testKafkaSource();
    }
}
