package org.apache.hudi.utilities.sources;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestJsonKafkaSource.class */
/**
 * Tests {@link JsonKafkaSource} against an embedded Kafka broker ({@link KafkaTestUtils}).
 *
 * <p>The source is read through a {@link SourceFormatAdapter} in both Avro and Row form. The
 * tests cover source limits, the {@code maxEvents} cap, offset reset strategies and committing
 * consumed offsets back to Kafka.
 */
public class TestJsonKafkaSource extends UtilitiesTestBase {

    private static final String TEST_TOPIC_NAME = "hoodie_test";

    private final HoodieDeltaStreamerMetrics metrics = Mockito.mock(HoodieDeltaStreamerMetrics.class);
    private FilebasedSchemaProvider schemaProvider;
    private KafkaTestUtils testUtils;

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass(false);
    }

    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanupClass();
    }

    /** Creates the DFS-backed schema provider and starts a fresh embedded Kafka broker. */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), this.jsc);
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    /**
     * Tears down the base fixture and the embedded Kafka broker.
     *
     * <p>The broker is stopped in a {@code finally} block, so a failure in the base teardown does
     * not leak it. The null check covers a {@link #setup()} that failed before the broker was
     * created.
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        try {
            super.teardown();
        } finally {
            if (this.testUtils != null) {
                this.testUtils.teardown();
            }
        }
    }

    /**
     * Builds consumer/source properties pointing at the embedded broker and {@link #TEST_TOPIC_NAME}.
     *
     * @param maxEventsToReadFromKafkaSource value for {@code hoodie.deltastreamer.kafka.source.maxEvents};
     *     {@code null} selects {@code KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP}'s default
     * @param resetStrategy value for {@code auto.offset.reset} (e.g. "earliest" / "latest")
     * @return a new property set with a random {@code group.id}, so each call gets its own consumer group
     */
    private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
                maxEventsToReadFromKafkaSource != null
                        ? String.valueOf(maxEventsToReadFromKafkaSource)
                        : String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /** Generates {@code count} insert records for {@code instantTime} and publishes them as JSON. */
    private void sendRecords(HoodieTestDataGenerator dataGenerator, String instantTime, int count) {
        this.testUtils.sendMessages(TEST_TOPIC_NAME,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts(instantTime, count)));
    }

    /** Counts the records of a batch fetched in Avro form (payload is a {@link JavaRDD}). */
    private static long countAvroRecords(InputBatch batch) {
        return ((JavaRDD<?>) batch.getBatch().get()).count();
    }

    /** Counts the rows of a batch fetched in Row form (payload is a {@link Dataset}). */
    private static long countRows(InputBatch batch) {
        return ((Dataset<?>) batch.getBatch().get()).count();
    }

    /** Asserts that {@code partition} has a committed offset equal to {@code expectedOffset}. */
    private static void assertCommittedOffset(KafkaConsumer<?, ?> consumer, TopicPartition partition, long expectedOffset) {
        OffsetAndMetadata committed = consumer.committed(partition);
        Assertions.assertNotNull(committed);
        Assertions.assertEquals(expectedOffset, committed.offset());
    }

    @Test
    public void testJsonKafkaSource() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        JsonKafkaSource jsonSource = new JsonKafkaSource(createPropsForJsonSource(null, "earliest"),
                this.jsc, this.sparkSession, this.schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

        // Nothing published yet: no batch.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // Publish 1000 records and read 900 of them, bounded by the source limit.
        sendRecords(dataGenerator, "000", 1000);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, countAvroRecords(firstBatch));
        // The Avro records must also convert into a DataFrame of the same size.
        Assertions.assertEquals(900L, AvroConversionUtils.createDataFrame(
                JavaRDD.toRDD((JavaRDD) firstBatch.getBatch().get()),
                this.schemaProvider.getSourceSchema().toString(),
                jsonSource.getSparkSession()).count());

        // Publish 1000 more: resuming from the checkpoint yields the 100 leftover + 1000 new records.
        sendRecords(dataGenerator, "001", 1000);
        InputBatch secondBatchAsRows = kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, countRows(secondBatchAsRows));

        // Re-reading the same range (Avro and Row form) must give the same count and checkpoint.
        InputBatch secondBatchAsAvro = kafkaSource.fetchNewDataInAvroFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(secondBatchAsRows), countAvroRecords(secondBatchAsAvro));
        Assertions.assertEquals(secondBatchAsRows.getCheckpointForNextBatch(), secondBatchAsAvro.getCheckpointForNextBatch());
        InputBatch secondBatchAsRowsAgain = kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(secondBatchAsRows), countRows(secondBatchAsRowsAgain));
        Assertions.assertEquals(secondBatchAsRows.getCheckpointForNextBatch(), secondBatchAsRowsAgain.getCheckpointForNextBatch());

        // Nothing remains past the latest checkpoint, in either format.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.of(secondBatchAsRows.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInRowFormat(Option.of(secondBatchAsRows.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    @Test
    public void testJsonKafkaSourceResetStrategy() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter earliestSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(null, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));
        SourceFormatAdapter latestSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(null, "latest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        // On an empty topic both reset strategies agree on batch and checkpoint.
        InputBatch earliestBatch = earliestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latestBatch = latestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earliestBatch.getBatch(), latestBatch.getBatch());
        Assertions.assertEquals(earliestBatch.getCheckpointForNextBatch(), latestBatch.getCheckpointForNextBatch());

        // After publishing, both strategies report the same next checkpoint.
        sendRecords(dataGenerator, "000", 1000);
        Assertions.assertEquals(
                earliestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getCheckpointForNextBatch(),
                latestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getCheckpointForNextBatch());
    }

    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(Long.MAX_VALUE, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        sendRecords(dataGenerator, "000", 1000);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, countAvroRecords(firstBatch));

        // Only 1000 new records exist, so a 1500 source limit returns all 1000.
        sendRecords(dataGenerator, "001", 1000);
        Assertions.assertEquals(1000L, countRows(
                kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), 1500L)));
    }

    @Test
    public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                props, this.jsc, this.sparkSession, this.schemaProvider, this.metrics));
        // Deliberately set after construction; presumably the source reads this props instance
        // lazily on each fetch. TODO confirm against KafkaOffsetGen.
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");

        sendRecords(dataGenerator, "000", 400);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300L);
        Assertions.assertEquals(300L, countAvroRecords(firstBatch));

        sendRecords(dataGenerator, "001", 600);
        Assertions.assertEquals(300L, countRows(
                kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), 300L)));
    }

    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                createPropsForJsonSource(500L, "earliest"), this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        sendRecords(dataGenerator, "000", 1000);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, countAvroRecords(firstBatch));

        sendRecords(dataGenerator, "001", 1000);
        // With an unbounded source limit, the configured maxEvents (500) caps the fetch.
        InputBatch cappedBatch = kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, countRows(cappedBatch));
        // A source limit below the cap wins.
        Assertions.assertEquals(400L, countAvroRecords(
                kafkaSource.fetchNewDataInAvroFormat(Option.of(firstBatch.getCheckpointForNextBatch()), 400L)));
        InputBatch lastBatch = kafkaSource.fetchNewDataInAvroFormat(Option.of(cappedBatch.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, countAvroRecords(lastBatch));

        // Re-reading the capped range in Avro form matches the Row-form count and checkpoint.
        InputBatch cappedBatchAsAvro = kafkaSource.fetchNewDataInAvroFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(cappedBatch), countAvroRecords(cappedBatchAsAvro));
        Assertions.assertEquals(cappedBatch.getCheckpointForNextBatch(), cappedBatchAsAvro.getCheckpointForNextBatch());

        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.of(lastBatch.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    @Test
    public void testCommitOffsetToKafka() {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        TopicPartition partition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
        TopicPartition partition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
        List<TopicPartition> topicPartitions = new ArrayList<>(2);
        topicPartitions.add(partition0);
        topicPartitions.add(partition1);

        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = createPropsForJsonSource(null, "earliest");
        props.put(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(
                props, this.jsc, this.sparkSession, this.schemaProvider, this.metrics));

        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        sendRecords(dataGenerator, "000", 1000);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599L);
        kafkaSource.getSource().onCommit(firstBatch.getCheckpointForNextBatch());

        // The consumer is built from the same props, so it sees offsets committed under the same
        // group.id. try-with-resources closes it exactly once, even if an assertion fails.
        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(topicPartitions);

            // 599 consumed events, committed as 300 + 299 across the two partitions.
            assertCommittedOffset(consumer, partition0, 300L);
            assertCommittedOffset(consumer, partition1, 299L);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            Assertions.assertEquals(Long.valueOf(500L), endOffsets.get(partition0));
            Assertions.assertEquals(Long.valueOf(500L), endOffsets.get(partition1));

            // Consume everything (remaining + 500 new) and commit: committed == end offsets.
            sendRecords(dataGenerator, "001", 500);
            InputBatch secondBatch = kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
            kafkaSource.getSource().onCommit(secondBatch.getCheckpointForNextBatch());
            assertCommittedOffset(consumer, partition0, 750L);
            assertCommittedOffset(consumer, partition1, 750L);
            Map<TopicPartition, Long> endOffsetsAfterCommit = consumer.endOffsets(topicPartitions);
            Assertions.assertEquals(Long.valueOf(750L), endOffsetsAfterCommit.get(partition0));
            Assertions.assertEquals(Long.valueOf(750L), endOffsetsAfterCommit.get(partition1));
        }

        // Without a group.id, committing offsets is rejected.
        props.remove("group.id");
        Assertions.assertThrows(HoodieNotSupportedException.class, () -> kafkaSource.getSource().onCommit(""));
    }
}
