package org.apache.hudi.utilities.sources;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Functional tests for {@link JsonKafkaSource}, exercised through {@link SourceFormatAdapter} against a
 * Kafka broker managed by Spark's {@link KafkaTestUtils}.
 *
 * <p>Each test creates its own topic (prefixed with {@link #TEST_TOPIC_PREFIX}) and feeds it JSON records
 * produced by {@link HoodieTestDataGenerator}.
 */
public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness {
    protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
    private static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
    /** Message the source is expected to report when checkpointed offsets are no longer available in Kafka. */
    private static final String DATA_LOSS_MESSAGE = "Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.";
    protected static KafkaTestUtils testUtils;
    protected final HoodieDeltaStreamerMetrics metrics = Mockito.mock(HoodieDeltaStreamerMetrics.class);
    protected FilebasedSchemaProvider schemaProvider;

    @BeforeAll
    public static void initClass() throws Exception {
        testUtils = new KafkaTestUtils();
        testUtils.setup();
    }

    @AfterAll
    public static void cleanupClass() {
        // @AfterAll also runs when @BeforeAll failed; avoid masking that failure with an NPE here.
        if (testUtils != null) {
            testUtils.teardown();
        }
    }

    @BeforeEach
    public void init() throws Exception {
        String schemaFilePath = Objects.requireNonNull(SCHEMA_FILE_URL,
            "delta-streamer-config/source.avsc not found on the test classpath").toURI().getPath();
        TypedProperties schemaProps = new TypedProperties();
        schemaProps.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
        this.schemaProvider = new FilebasedSchemaProvider(schemaProps, jsc());
    }

    /**
     * Builds the properties for a {@link JsonKafkaSource} reading {@code topic} from the test broker.
     *
     * @param topic                    Kafka topic to read
     * @param maxEventsToReadFromKafka value for {@code hoodie.deltastreamer.kafka.source.maxEvents};
     *                                 {@code null} means the config's default value
     * @param resetStrategy            value for Kafka's {@code auto.offset.reset} (e.g. "earliest", "latest")
     * @return a new, mutable property set; auto-commit is disabled and {@code group.id} is a random UUID
     */
    public TypedProperties createPropsForJsonSource(String topic, Long maxEventsToReadFromKafka, String resetStrategy) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
        props.setProperty("bootstrap.servers", testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
            maxEventsToReadFromKafka != null
                ? String.valueOf(maxEventsToReadFromKafka)
                : String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
        // A fresh group id per call, presumably so no two sources share committed offsets.
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /** Wraps a new {@link JsonKafkaSource} built from {@code props} in a {@link SourceFormatAdapter}. */
    private SourceFormatAdapter newJsonSourceAdapter(TypedProperties props) {
        return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics));
    }

    /** Counts the records of a batch returned by {@code fetchNewDataInAvroFormat}; the batch must be present. */
    @SuppressWarnings("rawtypes")
    private static long countAvroRecords(InputBatch batch) {
        return ((JavaRDD) batch.getBatch().get()).count();
    }

    /** Counts the rows of a batch returned by {@code fetchNewDataInRowFormat}; the batch must be present. */
    @SuppressWarnings("rawtypes")
    private static long countRows(InputBatch batch) {
        return ((Dataset) batch.getBatch().get()).count();
    }

    @Test
    public void testJsonKafkaSource() {
        final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSource";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        JsonKafkaSource jsonSource = new JsonKafkaSource(createPropsForJsonSource(topic, null, "earliest"), jsc(), spark(), this.schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

        // 1. Nothing published yet, no checkpoint => no batch.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // 2. No checkpoint => read from "earliest", limited by sourceLimit.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, countAvroRecords(fetch1));
        // The Avro records must also convert to a DataFrame with the same row count.
        Assertions.assertEquals(900L, AvroConversionUtils.createDataFrame(JavaRDD.toRDD((JavaRDD) fetch1.getBatch().get()),
            this.schemaProvider.getSourceSchema().toString(), jsonSource.getSparkSession()).count());

        // 3. Resuming from fetch1's checkpoint returns the 100 unread records plus the 1000 new ones.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, countRows(fetch2));

        // 4. Re-reading from the same checkpoint gives the same count and next checkpoint in both formats.
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(fetch2), countAvroRecords(fetch3));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
        InputBatch fetch4 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(fetch2), countRows(fetch4));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch4.getCheckpointForNextBatch());

        // 5. Nothing new after the latest checkpoint => no batch in either format.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInRowFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    @Test
    public void testJsonKafkaSourceFilterNullMsg() {
        final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceFilterNullMsg";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = newJsonSourceAdapter(createPropsForJsonSource(topic, null, "earliest"));

        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        // 100 null-valued messages on top of the 1000 real ones; only the real ones should be returned.
        testUtils.sendMessages(topic, new String[100]);
        Assertions.assertEquals(1000L, countAvroRecords(kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE)));
    }

    @Test
    public void testJsonKafkaSourceResetStrategy() {
        final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceResetStrategy";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter earliestKafkaSource = newJsonSourceAdapter(createPropsForJsonSource(topic, null, "earliest"));
        SourceFormatAdapter latestKafkaSource = newJsonSourceAdapter(createPropsForJsonSource(topic, null, "latest"));

        // Before any message is published, both reset strategies must agree on batch and checkpoint.
        InputBatch earliestFetch = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latestFetch = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earliestFetch.getBatch(), latestFetch.getBatch());
        Assertions.assertEquals(earliestFetch.getCheckpointForNextBatch(), latestFetch.getCheckpointForNextBatch());

        // After publishing, an un-checkpointed read still yields the same next checkpoint for both strategies.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        String earliestCheckpoint = earliestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getCheckpointForNextBatch();
        String latestCheckpoint = latestKafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getCheckpointForNextBatch();
        Assertions.assertEquals(earliestCheckpoint, latestCheckpoint);
    }

    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithDefaultUpperCap";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = newJsonSourceAdapter(createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest"));

        // maxEvents is effectively unbounded, so everything published is returned.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, countAvroRecords(fetch1));

        // A sourceLimit (1500) above the 1000 newly available records returns all of them.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        Assertions.assertEquals(1000L, countRows(kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500L)));
    }

    @Test
    public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
        final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceInsertRecordsLessSourceLimit";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = newJsonSourceAdapter(props);
        // NOTE(review): maxEvents is lowered only after the source is built; whether the source sees this
        // depends on it reading the shared props lazily — confirm the intent.
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");

        // 1. sourceLimit (300) below the published count (400) caps the batch.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300L);
        Assertions.assertEquals(300L, countAvroRecords(fetch1));

        // 2. Same cap applies when resuming from the checkpoint with more data available.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
        Assertions.assertEquals(300L, countRows(kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300L)));
    }

    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithConfigurableUpperCap";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = newJsonSourceAdapter(createPropsForJsonSource(topic, 500L, "earliest"));

        // 1. An explicit sourceLimit (900) is expected to be honored even though maxEvents is 500.
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, countAvroRecords(fetch1));

        // 2. With sourceLimit = Long.MAX_VALUE the batch is capped at maxEvents (500).
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, countRows(fetch2));

        // 3. A smaller explicit sourceLimit from the same checkpoint wins.
        Assertions.assertEquals(400L, countAvroRecords(kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400L)));

        // 4. Continuing from fetch2 drains the remaining 600 records (100 + 1000 - 500).
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, countAvroRecords(fetch3));

        // 5. Re-reading from fetch1's checkpoint matches fetch2 in count and next checkpoint.
        InputBatch fetch4 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(fetch2), countAvroRecords(fetch4));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch4.getCheckpointForNextBatch());

        // 6. Nothing left after fetch3's checkpoint.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch3.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    @Test
    public void testCommitOffsetToKafka() {
        final String topic = TEST_TOPIC_PREFIX + "testCommitOffsetToKafka";
        testUtils.createTopic(topic, 2);
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(partition0);
        partitions.add(partition1);

        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
        props.put(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        SourceFormatAdapter kafkaSource = newJsonSourceAdapter(props);

        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599L);
        // Commit the first batch's checkpoint back to Kafka.
        kafkaSource.getSource().onCommit(fetch1.getCheckpointForNextBatch());

        // NOTE(review): the consumer is built from the same props after the source was created; presumably
        // the source filled in the deserializer settings the consumer needs — confirm.
        // try-with-resources guarantees close() and records any close() failure as suppressed instead of
        // masking an assertion failure (the decompiled manual close did neither).
        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(partitions);

            // 599 consumed records were split 300/299 across the two partitions.
            OffsetAndMetadata committed0 = consumer.committed(partition0);
            Assertions.assertNotNull(committed0);
            Assertions.assertEquals(300L, committed0.offset());
            OffsetAndMetadata committed1 = consumer.committed(partition1);
            Assertions.assertNotNull(committed1);
            Assertions.assertEquals(299L, committed1.offset());
            // The 1000 published records landed as 500 per partition.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            Assertions.assertEquals(Long.valueOf(500L), endOffsets.get(partition0));
            Assertions.assertEquals(Long.valueOf(500L), endOffsets.get(partition1));

            // Publish 500 more, consume everything from the checkpoint, and commit again.
            testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500)));
            InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
            kafkaSource.getSource().onCommit(fetch2.getCheckpointForNextBatch());

            // Fully consumed: committed offsets equal the end offsets on both partitions.
            OffsetAndMetadata recommitted0 = consumer.committed(partition0);
            Assertions.assertNotNull(recommitted0);
            Assertions.assertEquals(750L, recommitted0.offset());
            OffsetAndMetadata recommitted1 = consumer.committed(partition1);
            Assertions.assertNotNull(recommitted1);
            Assertions.assertEquals(750L, recommitted1.offset());
            Map<TopicPartition, Long> finalEndOffsets = consumer.endOffsets(partitions);
            Assertions.assertEquals(Long.valueOf(750L), finalEndOffsets.get(partition0));
            Assertions.assertEquals(Long.valueOf(750L), finalEndOffsets.get(partition1));
        }

        // Failure case: with group.id removed from the (presumably shared) props, onCommit must be rejected.
        props.remove("group.id");
        Assertions.assertThrows(HoodieNotSupportedException.class, () -> kafkaSource.getSource().onCommit(""));
    }

    @Test
    public void testFailOnDataLoss() throws Exception {
        final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
        Properties topicConfig = new Properties();
        topicConfig.setProperty("retention.ms", "10000");
        testUtils.createTopic(topic, 1, topicConfig);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
        props.setProperty(KafkaOffsetGen.Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Boolean.toString(true));
        SourceFormatAdapter kafkaSource = newJsonSourceAdapter(props);

        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 10)));
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 2L);
        Assertions.assertEquals(2L, countAvroRecords(fetch1));

        // Wait past the topic's 10s retention so the unread records can age out.
        // NOTE(review): relies on the broker deleting the segment within this window; may be timing-sensitive.
        Thread.sleep(10001L);
        Assertions.assertEquals(DATA_LOSS_MESSAGE, Assertions.assertThrows(HoodieDeltaStreamerException.class,
            () -> kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE)).getMessage());
        Assertions.assertEquals(DATA_LOSS_MESSAGE, Assertions.assertThrows(HoodieDeltaStreamerException.class,
            () -> kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE)).getMessage());
    }
}
