package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link JsonKafkaSource} (driven through {@link SourceFormatAdapter}) and for the
 * offset-range arithmetic in {@link KafkaOffsetGen.CheckpointUtils}.
 *
 * <p>Each test runs against a fresh {@link KafkaTestUtils} instance that is set up in
 * {@link #setup()} and torn down in {@link #teardown()}.
 */
public class TestKafkaSource extends UtilitiesTestBase {
    /** Kafka topic used by every test in this class. */
    private static final String TEST_TOPIC_NAME = "hoodie_test";

    private FilebasedSchemaProvider schemaProvider;
    private KafkaTestUtils testUtils;

    @BeforeClass
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass();
    }

    @AfterClass
    public static void cleanupClass() throws Exception {
        UtilitiesTestBase.cleanupClass();
    }

    /** Runs the base-class setup, then creates the schema provider and the Kafka test utilities. */
    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), this.jsc);
        this.testUtils = new KafkaTestUtils();
        this.testUtils.setup();
    }

    /**
     * Runs the base-class teardown and then tears down the Kafka test utilities.
     *
     * <p>The Kafka teardown runs in a {@code finally} block so it is not skipped when the base
     * teardown throws, and it is guarded against {@code null} because JUnit still invokes
     * {@code @After} methods when {@link #setup()} failed before {@code testUtils} was assigned.
     */
    @Override
    @After
    public void teardown() throws Exception {
        try {
            super.teardown();
        } finally {
            if (this.testUtils != null) {
                this.testUtils.teardown();
            }
        }
    }

    /**
     * Builds the properties handed to {@link JsonKafkaSource}.
     *
     * @param maxEventsToReadFromKafkaSource value for
     *     {@code hoodie.deltastreamer.kafka.source.maxEvents}; when {@code null} the current value of
     *     {@code KafkaOffsetGen.Config.maxEventsFromKafkaSource} is used instead
     * @return properties pointing at the test broker, with a random consumer group id
     */
    private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
        String maxEvents = maxEventsToReadFromKafkaSource != null
                ? String.valueOf(maxEventsToReadFromKafkaSource)
                : String.valueOf(KafkaOffsetGen.Config.maxEventsFromKafkaSource);

        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
        props.setProperty("bootstrap.servers", this.testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEvents);
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /**
     * Reads from an initially empty topic, then reads two produced batches in both Avro and Row
     * form, checking record counts and that both forms report the same next checkpoint.
     */
    @Test
    public void testJsonKafkaSource() throws IOException {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        JsonKafkaSource jsonSource = new JsonKafkaSource(createPropsForJsonSource(null), this.jsc, this.sparkSession, this.schemaProvider);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

        // Nothing has been produced yet: no batch is expected.
        Assert.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // Produce 1000 records and read without a checkpoint, limited to 900 by the source limit.
        this.testUtils.sendMessages(TEST_TOPIC_NAME, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch firstAvroBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assert.assertEquals(900L, ((JavaRDD) firstAvroBatch.getBatch().get()).count());
        // The same Avro records must survive conversion to a DataFrame.
        Assert.assertEquals(900L, AvroConversionUtils.createDataFrame(JavaRDD.toRDD((JavaRDD) firstAvroBatch.getBatch().get()), this.schemaProvider.getSourceSchema().toString(), jsonSource.getSparkSession()).count());

        // Produce 1000 more and resume from the first checkpoint: 100 left over + 1000 new.
        this.testUtils.sendMessages(TEST_TOPIC_NAME, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        InputBatch secondRowBatch = kafkaSource.fetchNewDataInRowFormat(Option.of(firstAvroBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(1100L, ((Dataset) secondRowBatch.getBatch().get()).count());

        // Re-reading from the same checkpoint must give the same count and next checkpoint, in Avro form...
        InputBatch repeatedAvroBatch = kafkaSource.fetchNewDataInAvroFormat(Option.of(firstAvroBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(((Dataset) secondRowBatch.getBatch().get()).count(), ((JavaRDD) repeatedAvroBatch.getBatch().get()).count());
        Assert.assertEquals(secondRowBatch.getCheckpointForNextBatch(), repeatedAvroBatch.getCheckpointForNextBatch());

        // ...and in Row form.
        InputBatch repeatedRowBatch = kafkaSource.fetchNewDataInRowFormat(Option.of(firstAvroBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(((Dataset) secondRowBatch.getBatch().get()).count(), ((Dataset) repeatedRowBatch.getBatch().get()).count());
        Assert.assertEquals(secondRowBatch.getCheckpointForNextBatch(), repeatedRowBatch.getCheckpointForNextBatch());

        // Reading from the latest checkpoint yields no batch in either form.
        Assert.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.of(secondRowBatch.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
        Assert.assertEquals(Option.empty(), kafkaSource.fetchNewDataInRowFormat(Option.of(secondRowBatch.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    /**
     * With both the configured max-events property and the source limit set to
     * {@code Long.MAX_VALUE}, the test expects the static
     * {@code KafkaOffsetGen.Config.maxEventsFromKafkaSource} value to cap the batch size.
     */
    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
        // The cap is a mutable static shared with other tests; remember it so it can always be restored.
        final long previousCap = KafkaOffsetGen.Config.maxEventsFromKafkaSource;
        try {
            this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
            HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
            SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(createPropsForJsonSource(Long.MAX_VALUE), this.jsc, this.sparkSession, this.schemaProvider));
            KafkaOffsetGen.Config.maxEventsFromKafkaSource = 500L;

            // 1000 records available, unbounded source limit: the 500 cap is expected to apply.
            this.testUtils.sendMessages(TEST_TOPIC_NAME, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
            InputBatch cappedBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
            Assert.assertEquals(500L, ((JavaRDD) cappedBatch.getBatch().get()).count());

            // With an explicit source limit of 1500 the test expects exactly 1500 records back.
            this.testUtils.sendMessages(TEST_TOPIC_NAME, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
            Assert.assertEquals(1500L, ((Dataset) kafkaSource.fetchNewDataInRowFormat(Option.of(cappedBatch.getCheckpointForNextBatch()), 1500L).getBatch().get()).count());
        } finally {
            // Restore even when an assertion fails, so the modified cap cannot leak into other tests.
            KafkaOffsetGen.Config.maxEventsFromKafkaSource = previousCap;
        }
    }

    /**
     * With the max-events property set to 500, the test expects explicit source limits to be
     * honoured as given and a {@code Long.MAX_VALUE} source limit to fall back to the 500 cap.
     */
    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
        this.testUtils.createTopic(TEST_TOPIC_NAME, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(new JsonKafkaSource(createPropsForJsonSource(500L), this.jsc, this.sparkSession, this.schemaProvider));

        // Explicit source limit of 900 with 1000 records available.
        this.testUtils.sendMessages(TEST_TOPIC_NAME, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assert.assertEquals(900L, ((JavaRDD) firstBatch.getBatch().get()).count());

        // Unbounded source limit: the configured cap of 500 is expected.
        this.testUtils.sendMessages(TEST_TOPIC_NAME, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
        InputBatch cappedRowBatch = kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(500L, ((Dataset) cappedRowBatch.getBatch().get()).count());

        // Explicit source limits below and above the cap, from two different checkpoints.
        Assert.assertEquals(400L, ((JavaRDD) kafkaSource.fetchNewDataInAvroFormat(Option.of(firstBatch.getCheckpointForNextBatch()), 400L).getBatch().get()).count());
        InputBatch remainderBatch = kafkaSource.fetchNewDataInAvroFormat(Option.of(cappedRowBatch.getCheckpointForNextBatch()), 600L);
        Assert.assertEquals(600L, ((JavaRDD) remainderBatch.getBatch().get()).count());

        // Re-reading from the first checkpoint in Avro form must match the Row-form read.
        InputBatch repeatedAvroBatch = kafkaSource.fetchNewDataInAvroFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assert.assertEquals(((Dataset) cappedRowBatch.getBatch().get()).count(), ((JavaRDD) repeatedAvroBatch.getBatch().get()).count());
        Assert.assertEquals(cappedRowBatch.getCheckpointForNextBatch(), repeatedAvroBatch.getCheckpointForNextBatch());

        // Everything has been consumed: no further batch is expected.
        Assert.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.of(remainderBatch.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    /**
     * Builds a partition-to-offset map for {@code TEST_TOPIC_NAME}.
     *
     * @param partitions partition ids
     * @param offsets offset for the partition at the same index in {@code partitions}
     * @return map from each topic partition to its offset
     */
    private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
        HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
        for (int i = 0; i < partitions.length; i++) {
            offsetMap.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), Long.valueOf(offsets[i]));
        }
        return offsetMap;
    }

    /**
     * Checks {@code totalNewMessages} and {@code computeOffsetRanges} against hand-computed
     * expectations for several from/to offset combinations and event budgets.
     */
    @Test
    public void testComputeOffsetRanges() {
        // totalNewMessages sums the sizes of the individual ranges.
        Assert.assertEquals(200L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(new OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0L, 100L), OffsetRange.apply(TEST_TOPIC_NAME, 0, 100L, 200L)}));

        // Budget larger than what is available: everything (2 x 100000) is taken.
        Assert.assertEquals(200000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L)));

        // Budget of 10000 over two partitions: 5000 from each.
        OffsetRange[] limitedRanges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000L);
        Assert.assertEquals(10000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(limitedRanges));
        Assert.assertEquals(200000L, limitedRanges[0].fromOffset());
        Assert.assertEquals(205000L, limitedRanges[0].untilOffset());
        Assert.assertEquals(250000L, limitedRanges[1].fromOffset());
        Assert.assertEquals(255000L, limitedRanges[1].untilOffset());

        // Partition 2 appears only in the "to" map: it must still get a range of its own.
        OffsetRange[] rangesWithNewPartition = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}), 1000000L);
        Assert.assertEquals(300000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(rangesWithNewPartition));
        Assert.assertEquals(3L, rangesWithNewPartition.length);

        // Budget of 100000 where partitions 0 and 2 have little data: partition 1 absorbs the rest.
        OffsetRange[] skewedLimitedRanges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 100000L);
        Assert.assertEquals(100000L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(skewedLimitedRanges));
        Assert.assertEquals(10L, skewedLimitedRanges[0].count());
        Assert.assertEquals(89990L, skewedLimitedRanges[1].count());
        Assert.assertEquals(10000L, skewedLimitedRanges[2].count());

        // Same offsets with a budget above the total available: every partition is fully drained.
        OffsetRange[] skewedUnlimitedRanges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 1000000L);
        Assert.assertEquals(110010L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(skewedUnlimitedRanges));
        Assert.assertEquals(10L, skewedUnlimitedRanges[0].count());
        Assert.assertEquals(100000L, skewedUnlimitedRanges[1].count());
        Assert.assertEquals(10000L, skewedUnlimitedRanges[2].count());

        // Budget of 1001 over five partitions where partition 0 only has 100 messages;
        // the assertions pin the exact per-partition split of the remaining budget.
        OffsetRange[] unevenRanges = KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0, 0, 0, 0, 0}), makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 1000, 1000, 1000}), 1001L);
        Assert.assertEquals(1001L, KafkaOffsetGen.CheckpointUtils.totalNewMessages(unevenRanges));
        Assert.assertEquals(100L, unevenRanges[0].count());
        Assert.assertEquals(226L, unevenRanges[1].count());
        Assert.assertEquals(226L, unevenRanges[2].count());
        Assert.assertEquals(226L, unevenRanges[3].count());
        Assert.assertEquals(223L, unevenRanges[4].count());
    }
}
