package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarness {
    /** Avro schema of the S3 event metadata records written into the upstream test table. */
    private static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(
            TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", true);
    private static final String MY_BUCKET = "some-bucket";
    /** Extension used for decoy objects interleaved with the real data files in some tests. */
    private static final String IGNORE_FILE_EXTENSION = ".ignore";
    /** Shorthand for the only missing-checkpoint strategy exercised by these tests. */
    private static final IncrSourceHelper.MissingCheckpointStrategy READ_UPTO_LATEST_COMMIT =
            IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;

    private final ObjectMapper mapper = new ObjectMapper();
    private Option<SchemaProvider> schemaProvider;

    @Mock
    QueryRunner mockQueryRunner;

    @Mock
    CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon;

    @Mock
    SourceProfileSupplier sourceProfileSupplier;

    @Mock
    QueryInfo queryInfo;

    @Mock
    HoodieIngestionMetrics metrics;

    private JavaSparkContext jsc;
    private HoodieTableMetaClient metaClient;

    /**
     * Fixed {@link SourceProfile} returned by the mocked {@link SourceProfileSupplier}.
     * Its source-specific context is the configured bytes-per-partition value.
     */
    static class TestSourceProfile implements SourceProfile<Long> {
        private final long maxSourceBytes;
        private final int sourcePartitions;
        private final long bytesPerPartition;

        public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long bytesPerPartition) {
            this.maxSourceBytes = maxSourceBytes;
            this.sourcePartitions = sourcePartitions;
            this.bytesPerPartition = bytesPerPartition;
        }

        @Override
        public long getMaxSourceBytes() {
            return maxSourceBytes;
        }

        @Override
        public int getSourcePartitions() {
            return sourcePartitions;
        }

        @Override
        public Long getSourceSpecificContext() {
            return bytesPerPartition;
        }
    }

    /**
     * Creates the Spark context wrapper and meta client, and builds a file-based schema provider
     * from the {@code schema/sample_gcs_data.avsc} test resource.
     */
    @BeforeEach
    public void setUp() throws IOException {
        jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
        metaClient = getHoodieMetaClient(storageConf(), basePath());
        String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader()
                .getResource("schema/sample_gcs_data.avsc").getPath();
        TypedProperties schemaProps = new TypedProperties();
        schemaProps.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
        schemaProps.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
        schemaProvider = Option.of(new FilebasedSchemaProvider(schemaProps, jsc));
    }

    /**
     * Serializes each (key, size, commitTime) triple into an S3 event JSON string in {@link #MY_BUCKET}.
     */
    private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        return filePathSizeAndCommitTime.stream().map(entry -> {
            try {
                return generateS3EventMetadata(entry.getMiddle(), MY_BUCKET, entry.getLeft(), entry.getRight());
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    /** Builds a two-partition JSON dataset of S3 events from the given (key, size, commitTime) triples. */
    private Dataset<Row> generateDataset(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        return spark().read().json(jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2));
    }

    /**
     * Renders one S3 event as JSON:
     * {@code {"s3": {"object": {"size", "key"}, "bucket": {"name"}}, "_hoodie_commit_time"}}.
     */
    private String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime)
            throws JsonProcessingException {
        Map<String, Object> objectMetadata = new HashMap<>();
        objectMetadata.put("size", objectSize);
        objectMetadata.put("key", objectKey);
        Map<String, Object> bucketMetadata = new HashMap<>();
        bucketMetadata.put("name", bucketName);
        Map<String, Object> s3Metadata = new HashMap<>();
        s3Metadata.put("object", objectMetadata);
        s3Metadata.put("bucket", bucketMetadata);
        Map<String, Object> eventMetadata = new HashMap<>();
        eventMetadata.put("s3", s3Metadata);
        eventMetadata.put("_hoodie_commit_time", commitTime);
        return mapper.writeValueAsString(eventMetadata);
    }

    /**
     * Builds an Avro {@link HoodieRecord} conforming to {@link #S3_METADATA_SCHEMA} for one S3 object.
     * The record key is the object key; the partition path is the bucket name.
     */
    private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
        GenericData.Record eventRecord = new GenericData.Record(S3_METADATA_SCHEMA);
        // Nested fields are union-typed; branch 1 is the record schema (presumably [null, record] — confirm in .avsc).
        Schema s3Schema = S3_METADATA_SCHEMA.getField("s3").schema().getTypes().get(1);
        GenericData.Record s3Record = new GenericData.Record(s3Schema);

        GenericData.Record bucketRecord = new GenericData.Record(s3Schema.getField("bucket").schema().getTypes().get(1));
        bucketRecord.put("name", bucketName);

        GenericData.Record objectRecord = new GenericData.Record(s3Schema.getField("object").schema().getTypes().get(1));
        objectRecord.put("key", objectKey);
        objectRecord.put("size", objectSize);

        s3Record.put("bucket", bucketRecord);
        s3Record.put("object", objectRecord);
        eventRecord.put("s3", s3Record);
        eventRecord.put("_hoodie_commit_time", commitTime);
        return new HoodieAvroRecord<>(new HoodieKey(objectKey, bucketName), new HoodieAvroPayload(Option.of(eventRecord)));
    }

    /** Incremental-source properties pointing at this test's base path with JSON as the file format. */
    private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
        Properties properties = new Properties();
        properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
        properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
        properties.setProperty("hoodie.streamer.source.hoodieincr.file.format", "json");
        return new TypedProperties(properties);
    }

    private HoodieWriteConfig.Builder getConfigBuilder(String tablePath, HoodieTableMetaClient tableMetaClient) {
        return HoodieWriteConfig.newBuilder()
                .withPath(tablePath)
                .withSchema(S3_METADATA_SCHEMA.toString())
                .withParallelism(2, 2)
                .withBulkInsertParallelism(2)
                .withFinalizeWriteParallelism(2)
                .withDeleteParallelism(2)
                .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
                .forTable(tableMetaClient.getTableConfig().getTableName());
    }

    /** Write config with aggressive archival/cleaning and metadata compaction after every delta commit. */
    private HoodieWriteConfig getWriteConfig() {
        return getConfigBuilder(basePath(), metaClient)
                .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
                .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
                .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
                .build();
    }

    /**
     * Upserts a single S3 event metadata record into the test table at {@code commitTime}.
     *
     * @param commitTime instant time of the commit
     * @return {@code commitTime} paired with the records that were written
     * @throws IOException propagated from the harness helpers
     */
    private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
        // try-with-resources closes the client exactly once, even when the upsert or assertion fails.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getWriteConfig())) {
            writeClient.startCommitWithTime(commitTime);
            List<HoodieRecord> records = Arrays.asList(
                    generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L));
            Assertions.assertNoWriteErrors(writeClient.upsert(jsc().parallelize(records, 1), commitTime).collect());
            return Pair.of(commitTime, records);
        }
    }

    /** Stubs the cloud-object loader to return no data for the current value of {@link #schemaProvider}. */
    private void stubEmptyLoadAsDataset() {
        Mockito.when(mockCloudObjectsSelectorCommon.loadAsDataset(
                        Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt()))
                .thenReturn(Option.empty());
    }

    @Test
    public void testEmptyCheckpoint() throws IOException {
        String commitTime = writeS3MetadataRecords("1").getKey();
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 0L, commitTime);
    }

    @Test
    public void testOneFileInCommit() throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of("path/to/file1.json", 100L, "1"));
        files.add(Triple.of("path/to/file2.json", 150L, "1"));
        files.add(Triple.of("path/to/file3.json", 200L, "1"));
        setMockQueryRunner(generateDataset(files));
        stubEmptyLoadAsDataset();
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 200L, "1#path/to/file2.json");
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 200L, "1#path/to/file3.json");
    }

    @Test
    public void testTwoFilesAndContinueInSameCommit() throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of("path/to/file1.json", 100L, "1"));
        files.add(Triple.of("path/to/file2.json", 150L, "1"));
        files.add(Triple.of("path/to/file3.json", 200L, "1"));
        setMockQueryRunner(generateDataset(files));
        stubEmptyLoadAsDataset();
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 250L, "1#path/to/file2.json");
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
    }

    @ValueSource(strings = {".json", ".gz"})
    @ParameterizedTest
    public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
        // JSON is the default data-file extension; anything else must be configured explicitly.
        if (!extension.endsWith("json")) {
            props.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), extension);
        }
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of(String.format("path/to/file1%s", extension), 100L, "1"));
        files.add(Triple.of(String.format("path/to/file2%s", IGNORE_FILE_EXTENSION), 800L, "1"));
        files.add(Triple.of(String.format("path/to/file3%s", extension), 200L, "1"));
        files.add(Triple.of(String.format("path/to/file2%s", extension), 150L, "1"));
        files.add(Triple.of(String.format("path/to/file4%s", extension), 50L, "2"));
        files.add(Triple.of(String.format("path/to/file4%s", IGNORE_FILE_EXTENSION), 200L, "2"));
        files.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));
        setMockQueryRunner(generateDataset(files));
        stubEmptyLoadAsDataset();
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);

        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1" + extension, props);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L,
                "1#path/to/file2" + extension, props);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L,
                "2#path/to/file5" + extension, props);
    }

    @Test
    public void testEmptyDataAfterFilter() throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of("path/to/skip1.json", 100L, "1"));
        files.add(Triple.of("path/to/skip3.json", 200L, "1"));
        files.add(Triple.of("path/to/skip2.json", 150L, "1"));
        files.add(Triple.of("path/to/skip5.json", 50L, "2"));
        files.add(Triple.of("path/to/skip4.json", 150L, "2"));
        setMockQueryRunner(generateDataset(files));
        TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
        props.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");

        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", props);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", props);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2#path/to/skip4.json", props);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2#path/to/skip5.json", props);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", props);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFilterAnEntireCommit(boolean useSourceProfile) throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of("path/to/skip1.json", 100L, "1"));
        files.add(Triple.of("path/to/skip2.json", 200L, "1"));
        files.add(Triple.of("path/to/skip3.json", 150L, "1"));
        files.add(Triple.of("path/to/skip4.json", 50L, "1"));
        files.add(Triple.of("path/to/skip5.json", 150L, "1"));
        files.add(Triple.of("path/to/file5.json", 150L, "2"));
        files.add(Triple.of("path/to/file4.json", 150L, "2"));
        setMockQueryRunner(generateDataset(files));
        stubEmptyLoadAsDataset();
        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(useSourceProfile ? new TestSourceProfile(50L, 0, 10L) : null);
        TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
        props.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");

        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, "2#path/to/file4.json", props);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        writeS3MetadataRecords("3");
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of("path/to/file1.json", 100L, "1"));
        files.add(Triple.of("path/to/file3.json", 200L, "1"));
        files.add(Triple.of("path/to/file2.json", 150L, "1"));
        files.add(Triple.of("path/to/skip1.json", 50L, "2"));
        files.add(Triple.of("path/to/skip2.json", 150L, "2"));
        files.add(Triple.of("path/to/file_no_match1.json", 150L, "2"));
        files.add(Triple.of("path/to/file5.json", 150L, "3"));
        files.add(Triple.of("path/to/file4.json", 150L, "3"));
        setMockQueryRunner(generateDataset(files));
        stubEmptyLoadAsDataset();
        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(useSourceProfile ? new TestSourceProfile(50L, 0, 10L) : null);
        TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
        props.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
        props.setProperty("hoodie.streamer.source.cloud.data.select.relative.path.regex", "path/to/file[0-9]+");

        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", props);

        // Repeat without a schema provider and without a source profile.
        schemaProvider = Option.empty();
        Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", props);
    }

    @ParameterizedTest
    @CsvSource({
            "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
            "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
            "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"})
    public void testSplitSnapshotLoad(String snapshotCheckpoint, String expectedCheckpoint1, String expectedCheckpoint2,
            String expectedCheckpoint3, String expectedCheckpoint4) throws IOException {
        writeS3MetadataRecords("1");
        writeS3MetadataRecords("2");
        writeS3MetadataRecords("3");
        List<Triple<String, Long, String>> files = new ArrayList<>();
        files.add(Triple.of("path/to/file1.json", 50L, "1"));
        files.add(Triple.of("path/to/file_no_match1.json", 50L, "1"));
        files.add(Triple.of("path/to/file2.json", 50L, "1"));
        files.add(Triple.of("path/to/skip1.json", 50L, "2"));
        files.add(Triple.of("path/to/skip2.json", 50L, "2"));
        files.add(Triple.of("path/to/file_no_match2.json", 50L, "2"));
        files.add(Triple.of("path/to/file5.json", 50L, "3"));
        files.add(Triple.of("path/to/file4.json", 50L, "3"));
        files.add(Triple.of("path/to/file_no_match3.json", 50L, "3"));
        setMockQueryRunner(generateDataset(files), Option.of(snapshotCheckpoint));
        stubEmptyLoadAsDataset();
        TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT);
        props.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
        props.setProperty("hoodie.streamer.source.cloud.data.select.relative.path.regex", "path/to/file[0-9]+");
        List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000000000L);

        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new TestSourceProfile(50000L, 2, bytesPerPartition.get(0)));
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, expectedCheckpoint1, props);

        props.setProperty("hoodie.streamer.source.cloud.data.select.relpath.prefix", "path/to/");
        props.setProperty("hoodie.streamer.source.cloud.data.select.relative.path.regex", "file[0-9]+");
        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new TestSourceProfile(10L, 2, bytesPerPartition.get(1)));
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(expectedCheckpoint1), 10L, expectedCheckpoint2, props);

        props.setProperty("hoodie.streamer.source.cloud.data.select.relpath.prefix", "path/to");
        props.remove("hoodie.streamer.source.cloud.data.select.relative.path.regex");
        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new TestSourceProfile(50L, 2, bytesPerPartition.get(2)));
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expectedCheckpoint3, props);

        props.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
        props.remove("hoodie.streamer.source.cloud.data.select.relpath.prefix");
        Mockito.when(sourceProfileSupplier.getSourceProfile())
                .thenReturn(new TestSourceProfile(50L, 2, bytesPerPartition.get(3)));
        readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expectedCheckpoint4, props);

        // The parallelism passed to the loader must match what is reported to metrics.
        ArgumentCaptor<Integer> loadParallelismCaptor = ArgumentCaptor.forClass(Integer.class);
        ArgumentCaptor<Integer> metricsParallelismCaptor = ArgumentCaptor.forClass(Integer.class);
        Mockito.verify(mockCloudObjectsSelectorCommon, Mockito.atLeastOnce()).loadAsDataset(
                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), loadParallelismCaptor.capture());
        Mockito.verify(metrics, Mockito.atLeastOnce()).updateStreamerSourceParallelism(metricsParallelismCaptor.capture());
        List<Integer> expectedParallelism = ("1".equals(snapshotCheckpoint) || "2".equals(snapshotCheckpoint))
                ? Arrays.asList(12, 3, 2)
                : Arrays.asList(23, 2);
        org.junit.jupiter.api.Assertions.assertEquals(expectedParallelism, loadParallelismCaptor.getAllValues());
        org.junit.jupiter.api.Assertions.assertEquals(expectedParallelism, metricsParallelismCaptor.getAllValues());
    }

    @Test
    public void testUnsupportedCheckpoint() {
        S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(setProps(READ_UPTO_LATEST_COMMIT), jsc(),
                spark(), mockQueryRunner,
                new CloudDataFetcher(new TypedProperties(), jsc(), spark(), metrics, mockCloudObjectsSelectorCommon),
                new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
        IllegalArgumentException exception = org.junit.jupiter.api.Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> incrSource.translateCheckpoint(Option.of(new StreamerCheckpointV2("1"))));
        org.junit.jupiter.api.Assertions.assertEquals(
                "For S3EventsHoodieIncrSource, only StreamerCheckpointV1, i.e., requested time-based checkpoint, is supported. Checkpoint provided is: StreamerCheckpointV2{checkpointKey='1'}",
                exception.getMessage());
    }

    @Test
    public void testCreateSource() throws IOException {
        Source source = UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(),
                setProps(READ_UPTO_LATEST_COMMIT), jsc(), spark(), metrics,
                new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
        org.junit.jupiter.api.Assertions.assertEquals(Source.SourceType.ROW, source.getSourceType());
    }

    /**
     * Runs one {@code fetchNextBatch} on a fresh source and asserts the resulting checkpoint.
     *
     * @param missingCheckpointStrategy not read here; the strategy is taken from {@code typedProperties}
     *                                  (see {@link #setProps})
     * @param checkpointToPull          V1 checkpoint to resume from, or empty to start without one
     * @param sourceLimit               source limit passed to {@code fetchNextBatch}
     * @param expectedCheckpoint        expected V1 checkpoint key after the fetch
     * @param typedProperties           properties for both the source and its {@link CloudDataFetcher}
     */
    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
            Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint, TypedProperties typedProperties) {
        S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(), spark(),
                mockQueryRunner,
                new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon),
                new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
        Pair<Option<Dataset<Row>>, Checkpoint> batch = incrSource.fetchNextBatch(
                checkpointToPull.isPresent()
                        ? Option.of(new StreamerCheckpointV1(checkpointToPull.get()))
                        : Option.empty(),
                sourceLimit);
        Checkpoint nextCheckpoint = batch.getRight();
        org.junit.jupiter.api.Assertions.assertNotNull(nextCheckpoint);
        org.junit.jupiter.api.Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckpoint);
    }

    /** Same as the five-argument overload, using {@link #setProps} for the given strategy. */
    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
            Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
        readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint,
                setProps(missingCheckpointStrategy));
    }

    private void setMockQueryRunner(Dataset<Row> dataset) {
        setMockQueryRunner(dataset, Option.empty());
    }

    /**
     * Stubs {@link QueryRunner#run} to return {@code dataset}. If {@code endInstantOverride} is present, the
     * query's end instant is replaced with it. For snapshot queries the dataset is filtered to rows whose
     * commit time lies in [startInstant, endInstant].
     */
    private void setMockQueryRunner(Dataset<Row> dataset, Option<String> endInstantOverride) {
        Mockito.when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
            QueryInfo requested = invocation.getArgument(0);
            QueryInfo effective = endInstantOverride.map(requested::withUpdatedEndInstant).orElse(requested);
            if (!effective.isSnapshot()) {
                return Pair.of(effective, dataset);
            }
            String lowerBound = String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
                    effective.getStartInstant());
            String upperBound = String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
                    effective.getEndInstant());
            return Pair.of(effective, dataset.filter(lowerBound).filter(upperBound));
        });
    }
}
