package org.apache.hudi;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/TestDataSourceReadWithDeletes.class */
public class TestDataSourceReadWithDeletes extends SparkClientFunctionalTestHarness {
    String jsonSchema = "{\n  \"type\": \"record\",\n  \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n  \"fields\": [\n    {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_operation\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n    {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n    {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n  ]\n}";
    private Schema schema;
    private HoodieTableMetaClient metaClient;

    @BeforeEach
    public void setUp() {
        this.schema = new Schema.Parser().parse(this.jsonSchema);
    }

    @Test
    public void test() throws Exception {
        HoodieWriteConfig createHoodieWriteConfig = createHoodieWriteConfig();
        this.metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, createHoodieWriteConfig.getProps());
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(createHoodieWriteConfig);
        String createNewInstantTime = hoodieWriteClient.createNewInstantTime();
        hoodieWriteClient.commit(createNewInstantTime, jsc().parallelize(writeData(hoodieWriteClient, createNewInstantTime, new String[]{"I,id1,Danny,23,1,par1", "I,id2,Tony,20,1,par1"})));
        String createNewInstantTime2 = hoodieWriteClient.createNewInstantTime();
        hoodieWriteClient.commit(createNewInstantTime2, jsc().parallelize(writeData(hoodieWriteClient, createNewInstantTime2, new String[]{"I,id1,Danny,30,2,par1", "D,id2,Tony,20,2,par1", "I,id3,Julian,40,2,par1", "D,id4,Stephan,35,2,par1"})));
        List collectAsList = spark().read().format("org.apache.hudi").option("hoodie.datasource.query.type", "snapshot").load(createHoodieWriteConfig.getBasePath() + "/*/*").select("id", new String[]{"name", "age", "ts", "part"}).collectAsList();
        Assertions.assertEquals(2, collectAsList.size());
        Assertions.assertArrayEquals(new String[]{"[id1,Danny,30,2,par1]", "[id3,Julian,40,2,par1]"}, collectAsList.stream().map((v0) -> {
            return v0.toString();
        }).sorted().toArray(i -> {
            return new String[i];
        }));
    }

    private HoodieWriteConfig createHoodieWriteConfig() {
        Properties propertiesForKeyGen = getPropertiesForKeyGen(true);
        propertiesForKeyGen.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        return HoodieWriteConfig.newBuilder().forTable("test").withPath(basePath()).withSchema(this.jsonSchema).withParallelism(2, 2).withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024L).build()).withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(propertiesForKeyGen).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).withPopulateMetaFields(true).withAllowOperationMetadataField(true).withMarkersType(MarkerType.DIRECT.name()).build();
    }

    private List<WriteStatus> writeData(SparkRDDWriteClient sparkRDDWriteClient, String str, String[] strArr) {
        JavaRDD parallelize = jsc().parallelize(str2HoodieRecord(strArr), 2);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        sparkRDDWriteClient.startCommitWithTime(str);
        List<WriteStatus> collect = sparkRDDWriteClient.upsert(parallelize, str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(collect);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        return collect;
    }

    private List<HoodieRecord> str2HoodieRecord(String[] strArr) {
        return (List) Stream.of((Object[]) strArr).map(str -> {
            String[] split = str.split(",");
            boolean equalsIgnoreCase = split[0].equalsIgnoreCase("D");
            GenericData.Record record = new GenericData.Record(this.schema);
            record.put("id", split[1]);
            record.put("name", split[2]);
            record.put("age", Integer.valueOf(Integer.parseInt(split[3])));
            record.put("ts", Long.valueOf(Long.parseLong(split[4])));
            record.put("part", split[5]);
            return new HoodieAvroRecord(new HoodieKey((String) record.get("id"), (String) record.get("part")), new OverwriteWithLatestAvroPayload(record, (Long) record.get("ts")), equalsIgnoreCase ? HoodieOperation.DELETE : HoodieOperation.INSERT);
        }).collect(Collectors.toList());
    }
}
