package org.apache.hudi.utilities.sources.helpers;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.class */
/**
 * Functional tests for {@link IncrSourceHelper}.
 *
 * <p>The first group of tests feeds synthetic S3 event metadata rows (commit time, bucket name,
 * object key, object size) to {@code filterAndGenerateCheckpointBasedOnSourceLimit} and checks the
 * returned checkpoint, the returned rows and the {@code cumulativeSize} of the last returned row.
 * The last test writes two real commits to a Hudi table and checks the instants reported by
 * {@code generateQueryInfo}.
 */
class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
    private static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", true);
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static final String BUCKET_NAME = "bucket-1";
    private static final String COMMIT_TIME_COLUMN = "_hoodie_commit_time";
    private static final String OBJECT_KEY_COLUMN = "s3.object.key";
    private static final String OBJECT_SIZE_COLUMN = "s3.object.size";
    private static final String CUMULATIVE_SIZE_COLUMN = "cumulativeSize";

    private JavaSparkContext jsc;
    private HoodieTableMetaClient metaClient;

    /** Creates the Java Spark context and the table meta client used by the tests. */
    @BeforeEach
    public void setUp() throws IOException {
        this.jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
        this.metaClient = getHoodieMetaClient(hadoopConf(), basePath());
    }

    /**
     * Serializes one S3 event metadata row to JSON.
     *
     * @param objectSize value stored under {@code s3.object.size}
     * @param bucketName value stored under {@code s3.bucket.name}
     * @param objectKey  value stored under {@code s3.object.key}
     * @param commitTime value stored under {@code _hoodie_commit_time}
     * @return the JSON document as a string
     * @throws JsonProcessingException if Jackson cannot serialize the map
     */
    private String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime) throws JsonProcessingException {
        HashMap<String, Object> objectMetadata = new HashMap<>();
        objectMetadata.put("size", objectSize);
        objectMetadata.put("key", objectKey);

        HashMap<String, Object> bucketMetadata = new HashMap<>();
        bucketMetadata.put("name", bucketName);

        HashMap<String, Object> s3Metadata = new HashMap<>();
        s3Metadata.put("object", objectMetadata);
        s3Metadata.put("bucket", bucketMetadata);

        HashMap<String, Object> eventRecord = new HashMap<>();
        eventRecord.put("s3", s3Metadata);
        eventRecord.put(COMMIT_TIME_COLUMN, commitTime);
        return MAPPER.writeValueAsString(eventRecord);
    }

    /**
     * Converts (object key, object size, commit time) triples into JSON event rows, all in bucket
     * {@code bucket-1}.
     */
    private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        return filePathSizeAndCommitTime.stream().map(entry -> {
            try {
                return generateS3EventMetadata(entry.getMiddle(), BUCKET_NAME, entry.getLeft(), entry.getRight());
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    /** Builds a Spark dataset from the given triples by reading their JSON form across two partitions. */
    private Dataset<Row> generateDataset(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
        return spark().read().json(this.jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2));
    }

    /**
     * Builds an incremental {@link QueryInfo} over the S3 metadata columns used by every test here.
     *
     * <p>NOTE(review): the three instant arguments are passed positionally; their names are
     * inferred from the {@code QueryInfo} getters used in {@link #testQueryInfoGeneration()} and
     * should be confirmed against the {@code QueryInfo} constructor.
     */
    private static QueryInfo incrementalQueryInfo(String previousInstant, String startInstant, String endInstant) {
        return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), previousInstant, startInstant, endInstant,
            COMMIT_TIME_COLUMN, OBJECT_KEY_COLUMN, OBJECT_SIZE_COLUMN);
    }

    /**
     * Returns the {@code cumulativeSize} value of the last row of {@code rows}.
     *
     * <p>The row index comes from the collected list itself, so no separate {@code count()} job is
     * needed.
     */
    private static Object lastCumulativeSize(Dataset<Row> rows) {
        List<Row> cumulativeSizes = rows.select(CUMULATIVE_SIZE_COLUMN).collectAsList();
        return cumulativeSizes.get(cumulativeSizes.size() - 1).get(0);
    }

    /** An empty source with an empty checkpoint yields the all-zero checkpoint and no rows. */
    @Test
    void testEmptySource() {
        Dataset<Row> emptyDataset = spark().createDataFrame(new ArrayList<Row>(), new StructType());

        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            emptyDataset, 50L, incrementalQueryInfo("commit1", "commit1", "commit2"), new CloudObjectIncrCheckpoint((String) null, (String) null));

        // NOTE(review): presumably Hudi's initial-instant timestamp constant - confirm.
        Assertions.assertEquals("00000000000000", result.getKey().toString());
        Assertions.assertFalse(result.getRight().isPresent());
    }

    /** With a limit (50) below the first object's size (100), exactly that first object is returned. */
    @Test
    void testSingleObjectExceedingSourceLimit() {
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "commit2"));

        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            generateDataset(filePathSizeAndCommitTime), 50L, incrementalQueryInfo("commit1", "commit1", "commit2"),
            new CloudObjectIncrCheckpoint("commit1", (String) null));

        Dataset<Row> filtered = result.getRight().get();
        Assertions.assertEquals("commit1#path/to/file1.json", result.getKey().toString());
        List<Row> rows = filtered.collectAsList();
        Assertions.assertEquals(1, rows.size());
        Assertions.assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100]]", rows.toString());
        Assertions.assertEquals(100L, lastCumulativeSize(filtered));
    }

    /** Runs the same dataset through limits of 350 and 550 and checks where each batch stops. */
    @Test
    void testMultipleObjectExceedingSourceLimit() {
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 100L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 250L, "commit3"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
        QueryInfo queryInfo = incrementalQueryInfo("commit1", "commit1", "commit2");

        // Limit 350: stops after file1 (100) + file2 (150).
        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> smallBatch = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit1", (String) null));
        Dataset<Row> smallBatchDs = smallBatch.getRight().get();
        Assertions.assertEquals("commit1#path/to/file2.json", smallBatch.getKey().toString());
        List<Row> smallBatchRows = smallBatchDs.collectAsList();
        Assertions.assertEquals(2, smallBatchRows.size());
        Assertions.assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250]]", smallBatchRows.toString());
        Assertions.assertEquals(250L, lastCumulativeSize(smallBatchDs));

        // Limit 550: continues through the rest of commit1 and the first object of commit2.
        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> largeBatch = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            inputDs, 550L, queryInfo, new CloudObjectIncrCheckpoint("commit1", (String) null));
        Dataset<Row> largeBatchDs = largeBatch.getRight().get();
        Assertions.assertEquals("commit2#path/to/file4.json", largeBatch.getKey().toString());
        List<Row> largeBatchRows = largeBatchDs.collectAsList();
        Assertions.assertEquals(4, largeBatchRows.size());
        Assertions.assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250], [commit1,[[bucket-1],[path/to/file3.json,200]],450], [commit2,[[bucket-1],[path/to/file4.json,50]],500]]", largeBatchRows.toString());
        Assertions.assertEquals(500L, lastCumulativeSize(largeBatchDs));
    }

    /** With a limit (1500) above the total size (1050), all eight objects are returned. */
    @Test
    void testCatchAllObjects() {
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L, "commit3"));

        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            generateDataset(filePathSizeAndCommitTime), 1500L, incrementalQueryInfo("commit1", "commit1", "commit2"),
            new CloudObjectIncrCheckpoint("commit1", (String) null));

        Dataset<Row> filtered = result.getRight().get();
        Assertions.assertEquals("commit3#path/to/file8.json", result.getKey().toString());
        Assertions.assertEquals(8, filtered.collectAsList().size());
        Assertions.assertEquals(1050L, lastCumulativeSize(filtered));
    }

    /**
     * Starts from a checkpoint at the last key of commit3 and checks that the returned rows come
     * from commit4, for limits of 50 and 350.
     */
    @Test
    void testFileOrderingAcrossCommits() {
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file0.json", 100L, "commit4"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "commit4"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "commit4"));
        Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
        QueryInfo queryInfo = incrementalQueryInfo("commit3", "commit3", "commit4");

        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> smallBatch = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
        Dataset<Row> smallBatchDs = smallBatch.getRight().get();
        Assertions.assertEquals("commit4#path/to/file0.json", smallBatch.getKey().toString());
        Assertions.assertEquals(1, smallBatchDs.collectAsList().size());
        Assertions.assertEquals(100L, lastCumulativeSize(smallBatchDs));

        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> largeBatch = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
        Dataset<Row> largeBatchDs = largeBatch.getRight().get();
        Assertions.assertEquals("commit4#path/to/file2.json", largeBatch.getKey().toString());
        Assertions.assertEquals(3, largeBatchDs.collectAsList().size());
        Assertions.assertEquals(200L, lastCumulativeSize(largeBatchDs));
    }

    /** A checkpoint already at the last object of the last commit returns the same checkpoint and no rows. */
    @Test
    void testLastObjectInCommit() {
        List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
        filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "commit1"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "commit2"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L, "commit3"));
        filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L, "commit3"));

        Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
            generateDataset(filePathSizeAndCommitTime), 1500L, incrementalQueryInfo("commit1", "commit1", "commit3"),
            new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));

        Assertions.assertEquals("commit3#path/to/file8.json", result.getKey().toString());
        Assertions.assertFalse(result.getRight().isPresent());
    }

    /**
     * Builds one S3 metadata record against {@link #S3_METADATA_SCHEMA} and wraps it as a Hudi
     * Avro record keyed by (object key, bucket name).
     *
     * <p>The nested record schemas are taken from index 1 of each field's union type.
     * NOTE(review): this presumes each of those unions is {@code [null, record]} - confirm against
     * {@code s3-metadata.avsc}.
     */
    private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
        Schema s3Schema = S3_METADATA_SCHEMA.getField("s3").schema().getTypes().get(1);
        Schema bucketSchema = s3Schema.getField("bucket").schema().getTypes().get(1);
        Schema objectSchema = s3Schema.getField("object").schema().getTypes().get(1);

        GenericData.Record bucketRecord = new GenericData.Record(bucketSchema);
        bucketRecord.put("name", bucketName);

        GenericData.Record objectRecord = new GenericData.Record(objectSchema);
        objectRecord.put("key", objectKey);
        objectRecord.put("size", objectSize);

        GenericData.Record s3Record = new GenericData.Record(s3Schema);
        s3Record.put("bucket", bucketRecord);
        s3Record.put("object", objectRecord);

        GenericData.Record metadataRecord = new GenericData.Record(S3_METADATA_SCHEMA);
        metadataRecord.put("s3", s3Record);
        metadataRecord.put(COMMIT_TIME_COLUMN, commitTime);

        return new HoodieAvroRecord<>(new HoodieKey(objectKey, bucketName), new HoodieAvroPayload(Option.of(metadataRecord)));
    }

    /** Returns a write-config builder for the given base path and table, with parallelism 2 for every operation. */
    private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient hoodieTableMetaClient) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(S3_METADATA_SCHEMA.toString())
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withFinalizeWriteParallelism(2)
            .withDeleteParallelism(2)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .forTable(hoodieTableMetaClient.getTableConfig().getTableName());
    }

    /** Returns the write config for this test's table, adding archival, clean and metadata settings. */
    private HoodieWriteConfig getWriteConfig() {
        return getConfigBuilder(basePath(), this.metaClient)
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
            .build();
    }

    /**
     * Upserts one S3 metadata record under commit {@code commitTime} and asserts that the write
     * reported no errors.
     *
     * @param commitTime instant time to start the commit with; also stored in the record
     * @return the commit time paired with the records that were written
     * @throws IOException declared for callers; propagated from the write client if raised
     */
    private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
        // try-with-resources closes the client exactly once, on success and on failure alike.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getWriteConfig())) {
            writeClient.startCommitWithTime(commitTime);
            List<HoodieRecord> s3MetadataRecords = Arrays.asList(generateS3EventMetadata(commitTime, BUCKET_NAME, "data-file-1.json", 1L));
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime).collect());
            return Pair.of(commitTime, s3MetadataRecords);
        }
    }

    /**
     * Writes commits "1" and "2", then checks the previous, start and end instants that
     * {@code generateQueryInfo} reports for last checkpoints "1" and "2".
     */
    @Test
    void testQueryInfoGeneration() throws IOException {
        String commitTimeForReads = "1";
        String commitTimeForWrites = "2";
        writeS3MetadataRecords(commitTimeForReads);
        writeS3MetadataRecords(commitTimeForWrites);

        QueryInfo fromFirstCommit = IncrSourceHelper.generateQueryInfo(this.jsc, basePath(), 5, Option.of(commitTimeForReads),
            (IncrSourceHelper.MissingCheckpointStrategy) null, TimelineUtils.HollowCommitHandling.BLOCK,
            COMMIT_TIME_COLUMN, OBJECT_KEY_COLUMN, OBJECT_SIZE_COLUMN, true, Option.empty());
        Assertions.assertEquals(String.valueOf(Integer.parseInt(commitTimeForReads) - 1), fromFirstCommit.getPreviousInstant());
        Assertions.assertEquals(commitTimeForReads, fromFirstCommit.getStartInstant());
        Assertions.assertEquals(commitTimeForWrites, fromFirstCommit.getEndInstant());

        QueryInfo fromSecondCommit = IncrSourceHelper.generateQueryInfo(this.jsc, basePath(), 5, Option.of(commitTimeForWrites),
            (IncrSourceHelper.MissingCheckpointStrategy) null, TimelineUtils.HollowCommitHandling.BLOCK,
            COMMIT_TIME_COLUMN, OBJECT_KEY_COLUMN, OBJECT_SIZE_COLUMN, true, Option.empty());
        Assertions.assertEquals(commitTimeForReads, fromSecondCommit.getPreviousInstant());
        Assertions.assertEquals(commitTimeForWrites, fromSecondCommit.getStartInstant());
        Assertions.assertEquals(commitTimeForWrites, fromSecondCommit.getEndInstant());
    }
}
