package org.apache.hudi.table.functional;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.api.java.JavaRDD;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.class */
public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctionalTestHarness {
    // Avro schema (as JSON) of the records used by these tests: the five _hoodie_* meta columns
    // followed by id, name, age, ts and part. Every field is a nullable union.
    String jsonSchema = "{\n  \"type\": \"record\",\n  \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n  \"fields\": [\n    {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n    {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n    {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n    {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n  ]\n}";
    // Parsed form of jsonSchema; assigned in setUp() before every test.
    private Schema schema;
    // Meta client of the table under test; assigned at the start of each test and reloaded by writeData().
    private HoodieTableMetaClient metaClient;
    // Decompiler-exposed flag: assigned in the static initializer to the negation of
    // desiredAssertionStatus(), i.e. true when JVM assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Ordinal-indexed lookup table backing a {@code switch} over {@link WriteOperationType}.
     *
     * <p>The array is sized to the number of enum constants and filled in the static initializer:
     * INSERT maps to 1, UPSERT to 2 and BULK_INSERT to 3. Every other constant keeps the array
     * default of 0, so a switch over the looked-up value falls through to its {@code default} branch.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.table.functional.TestSparkNonBlockingConcurrencyControl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Index: WriteOperationType.ordinal(); value: case number (0 = no matching case).
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$WriteOperationType = new int[WriteOperationType.values().length];

        static {
            // Each constant is registered in its own try block so that one unresolved constant
            // does not prevent the remaining entries from being populated.
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Ignored: if the constant cannot be resolved, its slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.UPSERT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Ignored: if the constant cannot be resolved, its slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.BULK_INSERT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Ignored: if the constant cannot be resolved, its slot simply stays 0.
            }
        }
    }

    /**
     * Parses {@link #jsonSchema} into the Avro {@link Schema} used to build test records.
     * Runs before every test.
     */
    @BeforeEach
    public void setUp() throws Exception {
        Schema.Parser schemaParser = new Schema.Parser();
        this.schema = schemaParser.parse(this.jsonSchema);
    }

    /**
     * Two writers insert partial rows for the same key ({@code id1}) before either of them commits.
     * Both commits are then completed, no base file may exist yet, and after a compaction the
     * single partition is expected to contain the merged row {@code id1,Danny,23,2,par1}.
     */
    @Test
    public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
        HoodieWriteConfig writeConfig = createHoodieWriteConfig();
        this.metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) writeConfig.getProps());

        // Writer 1: carries the name but no age.
        List<String> firstRows = Collections.singletonList("id1,Danny,,1,par1");
        SparkRDDWriteClient firstClient = m59getHoodieWriteClient(writeConfig);
        String firstInstant = firstClient.createNewInstantTime();
        List<WriteStatus> firstStatuses = writeData(firstClient, firstInstant, firstRows, false, WriteOperationType.INSERT);

        // Writer 2: carries the age but no name; started while writer 1 is still uncommitted.
        SparkRDDWriteClient secondClient = m59getHoodieWriteClient(writeConfig);
        List<String> secondRows = Collections.singletonList("id1,,23,2,par1");
        String secondInstant = secondClient.createNewInstantTime();
        List<WriteStatus> secondStatuses = writeData(secondClient, secondInstant, secondRows, false, WriteOperationType.INSERT);

        // Complete both commits, writer 1 first.
        firstClient.commitStats(
            firstInstant,
            m58context().parallelize(firstStatuses, 1),
            firstStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
            Option.empty(),
            this.metaClient.getCommitActionType());
        secondClient.commitStats(
            secondInstant,
            m58context().parallelize(secondStatuses, 1),
            secondStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
            Option.empty(),
            this.metaClient.getCommitActionType());

        Assertions.assertFalse(fileExists(), "No base data files should have been created");

        String compactionInstant = (String) firstClient.scheduleCompaction(Option.empty()).get();
        firstClient.compact(compactionInstant);

        Map<String, String> expectedByPartition = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        checkWrittenData(expectedByPartition, 1);
    }

    /**
     * Writer 2 writes a row for {@code id1} but its instant is never committed. Writer 1 commits its
     * own {@code id1} row, schedules a compaction, commits a further row ({@code id3}) and only then
     * runs the scheduled compaction. The assertion expects the latest file of partition
     * {@code par1} to contain only writer 1's first row.
     */
    @Test
    public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
        HoodieWriteConfig writeConfig = createHoodieWriteConfig();
        this.metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) writeConfig.getProps());

        // Writer 1 writes its first batch (not yet committed).
        SparkRDDWriteClient firstClient = m59getHoodieWriteClient(writeConfig);
        List<String> firstRows = Collections.singletonList("id1,Danny,,1,par1");
        String firstInstant = firstClient.createNewInstantTime();
        List<WriteStatus> firstStatuses = writeData(firstClient, firstInstant, firstRows, false, WriteOperationType.INSERT);

        // Writer 2 writes but is never committed in this test.
        SparkRDDWriteClient secondClient = m59getHoodieWriteClient(writeConfig);
        String inflightInstant = secondClient.createNewInstantTime();
        writeData(secondClient, inflightInstant, Collections.singletonList("id1,,23,2,par1"), false, WriteOperationType.INSERT);

        firstClient.commitStats(
            firstInstant,
            m58context().parallelize(firstStatuses, 1),
            firstStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
            Option.empty(),
            this.metaClient.getCommitActionType());

        // The compaction is scheduled here but executed only after the next commit.
        String compactionInstant = (String) firstClient.scheduleCompaction(Option.empty()).get();

        List<String> laterRows = Collections.singletonList("id3,Julian,53,4,par1");
        String laterInstant = firstClient.createNewInstantTime();
        List<WriteStatus> laterStatuses = writeData(firstClient, laterInstant, laterRows, false, WriteOperationType.INSERT);
        firstClient.commitStats(
            laterInstant,
            m58context().parallelize(laterStatuses, 1),
            laterStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
            Option.empty(),
            this.metaClient.getCommitActionType());

        firstClient.compact(compactionInstant);

        Map<String, String> expectedByPartition = Collections.singletonMap("par1", "[id1,par1,id1,Danny,null,1,par1]");
        checkWrittenData(expectedByPartition, 1);
    }

    /**
     * Writer 1 performs an INSERT and writer 2 a BULK_INSERT for the same key before either commits.
     * Writer 1's commit is completed first; committing writer 2 afterwards is expected to throw
     * {@link HoodieWriteConflictException}.
     */
    @Test
    public void testBulkInsertInMultiWriter() throws Exception {
        HoodieWriteConfig writeConfig = createHoodieWriteConfig();
        this.metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) writeConfig.getProps());

        SparkRDDWriteClient insertClient = m59getHoodieWriteClient(writeConfig);
        List<String> insertRows = Collections.singletonList("id1,Danny,,1,par1");
        String insertInstant = insertClient.createNewInstantTime();
        List<WriteStatus> insertStatuses = writeData(insertClient, insertInstant, insertRows, false, WriteOperationType.INSERT);

        SparkRDDWriteClient bulkInsertClient = m59getHoodieWriteClient(writeConfig);
        List<String> bulkInsertRows = Collections.singletonList("id1,,23,2,par1");
        String bulkInsertInstant = bulkInsertClient.createNewInstantTime();
        List<WriteStatus> bulkInsertStatuses =
            writeData(bulkInsertClient, bulkInsertInstant, bulkInsertRows, false, WriteOperationType.BULK_INSERT);

        insertClient.commitStats(
            insertInstant,
            m58context().parallelize(insertStatuses, 1),
            insertStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
            Option.empty(),
            this.metaClient.getCommitActionType());

        // The second commit must be rejected as a write conflict.
        Assertions.assertThrows(HoodieWriteConflictException.class, () ->
            bulkInsertClient.commitStats(
                bulkInsertInstant,
                m58context().parallelize(bulkInsertStatuses, 1),
                bulkInsertStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
                Option.empty(),
                this.metaClient.getCommitActionType()));
    }

    /**
     * Runs a committed BULK_INSERT followed by a committed INSERT for the same key, one after the
     * other, then compacts. The partition is expected to contain the merged row
     * {@code id1,Danny,23,2,par1}.
     */
    @Test
    public void testBulkInsertInSequence() throws Exception {
        HoodieWriteConfig writeConfig = createHoodieWriteConfig();
        this.metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, (Properties) writeConfig.getProps());

        // First writer: bulk insert, committed inside writeData (commit flag = true).
        SparkRDDWriteClient bulkInsertClient = m59getHoodieWriteClient(writeConfig);
        String bulkInsertInstant = bulkInsertClient.createNewInstantTime();
        List<String> bulkInsertRows = Collections.singletonList("id1,Danny,,1,par1");
        writeData(bulkInsertClient, bulkInsertInstant, bulkInsertRows, true, WriteOperationType.BULK_INSERT);

        // Second writer: plain insert, also committed inside writeData.
        SparkRDDWriteClient insertClient = m59getHoodieWriteClient(writeConfig);
        String insertInstant = insertClient.createNewInstantTime();
        List<String> insertRows = Collections.singletonList("id1,,23,2,par1");
        writeData(insertClient, insertInstant, insertRows, true, WriteOperationType.INSERT);

        String compactionInstant = (String) bulkInsertClient.scheduleCompaction(Option.empty()).get();
        bulkInsertClient.compact(compactionInstant);

        Map<String, String> expectedByPartition = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
        checkWrittenData(expectedByPartition, 1);
    }

    /**
     * Builds the write config shared by all tests in this class: a MERGE_ON_READ table named
     * {@code test} at {@code basePath()}, a single-bucket BUCKET index/layout, the
     * {@link PartialUpdateAvroPayload} ordered by {@code ts}, auto-commit disabled,
     * NON_BLOCKING_CONCURRENCY_CONTROL with an in-process lock provider, LAZY failed-write cleaning
     * and DIRECT markers.
     *
     * @return the assembled write config
     */
    private HoodieWriteConfig createHoodieWriteConfig() {
        Properties keyGenProps = getPropertiesForKeyGen(true);
        keyGenProps.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

        HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
            .withPayloadClass(PartialUpdateAvroPayload.class.getName())
            .withPayloadOrderingField("ts")
            .build();
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .parquetMaxFileSize(1024L)
            .build();
        HoodieLayoutConfig layoutConfig = HoodieLayoutConfig.newBuilder()
            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName())
            .build();
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .fromProperties(keyGenProps)
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketNum("1")
            .build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build();
        HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
            .withLockProvider(InProcessLockProvider.class)
            .withConflictResolutionStrategy(new BucketIndexConcurrentFileWritesConflictResolutionStrategy())
            .build();

        return HoodieWriteConfig.newBuilder()
            .forTable("test")
            .withPath(basePath())
            .withSchema(this.jsonSchema)
            .withParallelism(2, 2)
            .withAutoCommit(false)
            .withPayloadConfig(payloadConfig)
            .withCompactionConfig(compactionConfig)
            .withStorageConfig(storageConfig)
            .withLayoutConfig(layoutConfig)
            .withIndexConfig(indexConfig)
            .withPopulateMetaFields(true)
            .withCleanConfig(cleanConfig)
            .withWriteConcurrencyMode(WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL)
            .withMarkersType(MarkerType.DIRECT.name())
            .withLockConfig(lockConfig)
            .build();
    }

    /**
     * Verifies the contents of the table directory on disk.
     *
     * <p>For every non-hidden partition directory under {@code tempDir}, the data file with the
     * greatest commit time (as reported by {@link FSUtils#getCommitTime(String)} on the file name)
     * is read as Parquet. Each record is rendered with {@link #filterOutVariables(GenericRecord)},
     * the rendered rows are sorted, and the resulting list's {@code toString()} is compared with the
     * expected string for that partition.
     *
     * <p>Fixes relative to the previous version: the read loop now terminates when the reader is
     * exhausted (it used to spin forever), the Parquet reader is closed, and an empty partition
     * directory yields a descriptive assertion failure instead of an
     * {@code ArrayIndexOutOfBoundsException}.
     *
     * @param map expected rendering per partition directory name, e.g. {@code par1 -> "[...]"}
     * @param i   expected number of non-hidden partition directories
     * @throws IOException if a data file cannot be read or the reader cannot be closed
     */
    private void checkWrittenData(Map<String, String> map, int i) throws IOException {
        File baseDir = this.tempDir.toFile();
        Assertions.assertTrue(baseDir.isDirectory(), "Table base path must be a directory: " + baseDir);

        // Hidden entries (names starting with '.') are skipped at both directory levels.
        FileFilter visibleOnly = candidate -> !candidate.getName().startsWith(".");

        File[] partitionDirs = baseDir.listFiles(visibleOnly);
        Assertions.assertNotNull(partitionDirs);
        MatcherAssert.assertThat(partitionDirs.length, CoreMatchers.is(i));

        for (File partitionDir : partitionDirs) {
            File[] dataFiles = partitionDir.listFiles(visibleOnly);
            Assertions.assertNotNull(dataFiles);

            // Only the file with the greatest commit time is checked.
            File latestDataFile = Arrays.stream(dataFiles)
                .max(Comparator.comparing((File dataFile) -> FSUtils.getCommitTime(dataFile.getName())))
                .orElseThrow(() -> new AssertionError("No data file found in partition directory " + partitionDir));

            List<String> renderedRows = new ArrayList<>();
            try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build()) {
                // read() returns null once the file is exhausted.
                for (GenericRecord row = reader.read(); row != null; row = reader.read()) {
                    renderedRows.add(filterOutVariables(row));
                }
            }

            // Sort so the comparison does not depend on the physical row order in the file.
            renderedRows.sort(Comparator.naturalOrder());
            MatcherAssert.assertThat(renderedRows.toString(), CoreMatchers.is(map.get(partitionDir.getName())));
        }
    }

    /**
     * Renders the stable columns of a record as a comma-separated string, in the order
     * {@code _hoodie_record_key, _hoodie_partition_path, id, name, age, ts, part}.
     *
     * <p>The first five columns may be null and are then rendered as the text {@code null};
     * {@code ts} and {@code part} are dereferenced directly and must therefore be non-null.
     *
     * @param genericRecord the record to render
     * @return the comma-joined column values
     */
    private static String filterOutVariables(GenericRecord genericRecord) {
        String[] nullableColumns = {"_hoodie_record_key", "_hoodie_partition_path", "id", "name", "age"};
        List<String> rendered = new ArrayList<>();
        for (String column : nullableColumns) {
            rendered.add(getFieldValue(genericRecord, column));
        }
        Object orderingValue = genericRecord.get("ts");
        rendered.add(orderingValue.toString());
        Object partitionValue = genericRecord.get("part");
        rendered.add(partitionValue.toString());
        return String.join(",", rendered);
    }

    /**
     * Returns the string form of a record field, or {@code null} when the field has no value.
     *
     * @param genericRecord the record to read from
     * @param str           the field name
     * @return {@code toString()} of the field value, or {@code null} if the value is null
     */
    private static String getFieldValue(GenericRecord genericRecord, String str) {
        Object fieldValue = genericRecord.get(str);
        return fieldValue == null ? null : fieldValue.toString();
    }

    /**
     * Walks {@code tempDir} breadth-first, skipping every entry whose name starts with a dot,
     * and reports whether any non-directory entry is found.
     *
     * @return {@code true} as soon as a visible non-directory entry is encountered, else {@code false}
     */
    private boolean fileExists() {
        List<File> pendingDirs = new ArrayList<>();
        pendingDirs.add(this.tempDir.toFile());
        while (!pendingDirs.isEmpty()) {
            File currentDir = pendingDirs.remove(0);
            File[] children = Objects.requireNonNull(currentDir.listFiles());
            for (File child : children) {
                if (child.getName().startsWith(".")) {
                    // Hidden entries are neither counted nor descended into.
                    continue;
                }
                if (!child.isDirectory()) {
                    return true;
                }
                pendingDirs.add(child);
            }
        }
        return false;
    }

    /**
     * Converts a comma-separated line of the form {@code id,name,age,ts,part} into an Avro record
     * of {@link #schema}. An empty column becomes a null field; {@code age} is parsed as an int and
     * {@code ts} as a long.
     *
     * @param str the comma-separated record text
     * @return the populated record
     * @throws IllegalArgumentException presumably, via {@code ValidationUtils.checkArgument}, when
     *                                  the line does not split into exactly 5 columns
     */
    private GenericRecord str2GenericRecord(String str) {
        GenericData.Record avroRecord = new GenericData.Record(this.schema);
        String[] columns = str.split(",");
        ValidationUtils.checkArgument(columns.length == 5, "Valid record must have 5 fields");

        String id = null;
        if (!StringUtils.isNullOrEmpty(columns[0])) {
            id = columns[0];
        }
        avroRecord.put("id", id);

        String name = null;
        if (!StringUtils.isNullOrEmpty(columns[1])) {
            name = columns[1];
        }
        avroRecord.put("name", name);

        Integer age = null;
        if (!StringUtils.isNullOrEmpty(columns[2])) {
            age = Integer.valueOf(Integer.parseInt(columns[2]));
        }
        avroRecord.put("age", age);

        Long ts = null;
        if (!StringUtils.isNullOrEmpty(columns[3])) {
            ts = Long.valueOf(Long.parseLong(columns[3]));
        }
        avroRecord.put("ts", ts);

        String part = null;
        if (!StringUtils.isNullOrEmpty(columns[4])) {
            part = columns[4];
        }
        avroRecord.put("part", part);

        return avroRecord;
    }

    /**
     * Converts each comma-separated line into a {@link HoodieAvroRecord} whose key is
     * ({@code id}, {@code part}) and whose payload is a {@link PartialUpdateAvroPayload} built from
     * the parsed record and its {@code ts} value.
     *
     * @param list the record lines, in the format accepted by {@link #str2GenericRecord(String)}
     * @return one Hoodie record per input line, in input order
     */
    private List<HoodieRecord> str2HoodieRecord(List<String> list) {
        List<HoodieRecord> hoodieRecords = new ArrayList<>();
        for (String line : list) {
            GenericRecord avroRecord = str2GenericRecord(line);
            HoodieKey key = new HoodieKey((String) avroRecord.get("id"), (String) avroRecord.get("part"));
            PartialUpdateAvroPayload payload = new PartialUpdateAvroPayload(avroRecord, (Long) avroRecord.get("ts"));
            hoodieRecords.add(new HoodieAvroRecord<>(key, payload));
        }
        return hoodieRecords;
    }

    /**
     * Writes the given record lines with the given client and instant, optionally committing.
     *
     * <p>The instant is started on the client, the records are written with the requested
     * operation, and the resulting write statuses are asserted to be error-free. {@link #metaClient}
     * is reloaded before starting and again before returning.
     *
     * <p>The operation is dispatched with a plain {@code switch} on the enum rather than through
     * the ordinal-indexed lookup table the decompiler exposed.
     *
     * @param sparkRDDWriteClient the client that performs the write
     * @param str                 the instant time to write under
     * @param list                record lines in the format {@code id,name,age,ts,part}
     * @param z                   when {@code true}, the write is committed here and the commit is
     *                            asserted to succeed; when {@code false}, the caller commits later
     * @param writeOperationType  INSERT, UPSERT or BULK_INSERT
     * @return the collected write statuses of the write
     * @throws UnsupportedOperationException for any other operation type
     */
    private List<WriteStatus> writeData(SparkRDDWriteClient sparkRDDWriteClient, String str, List<String> list, boolean z, WriteOperationType writeOperationType) {
        JavaRDD<HoodieRecord> records = jsc().parallelize(str2HoodieRecord(list), 2);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        sparkRDDWriteClient.startCommitWithTime(str);

        List<WriteStatus> statuses;
        switch (writeOperationType) {
            case INSERT:
                statuses = sparkRDDWriteClient.insert(records, str).collect();
                break;
            case UPSERT:
                statuses = sparkRDDWriteClient.upsert(records, str).collect();
                break;
            case BULK_INSERT:
                statuses = sparkRDDWriteClient.bulkInsert(records, str).collect();
                break;
            default:
                throw new UnsupportedOperationException(writeOperationType + " is not supported yet in this test!");
        }
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);

        if (z) {
            boolean committed = sparkRDDWriteClient.commitStats(
                str,
                m58context().parallelize(statuses, 1),
                statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
                Option.empty(),
                this.metaClient.getCommitActionType());
            Assertions.assertTrue(committed);
        }

        // Reload so later reads of metaClient reflect the state after this write.
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        return statuses;
    }

    // Initializes $assertionsDisabled: true when JVM assertions (-ea) are NOT enabled for this class.
    static {
        $assertionsDisabled = !TestSparkNonBlockingConcurrencyControl.class.desiredAssertionStatus();
    }
}
