package org.apache.hudi.table.action.commit;

import java.io.File;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.class */
public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implements Serializable {
    private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);
    private static final Schema SCHEMA = SchemaTestUtil.getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/exampleSchema.avsc");

    private static final Stream<Arguments> indexType() {
        return Stream.of((Object[]) new HoodieIndex.IndexType[]{HoodieIndex.IndexType.BLOOM, HoodieIndex.IndexType.BUCKET}).map(obj -> {
            return Arguments.of(new Object[]{obj});
        });
    }

    @Test
    public void testMakeNewPath() {
        String uuid = UUID.randomUUID().toString();
        String str = "2016/05/04";
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        HoodieWriteConfig makeHoodieClientConfig = makeHoodieClientConfig();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable create = HoodieSparkTable.create(makeHoodieClientConfig, this.context, this.metaClient);
        Pair pair = (Pair) this.jsc.parallelize(Arrays.asList(1)).map(num -> {
            HoodieRecord hoodieRecord = (HoodieRecord) Mockito.mock(HoodieRecord.class);
            Mockito.when(hoodieRecord.getPartitionPath()).thenReturn(str);
            return Pair.of(new HoodieCreateHandle(makeHoodieClientConfig, makeNewCommitTime, create, str, uuid, this.supplier).makeNewPath(hoodieRecord.getPartitionPath()), FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()));
        }).collect().get(0);
        Assertions.assertEquals(((Path) pair.getKey()).toString(), Paths.get(this.basePath, "2016/05/04", FSUtils.makeBaseFileName(makeNewCommitTime, (String) pair.getRight(), uuid)).toString());
    }

    private HoodieWriteConfig makeHoodieClientConfig() {
        return makeHoodieClientConfigBuilder().build();
    }

    private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema(SCHEMA.toString()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build());
    }

    private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
        Properties properties = new Properties();
        HoodieIndexConfig.Builder withIndexType = HoodieIndexConfig.newBuilder().withIndexType(indexType);
        if (indexType.equals(HoodieIndex.IndexType.BUCKET)) {
            properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
            withIndexType.fromProperties(properties).withIndexKeyField("_row_key").withBucketNum("1").withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
            properties.putAll(HoodieLayoutConfig.newBuilder().fromProperties(properties).withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()).withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps());
        }
        properties.putAll(withIndexType.build().getProps());
        return properties;
    }

    @MethodSource({"indexType"})
    @ParameterizedTest
    public void testUpdateRecords(HoodieIndex.IndexType indexType) throws Exception {
        HoodieWriteConfig build = makeHoodieClientConfigBuilder().withProps(makeIndexConfig(indexType)).build();
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        hoodieWriteClient.startCommitWithTime(makeNewCommitTime);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(build, this.context, this.metaClient);
        ArrayList arrayList = new ArrayList();
        RawTripTestPayload rawTripTestPayload = new RawTripTestPayload("{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
        arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload.getRowKey(), rawTripTestPayload.getPartitionPath()), rawTripTestPayload));
        RawTripTestPayload rawTripTestPayload2 = new RawTripTestPayload("{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}");
        arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload2.getRowKey(), rawTripTestPayload2.getPartitionPath()), rawTripTestPayload2));
        RawTripTestPayload rawTripTestPayload3 = new RawTripTestPayload("{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}");
        arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload3.getRowKey(), rawTripTestPayload3.getPartitionPath()), rawTripTestPayload3));
        hoodieWriteClient.insert(this.jsc.parallelize(arrayList, 1), makeNewCommitTime);
        FileStatus[] incrementalFiles = getIncrementalFiles("2016/01/31", "0", -1);
        Assertions.assertEquals(1, incrementalFiles.length);
        Path path = incrementalFiles[0].getPath();
        BloomFilter readBloomFilterFromMetadata = BaseFileUtils.getInstance(create.getBaseFileFormat()).readBloomFilterFromMetadata(this.hadoopConf, path);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertTrue(readBloomFilterFromMetadata.mightContain(((HoodieRecord) it.next()).getRecordKey()));
        }
        int i = 0;
        Iterator it2 = BaseFileUtils.getInstance(create.getBaseFileFormat()).readAvroRecords(this.hadoopConf, path).iterator();
        while (it2.hasNext()) {
            Assertions.assertEquals(((HoodieRecord) arrayList.get(i)).getRecordKey(), ((GenericRecord) it2.next()).get("_row_key").toString());
            i++;
        }
        RawTripTestPayload rawTripTestPayload4 = new RawTripTestPayload("{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}");
        HoodieRecord hoodieAvroRecord = new HoodieAvroRecord(new HoodieKey(rawTripTestPayload4.getRowKey(), rawTripTestPayload4.getPartitionPath()), rawTripTestPayload4);
        RawTripTestPayload rawTripTestPayload5 = new RawTripTestPayload("{\"_row_key\":\"8eb5b87d-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":51}");
        HoodieRecord hoodieAvroRecord2 = new HoodieAvroRecord(new HoodieKey(rawTripTestPayload5.getRowKey(), rawTripTestPayload5.getPartitionPath()), rawTripTestPayload5);
        List asList = Arrays.asList(hoodieAvroRecord, hoodieAvroRecord2);
        Thread.sleep(1000L);
        String makeNewCommitTime2 = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        hoodieWriteClient.startCommitWithTime(makeNewCommitTime2);
        List collect = hoodieWriteClient.upsert(this.jsc.parallelize(asList), makeNewCommitTime2).collect();
        FileStatus[] incrementalFiles2 = getIncrementalFiles("2016/01/31", makeNewCommitTime, -1);
        Assertions.assertEquals(1, incrementalFiles2.length);
        Assertions.assertEquals(FSUtils.getFileId(path.getName()), FSUtils.getFileId(incrementalFiles2[0].getPath().getName()));
        Path path2 = incrementalFiles2[0].getPath();
        BloomFilter readBloomFilterFromMetadata2 = BaseFileUtils.getInstance(this.metaClient).readBloomFilterFromMetadata(this.hadoopConf, path2);
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            Assertions.assertTrue(readBloomFilterFromMetadata2.mightContain(((HoodieRecord) it3.next()).getRecordKey()));
        }
        Assertions.assertTrue(readBloomFilterFromMetadata2.mightContain(hoodieAvroRecord2.getRecordKey()));
        arrayList.add(hoodieAvroRecord2);
        ParquetReader build2 = ParquetReader.builder(new AvroReadSupport(), path2).build();
        int i2 = 0;
        while (true) {
            GenericRecord genericRecord = (GenericRecord) build2.read();
            if (genericRecord == null) {
                build2.close();
                WriteStatus writeStatus = (WriteStatus) collect.get(0);
                Assertions.assertEquals(1, collect.size(), "Should be only one file generated");
                Assertions.assertEquals(4L, writeStatus.getStat().getNumWrites());
                return;
            }
            Assertions.assertEquals(genericRecord.get("_row_key").toString(), ((HoodieRecord) arrayList.get(i2)).getRecordKey());
            if (i2 == 0) {
                Assertions.assertEquals("15", genericRecord.get("number").toString());
            }
            i2++;
        }
    }

    private FileStatus[] getIncrementalFiles(String str, String str2, int i) throws Exception {
        HoodieParquetInputFormat hoodieParquetInputFormat = new HoodieParquetInputFormat();
        JobConf jobConf = new JobConf(this.hadoopConf);
        hoodieParquetInputFormat.setConf(jobConf);
        HoodieTestUtils.init(this.hadoopConf, this.basePath, HoodieTableType.COPY_ON_WRITE);
        setupIncremental(jobConf, str2, i);
        FileInputFormat.setInputPaths(jobConf, Paths.get(this.basePath, str).toString());
        return hoodieParquetInputFormat.listStatus(jobConf);
    }

    private void setupIncremental(JobConf jobConf, String str, int i) {
        jobConf.set(String.format("hoodie.%s.consume.mode", "raw_trips"), "INCREMENTAL");
        jobConf.set(String.format("hoodie.%s.consume.start.timestamp", "raw_trips"), str);
        jobConf.setInt(String.format("hoodie.%s.consume.max.commits", "raw_trips"), i);
    }

    private List<HoodieRecord> newHoodieRecords(int i, String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            RawTripTestPayload rawTripTestPayload = new RawTripTestPayload(String.format("{\"_row_key\":\"%s\",\"time\":\"%s\",\"number\":%d}", UUID.randomUUID().toString(), str, Integer.valueOf(i2)));
            arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload.getRowKey(), rawTripTestPayload.getPartitionPath()), rawTripTestPayload));
        }
        return arrayList;
    }

    @Test
    public void testMetadataAggregateFromWriteStatus() throws Exception {
        HoodieWriteConfig build = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(build, this.context, this.metaClient);
        ArrayList arrayList = new ArrayList();
        RawTripTestPayload rawTripTestPayload = new RawTripTestPayload("{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
        arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload.getRowKey(), rawTripTestPayload.getPartitionPath()), rawTripTestPayload));
        RawTripTestPayload rawTripTestPayload2 = new RawTripTestPayload("{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}");
        arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload2.getRowKey(), rawTripTestPayload2.getPartitionPath()), rawTripTestPayload2));
        RawTripTestPayload rawTripTestPayload3 = new RawTripTestPayload("{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}");
        arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload3.getRowKey(), rawTripTestPayload3.getPartitionPath()), rawTripTestPayload3));
        SparkInsertCommitActionExecutor sparkInsertCommitActionExecutor = new SparkInsertCommitActionExecutor(this.context, build, create, makeNewCommitTime, this.context.parallelize(arrayList));
        Map mergeMetadataForWriteStatuses = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(this.jsc.parallelize(Arrays.asList(1)).map(num -> {
            return sparkInsertCommitActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), arrayList.iterator());
        }).flatMap(Transformations::flattenAsIterator).collect());
        Assertions.assertTrue(mergeMetadataForWriteStatuses.containsKey("InputRecordCount_1506582000"));
        Assertions.assertEquals("6", mergeMetadataForWriteStatuses.get("InputRecordCount_1506582000"));
    }

    private void verifyStatusResult(List<WriteStatus> list, Map<String, Long> map) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < list.size(); i++) {
            WriteStatus writeStatus = list.get(i);
            String partitionPath = writeStatus.getPartitionPath();
            hashMap.put(partitionPath, Long.valueOf(((Long) hashMap.getOrDefault(partitionPath, 0L)).longValue() + writeStatus.getTotalRecords()));
            Assertions.assertEquals(0, writeStatus.getFailedRecords().size());
        }
        Assertions.assertEquals(map, hashMap);
    }

    @Test
    public void testInsertRecords() throws Exception {
        HoodieWriteConfig makeHoodieClientConfig = makeHoodieClientConfig();
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(makeHoodieClientConfig, this.context, this.metaClient);
        List<HoodieRecord> newHoodieRecords = newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
        newHoodieRecords.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
        SparkInsertPreppedCommitActionExecutor sparkInsertPreppedCommitActionExecutor = new SparkInsertPreppedCommitActionExecutor(this.context, makeHoodieClientConfig, create, makeNewCommitTime, this.context.parallelize(newHoodieRecords));
        List<WriteStatus> collect = this.jsc.parallelize(Arrays.asList(1)).map(num -> {
            return sparkInsertPreppedCommitActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), newHoodieRecords.iterator());
        }).flatMap(Transformations::flattenAsIterator).collect();
        Assertions.assertEquals(2, collect.size());
        HashMap hashMap = new HashMap();
        hashMap.put("2016/01/31", 10L);
        hashMap.put("2016/02/01", 1L);
        verifyStatusResult(collect, hashMap);
        List<HoodieRecord> newHoodieRecords2 = newHoodieRecords(1, "2016-01-31T03:16:41.415Z");
        newHoodieRecords2.addAll(newHoodieRecords(5, "2016-02-01T03:16:41.415Z"));
        newHoodieRecords2.addAll(newHoodieRecords(1, "2016-02-02T03:16:41.415Z"));
        SparkUpsertPreppedCommitActionExecutor sparkUpsertPreppedCommitActionExecutor = new SparkUpsertPreppedCommitActionExecutor(this.context, makeHoodieClientConfig, create, makeNewCommitTime, this.context.parallelize(newHoodieRecords2));
        List<WriteStatus> collect2 = this.jsc.parallelize(Arrays.asList(1)).map(num2 -> {
            return sparkUpsertPreppedCommitActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), newHoodieRecords2.iterator());
        }).flatMap(Transformations::flattenAsIterator).collect();
        Assertions.assertEquals(3, collect2.size());
        hashMap.clear();
        hashMap.put("2016/01/31", 1L);
        hashMap.put("2016/02/01", 5L);
        hashMap.put("2016/02/02", 1L);
        verifyStatusResult(collect2, hashMap);
    }

    @Test
    public void testFileSizeUpsertRecords() throws Exception {
        HoodieWriteConfig build = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(65536L).hfileMaxFileSize(65536L).parquetBlockSize(65536).parquetPageSize(65536).build()).build();
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(build, this.context, this.metaClient);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2050; i++) {
            RawTripTestPayload rawTripTestPayload = new RawTripTestPayload("{\"_row_key\":\"" + UUID.randomUUID().toString() + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}");
            arrayList.add(new HoodieAvroRecord(new HoodieKey(rawTripTestPayload.getRowKey(), rawTripTestPayload.getPartitionPath()), rawTripTestPayload));
        }
        SparkUpsertCommitActionExecutor sparkUpsertCommitActionExecutor = new SparkUpsertCommitActionExecutor(this.context, build, create, makeNewCommitTime, this.context.parallelize(arrayList));
        this.jsc.parallelize(Arrays.asList(1)).map(num -> {
            return sparkUpsertCommitActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), arrayList.iterator());
        }).map(Transformations::flatten).collect();
        int i2 = 0;
        for (File file : Paths.get(this.basePath, "2016/01/31").toFile().listFiles()) {
            if (file.getName().endsWith(create.getBaseFileExtension()) && FSUtils.getCommitTime(file.getName()).equals(makeNewCommitTime)) {
                LOG.info(file.getName() + "-" + file.length());
                i2++;
            }
        }
        Assertions.assertEquals(3, i2, "If the number of records are more than 1150, then there should be a new file");
    }

    @Test
    public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
        HoodieWriteConfig build = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema(SchemaTestUtil.getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/testDataGeneratorSchema.txt").toString()).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024000L).hfileMaxFileSize(1024000L).build()).build();
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(build, this.context, this.metaClient);
        List generateInsertsWithHoodieAvroPayload = this.dataGen.generateInsertsWithHoodieAvroPayload("000", 100);
        SparkInsertCommitActionExecutor sparkInsertCommitActionExecutor = new SparkInsertCommitActionExecutor(this.context, build, create, "000", this.context.parallelize(generateInsertsWithHoodieAvroPayload));
        WriteStatus writeStatus = (WriteStatus) ((List) this.jsc.parallelize(Arrays.asList(1)).map(num -> {
            return sparkInsertCommitActionExecutor.handleInsert(UUID.randomUUID().toString(), generateInsertsWithHoodieAvroPayload.iterator());
        }).map(Transformations::flatten).collect().get(0)).get(0);
        String fileId = writeStatus.getFileId();
        this.metaClient.getFs().create(new Path(Paths.get(this.basePath, ".hoodie", "000.commit").toString())).close();
        List generateUpdatesWithHoodieAvroPayload = this.dataGen.generateUpdatesWithHoodieAvroPayload("000", generateInsertsWithHoodieAvroPayload);
        String partitionPath = writeStatus.getPartitionPath();
        long count = generateUpdatesWithHoodieAvroPayload.stream().filter(hoodieRecord -> {
            return hoodieRecord.getPartitionPath().equals(partitionPath);
        }).count();
        SparkUpsertCommitActionExecutor sparkUpsertCommitActionExecutor = new SparkUpsertCommitActionExecutor(this.context, build, HoodieSparkTable.create(build, this.context, HoodieTableMetaClient.reload(this.metaClient)), "000", this.context.parallelize(generateUpdatesWithHoodieAvroPayload));
        Assertions.assertEquals(generateUpdatesWithHoodieAvroPayload.size() - count, ((WriteStatus) ((List) this.jsc.parallelize(Arrays.asList(1)).map(num2 -> {
            return sparkUpsertCommitActionExecutor.handleUpdate(partitionPath, fileId, generateUpdatesWithHoodieAvroPayload.iterator());
        }).map(Transformations::flatten).collect().get(0)).get(0)).getTotalErrorRecords());
    }

    private void testBulkInsertRecords(String str) {
        HoodieWriteConfig build = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withBulkInsertParallelism(2).withBulkInsertSortMode(str).build();
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        getHoodieWriteClient(build).startCommitWithTime(makeNewCommitTime);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(build, this.context, this.metaClient);
        JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert = TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(this.jsc);
        verifyStatusResult(((HoodieData) new SparkBulkInsertCommitActionExecutor(this.context, build, create, makeNewCommitTime, HoodieJavaRDD.of(generateTestRecordsForBulkInsert), Option.empty()).execute().getWriteStatuses()).collectAsList(), TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(generateTestRecordsForBulkInsert));
    }

    @ValueSource(strings = {"global_sort", "partition_sort", "none"})
    @ParameterizedTest(name = "[{index}] {0}")
    public void testBulkInsertRecordsWithGlobalSort(String str) throws Exception {
        testBulkInsertRecords(str);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testPartitionMetafileFormat(boolean z) throws Exception {
        HoodieWriteConfig build = HoodieWriteConfig.newBuilder().withPath(this.basePath).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").build();
        HoodieSparkCopyOnWriteTable create = HoodieSparkTable.create(build, this.context, this.metaClient);
        Assertions.assertFalse(create.getPartitionMetafileFormat().isPresent());
        if (z) {
            Properties properties = new Properties();
            properties.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), "true");
            initMetaClient(HoodieTableType.COPY_ON_WRITE, properties);
            this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
            Assertions.assertTrue(this.metaClient.getTableConfig().getPartitionMetafileFormat().isPresent());
            create = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(build, this.context, this.metaClient);
            Assertions.assertTrue(create.getPartitionMetafileFormat().isPresent());
        }
        String makeNewCommitTime = HoodieTestTable.makeNewCommitTime();
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        hoodieWriteClient.startCommitWithTime(makeNewCommitTime);
        hoodieWriteClient.bulkInsert(TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(this.jsc, 50), makeNewCommitTime);
        Path path = new Path(this.basePath, "2016/03/15");
        Assertions.assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(this.fs, path));
        Option partitionMetafilePath = HoodiePartitionMetadata.getPartitionMetafilePath(this.fs, path);
        if (z) {
            Assertions.assertTrue(((Path) partitionMetafilePath.get()).toString().endsWith(create.getBaseFileFormat().getFileExtension()));
        } else {
            Assertions.assertTrue(((Path) partitionMetafilePath.get()).toString().endsWith(".hoodie_partition_metadata"));
        }
        HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(this.fs, path);
        hoodiePartitionMetadata.readFromFS();
        Assertions.assertTrue(hoodiePartitionMetadata.getPartitionDepth() == 3);
        Assertions.assertTrue(((String) hoodiePartitionMetadata.readPartitionCreatedCommitTime().get()).equals(makeNewCommitTime));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2140010494:
                if (implMethodName.equals("lambda$testMetadataAggregateFromWriteStatus$d2d6cc3$1")) {
                    z = false;
                    break;
                }
                break;
            case -1269911580:
                if (implMethodName.equals("flattenAsIterator")) {
                    z = 6;
                    break;
                }
                break;
            case -1110556571:
                if (implMethodName.equals("lambda$testInsertRecords$5c7863df$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1029510630:
                if (implMethodName.equals("lambda$testInsertUpsertWithHoodieAvroPayload$21b27613$1")) {
                    z = 5;
                    break;
                }
                break;
            case -778804732:
                if (implMethodName.equals("flatten")) {
                    z = true;
                    break;
                }
                break;
            case -773700851:
                if (implMethodName.equals("lambda$testMakeNewPath$d4eb593f$1")) {
                    z = 8;
                    break;
                }
                break;
            case -95418444:
                if (implMethodName.equals("lambda$testInsertUpsertWithHoodieAvroPayload$7e0db4b8$1")) {
                    z = 4;
                    break;
                }
                break;
            case 841682905:
                if (implMethodName.equals("lambda$testFileSizeUpsertRecords$a0c2d401$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1249200916:
                if (implMethodName.equals("lambda$testInsertRecords$58cec836$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkCommitActionExecutor baseSparkCommitActionExecutor = (BaseSparkCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return num -> {
                        return baseSparkCommitActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), list.iterator());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/List;")) {
                    return Transformations::flatten;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/List;")) {
                    return Transformations::flatten;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/List;")) {
                    return Transformations::flatten;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkCommitActionExecutor baseSparkCommitActionExecutor2 = (BaseSparkCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    List list2 = (List) serializedLambda.getCapturedArg(1);
                    return num2 -> {
                        return baseSparkCommitActionExecutor2.handleInsert(FSUtils.createNewFileIdPfx(), list2.iterator());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkCommitActionExecutor baseSparkCommitActionExecutor3 = (BaseSparkCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    List list3 = (List) serializedLambda.getCapturedArg(1);
                    return num3 -> {
                        return baseSparkCommitActionExecutor3.handleInsert(FSUtils.createNewFileIdPfx(), list3.iterator());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkCommitActionExecutor baseSparkCommitActionExecutor4 = (BaseSparkCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    List list4 = (List) serializedLambda.getCapturedArg(1);
                    return num4 -> {
                        return baseSparkCommitActionExecutor4.handleInsert(UUID.randomUUID().toString(), list4.iterator());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkCommitActionExecutor baseSparkCommitActionExecutor5 = (BaseSparkCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    String str2 = (String) serializedLambda.getCapturedArg(2);
                    List list5 = (List) serializedLambda.getCapturedArg(3);
                    return num22 -> {
                        return baseSparkCommitActionExecutor5.handleUpdate(str, str2, list5.iterator());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    return Transformations::flattenAsIterator;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    return Transformations::flattenAsIterator;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/common/testutils/Transformations") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    return Transformations::flattenAsIterator;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor;Ljava/util/List;Ljava/lang/Integer;)Ljava/util/Iterator;")) {
                    BaseSparkCommitActionExecutor baseSparkCommitActionExecutor6 = (BaseSparkCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    List list6 = (List) serializedLambda.getCapturedArg(1);
                    return num23 -> {
                        return baseSparkCommitActionExecutor6.handleInsert(FSUtils.createNewFileIdPfx(), list6.iterator());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/hudi/config/HoodieWriteConfig;Ljava/lang/String;Lorg/apache/hudi/table/HoodieTable;Ljava/lang/String;Ljava/lang/Integer;)Lorg/apache/hudi/common/util/collection/Pair;")) {
                    TestCopyOnWriteActionExecutor testCopyOnWriteActionExecutor = (TestCopyOnWriteActionExecutor) serializedLambda.getCapturedArg(0);
                    String str3 = (String) serializedLambda.getCapturedArg(1);
                    HoodieWriteConfig hoodieWriteConfig = (HoodieWriteConfig) serializedLambda.getCapturedArg(2);
                    String str4 = (String) serializedLambda.getCapturedArg(3);
                    HoodieTable hoodieTable = (HoodieTable) serializedLambda.getCapturedArg(4);
                    String str5 = (String) serializedLambda.getCapturedArg(5);
                    return num5 -> {
                        HoodieRecord hoodieRecord = (HoodieRecord) Mockito.mock(HoodieRecord.class);
                        Mockito.when(hoodieRecord.getPartitionPath()).thenReturn(str3);
                        return Pair.of(new HoodieCreateHandle(hoodieWriteConfig, str4, hoodieTable, str3, str5, this.supplier).makeNewPath(hoodieRecord.getPartitionPath()), FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
