package org.apache.hudi.utilities.sources;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

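/**
 * Base class for Kafka source functional tests. Subclasses supply the source properties, the
 * {@link SourceFormatAdapter} under test and the message producer through the abstract methods.
 */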
public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
    /** Common prefix; every topic-name literal used by the tests in this class starts with this value. */
    protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
    /** Mockito mock of the ingestion metrics; verified in testKafkaSourceWithOffsetsFromSourceProfile. */
    protected final HoodieIngestionMetrics metrics = (HoodieIngestionMetrics) Mockito.mock(HoodieIngestionMetrics.class);
    /** Mockito mock of the source-profile supplier wrapped in an Option; stubbed in testKafkaSourceWithOffsetsFromSourceProfile. */
    protected final Option<SourceProfileSupplier> sourceProfile = Option.of(Mockito.mock(SourceProfileSupplier.class));
    /** Read by testKafkaSource for the source schema; never assigned in this class - presumably set by subclasses (TODO confirm). */
    protected SchemaProvider schemaProvider;
    /** Kafka test utility; re-created and set up before each test and torn down after each test. */
    protected KafkaTestUtils testUtils;

    /* loaded from: input_file:org/apache/hudi/utilities/sources/BaseTestKafkaSource$TestSourceProfile.class */
    static class TestSourceProfile implements SourceProfile<Long> {
        private final long maxSourceBytes;
        private final int sourcePartitions;
        private final long numEvents;

        public TestSourceProfile(long j, int i, long j2) {
            this.maxSourceBytes = j;
            this.sourcePartitions = i;
            this.numEvents = j2;
        }

        public long getMaxSourceBytes() {
            return this.maxSourceBytes;
        }

        public int getSourcePartitions() {
            return this.sourcePartitions;
        }

        /* renamed from: getSourceSpecificContext, reason: merged with bridge method [inline-methods] */
        public Long m20getSourceSpecificContext() {
            return Long.valueOf(this.numEvents);
        }
    }

    /** Creates a new {@link KafkaTestUtils}, stores it in {@code testUtils} and calls its {@code setup()} before each test. */
    @BeforeEach
    public void initClass() {
        KafkaTestUtils kafka = new KafkaTestUtils();
        // Publish the instance before setup() so the field is populated even if setup() throws.
        this.testUtils = kafka;
        kafka.setup();
    }

    /** Calls {@code teardown()} on the Kafka test utility after each test. */
    @AfterEach
    public void cleanupClass() {
        final KafkaTestUtils kafka = this.testUtils;
        kafka.teardown();
    }

    /**
     * Builds the properties handed to {@link #createSource(TypedProperties)}.
     *
     * @param str  Kafka topic name (the tests pass the topic they just created)
     * @param l    nullable; the tests pass {@code null} or {@code Long.MAX_VALUE} -
     *             presumably a max-events limit for the source (TODO confirm against subclasses)
     * @param str2 the tests pass {@code "earliest"} or {@code "latest"} -
     *             presumably the offset reset strategy (TODO confirm against subclasses)
     * @return properties for the Kafka source under test
     */
    protected abstract TypedProperties createPropsForKafkaSource(String str, Long l, String str2);

    /**
     * Creates the source under test, wrapped in a {@link SourceFormatAdapter}.
     *
     * @param typedProperties properties as produced by {@code createPropsForKafkaSource}
     * @return the adapter the tests fetch Avro and Row batches from
     */
    protected abstract SourceFormatAdapter createSource(TypedProperties typedProperties);

    /**
     * Publishes test messages to Kafka.
     *
     * @param str topic name
     * @param i   presumably the number of messages to send; the record counts asserted in the
     *            tests are consistent with that reading (TODO confirm against subclasses)
     * @param i2  the tests always pass 2, matching the partition count of the topics they create -
     *            presumably the number of partitions (TODO confirm against subclasses)
     */
    protected abstract void sendMessagesToKafka(String str, int i, int i2);

    /**
     * Walks through a fetch cycle on a two-partition topic: an empty first fetch, a fetch
     * capped at 900 records, a resumed fetch that returns the remaining 1100 records in both
     * Row and Avro format with identical checkpoints, and finally empty fetches once the
     * latest checkpoint is used.
     */
    @Test
    public void testKafkaSource() {
        final String topic = "hoodie_test_testKafkaSource";
        this.testUtils.createTopic(topic, 2);
        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        SourceFormatAdapter kafkaSource = createSource(props);

        // Nothing has been sent yet, so no batch is expected.
        Assertions.assertEquals(
            Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // First real fetch is capped at 900 by the source limit.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch firstAvro = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, ((JavaRDD) firstAvro.getBatch().get()).count());
        Dataset firstAsRows = AvroConversionUtils.createDataFrame(
            JavaRDD.toRDD((JavaRDD) firstAvro.getBatch().get()),
            this.schemaProvider.getSourceSchema().toString(),
            kafkaSource.getSource().getSparkSession());
        Assertions.assertEquals(900L, firstAsRows.count());

        // Resume from the first checkpoint: 1100 records remain after the second send.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch rowsAfterFirst =
            kafkaSource.fetchNewDataInRowFormat(Option.of(firstAvro.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1100L, ((Dataset) rowsAfterFirst.getBatch().get()).count());

        // The same range fetched as Avro must match in size and checkpoint.
        InputBatch avroAfterFirst =
            kafkaSource.fetchNewDataInAvroFormat(Option.of(firstAvro.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(
            ((Dataset) rowsAfterFirst.getBatch().get()).count(),
            ((JavaRDD) avroAfterFirst.getBatch().get()).count());
        Assertions.assertEquals(
            rowsAfterFirst.getCheckpointForNextBatch(),
            avroAfterFirst.getCheckpointForNextBatch());

        // Fetching the same range as rows a second time gives the same size and checkpoint.
        InputBatch rowsAgain =
            kafkaSource.fetchNewDataInRowFormat(Option.of(firstAvro.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(
            ((Dataset) rowsAfterFirst.getBatch().get()).count(),
            ((Dataset) rowsAgain.getBatch().get()).count());
        Assertions.assertEquals(
            rowsAfterFirst.getCheckpointForNextBatch(),
            rowsAgain.getCheckpointForNextBatch());

        // From the latest checkpoint there is nothing left in either format.
        Assertions.assertEquals(
            Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.of(rowsAfterFirst.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
        Assertions.assertEquals(
            Option.empty(),
            kafkaSource.fetchNewDataInRowFormat(Option.of(rowsAfterFirst.getCheckpointForNextBatch()), Long.MAX_VALUE).getBatch());
    }

    /**
     * Compares a source created with "earliest" against one created with "latest" on the same
     * topic: both must report the same batch and checkpoint on an empty topic, and the same
     * next checkpoint after messages have been sent.
     */
    @Test
    public void testKafkaSourceResetStrategy() {
        final String topic = "hoodie_test_testKafkaSourceResetStrategy";
        this.testUtils.createTopic(topic, 2);

        SourceFormatAdapter earliestSource = createSource(createPropsForKafkaSource(topic, null, "earliest"));
        SourceFormatAdapter latestSource = createSource(createPropsForKafkaSource(topic, null, "latest"));

        // Empty topic: both sources agree on batch and checkpoint.
        InputBatch earliestEmpty = earliestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latestEmpty = latestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(earliestEmpty.getBatch(), latestEmpty.getBatch());
        Assertions.assertEquals(earliestEmpty.getCheckpointForNextBatch(), latestEmpty.getCheckpointForNextBatch());

        // After sending messages, both sources still agree on the next checkpoint.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch earliestAfterSend = earliestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        InputBatch latestAfterSend = latestSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(
            earliestAfterSend.getCheckpointForNextBatch(),
            latestAfterSend.getCheckpointForNextBatch());
    }

    /**
     * Checks that a source limit of 300 is honoured when
     * {@code hoodie.streamer.kafka.source.maxEvents} is set to the larger value 500: both the
     * Avro fetch and the resumed Row fetch must return exactly 300 records.
     */
    @Test
    public void testProtoKafkaSourceInsertRecordsLessSourceLimit() {
        final String topic = "hoodie_test_testKafkaSourceInsertRecordsLessSourceLimit";
        this.testUtils.createTopic(topic, 2);

        TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
        SourceFormatAdapter kafkaSource = createSource(props);
        // Set after the source has been created, as in the original sequence.
        props.setProperty("hoodie.streamer.kafka.source.maxEvents", "500");

        sendMessagesToKafka(topic, 400, 2);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300L);
        Assertions.assertEquals(300L, ((JavaRDD) firstBatch.getBatch().get()).count());

        sendMessagesToKafka(topic, 600, 2);
        InputBatch secondBatch =
            kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), 300L);
        Assertions.assertEquals(300L, ((Dataset) secondBatch.getBatch().get()).count());
    }

    /**
     * Verifies that {@code onCommit} writes the checkpointed offsets back to Kafka when
     * {@link KafkaSourceConfig#ENABLE_KAFKA_COMMIT_OFFSET} is enabled, and that it throws
     * {@link HoodieNotSupportedException} once {@code group.id} has been removed from the props.
     *
     * <p>The consumer is managed with try-with-resources. This replaces a hand-expanded cleanup
     * sequence that tracked a {@code Throwable} which was always {@code null}, contained an
     * unreachable branch that would have dereferenced it, and closed the consumer a second
     * time on the failure path.
     */
    @Test
    public void testCommitOffsetToKafka() {
        final String topic = "hoodie_test_testCommitOffsetToKafka";
        this.testUtils.createTopic(topic, 2);

        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        ArrayList<TopicPartition> partitions = new ArrayList<>();
        partitions.add(partition0);
        partitions.add(partition1);

        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        props.put(KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
        SourceFormatAdapter kafkaSource = createSource(props);

        // Nothing has been sent yet, so no batch is expected.
        Assertions.assertEquals(
            Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // Fetch with a source limit of 599 and commit that checkpoint.
        sendMessagesToKafka(topic, 1000, 2);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599L);
        kafkaSource.getSource().onCommit(firstBatch.getCheckpointForNextBatch());

        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(partitions);

            // The 599 fetched records are split 300 / 299 across the two partitions.
            OffsetAndMetadata committed0 = consumer.committed(partition0);
            Assertions.assertNotNull(committed0);
            Assertions.assertEquals(300L, committed0.offset());
            OffsetAndMetadata committed1 = consumer.committed(partition1);
            Assertions.assertNotNull(committed1);
            Assertions.assertEquals(299L, committed1.offset());

            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            Assertions.assertEquals(500L, endOffsets.get(partition0));
            Assertions.assertEquals(500L, endOffsets.get(partition1));

            // Send more, read everything from the first checkpoint, and commit again.
            sendMessagesToKafka(topic, 500, 2);
            InputBatch secondBatch =
                kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
            kafkaSource.getSource().onCommit(secondBatch.getCheckpointForNextBatch());

            // Committed offsets must now equal the end offsets of both partitions.
            committed0 = consumer.committed(partition0);
            Assertions.assertNotNull(committed0);
            Assertions.assertEquals(750L, committed0.offset());
            committed1 = consumer.committed(partition1);
            Assertions.assertNotNull(committed1);
            Assertions.assertEquals(750L, committed1.offset());

            endOffsets = consumer.endOffsets(partitions);
            Assertions.assertEquals(750L, endOffsets.get(partition0));
            Assertions.assertEquals(750L, endOffsets.get(partition1));
        }

        // Failure case: with group.id removed, onCommit must throw.
        props.remove("group.id");
        Assertions.assertThrows(
            HoodieNotSupportedException.class,
            () -> kafkaSource.getSource().onCommit(""));
    }

    /**
     * With {@link KafkaSourceConfig#ENABLE_FAIL_ON_DATA_LOSS} enabled and the topic created with
     * {@code retention.ms=8000}, fetches two records, sleeps for 30 seconds, and then expects
     * both the Avro and the Row fetch from the old checkpoint to throw
     * {@link HoodieStreamerException} with the data-loss message prefix.
     */
    @Test
    public void testFailOnDataLoss() throws Exception {
        final String topic = "hoodie_test_testFailOnDataLoss";
        final String dataLossPrefix = "Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. Kafka partitions that have out-of-bound checkpoints:";

        Properties topicConfig = new Properties();
        topicConfig.setProperty("retention.ms", "8000");
        this.testUtils.createTopic(topic, 2, topicConfig);

        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
        props.setProperty(KafkaSourceConfig.ENABLE_FAIL_ON_DATA_LOSS.key(), String.valueOf(true));
        SourceFormatAdapter kafkaSource = createSource(props);

        sendMessagesToKafka(topic, 10, 2);
        InputBatch firstBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 2L);
        Assertions.assertEquals(2L, ((JavaRDD) firstBatch.getBatch().get()).count());

        // Wait well beyond the 8 s retention configured on the topic.
        Thread.sleep(30000L);

        HoodieStreamerException avroFailure = Assertions.assertThrows(
            HoodieStreamerException.class,
            () -> kafkaSource.fetchNewDataInAvroFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE));
        Assertions.assertTrue(avroFailure.getMessage().startsWith(dataLossPrefix));

        HoodieStreamerException rowFailure = Assertions.assertThrows(
            HoodieStreamerException.class,
            () -> kafkaSource.fetchNewDataInRowFormat(Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE));
        Assertions.assertTrue(rowFailure.getMessage().startsWith(dataLossPrefix));
    }

    /**
     * Stubs the source-profile supplier with a {@code TestSourceProfile(Long.MAX_VALUE, 4, 500)}
     * and checks that a fetch with source limit 900 returns 500 records, and that the metrics
     * mock received the profile's parallelism (4) and byte limit ({@code Long.MAX_VALUE}) twice each.
     */
    @Test
    public void testKafkaSourceWithOffsetsFromSourceProfile() {
        final String topic = "hoodie_test_testKafkaSourceWithOffsetRanges";
        this.testUtils.createTopic(topic, 2);
        TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");

        SourceProfileSupplier supplier = this.sourceProfile.get();
        Mockito.when(supplier.getSourceProfile()).thenReturn(new TestSourceProfile(Long.MAX_VALUE, 4, 500L));

        SourceFormatAdapter kafkaSource = createSource(props);

        // Nothing has been sent yet, so no batch is expected.
        Assertions.assertEquals(
            Option.empty(),
            kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        sendMessagesToKafka(topic, 1000, 2);
        InputBatch limitedBatch = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        // 500 is the event count carried by the stubbed profile, not the 900 source limit.
        Assertions.assertEquals(500L, ((JavaRDD) limitedBatch.getBatch().get()).count());

        // One metric update per fetch above, hence two invocations each.
        Mockito.verify(this.metrics, Mockito.times(2)).updateStreamerSourceParallelism(4);
        Mockito.verify(this.metrics, Mockito.times(2)).updateStreamerSourceBytesToBeIngestedInSyncRound(Long.MAX_VALUE);
    }
}
