package org.apache.hudi;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.class */
public class TestHoodieMergeHandleWithSparkMerger extends SparkClientFunctionalTestHarness {
    private static final Schema SCHEMA = getAvroSchema("AvroSchema", "AvroSchemaNS");
    private HoodieTableMetaClient metaClient;

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$CustomMerger.class */
    /**
     * Spark record merger whose flush decision depends on the record's first column.
     *
     * <p>NOTE(review): no {@code @Override} is present on {@code shouldFlush}; confirm the
     * signature still matches the one declared by the merger supertype.
     */
    public static class CustomMerger extends HoodieSparkRecordMerger {
        /**
         * Reports whether the given record should be flushed.
         *
         * @param hoodieRecord record to inspect; it is cast to {@link HoodieSparkRecord}
         * @param schema unused here
         * @param typedProperties unused here
         * @return {@code false} when the string in column 0 of the record's row equals
         *         {@code "001"}, {@code true} otherwise
         * @throws IOException declared by the signature; nothing in this body throws it directly
         */
        public boolean shouldFlush(HoodieRecord hoodieRecord, Schema schema, TypedProperties typedProperties) throws IOException {
            HoodieSparkRecord sparkRecord = (HoodieSparkRecord) hoodieRecord;
            InternalRow row = (InternalRow) sparkRecord.getData();
            // Column 0 presumably holds the commit time passed to the row generators — TODO confirm.
            String firstColumn = row.getString(0);
            boolean matchesFirstCommit = firstColumn.equals("001");
            return !matchesFirstCommit;
        }
    }

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$CustomWriteConfig.class */
    /**
     * Write config whose {@link #getRecordMerger()} always hands out a fresh {@link CustomMerger}.
     *
     * <p>NOTE(review): {@code getRecordMerger} carries no {@code @Override}; it is presumably
     * meant to replace the merger lookup of the parent config — confirm.
     */
    public static class CustomWriteConfig extends TestHoodieWriteConfig {
        /**
         * @param source config whose settings are forwarded to the parent constructor
         */
        CustomWriteConfig(HoodieWriteConfig source) {
            super(source);
        }

        /**
         * @return a newly created {@link CustomMerger} on every call
         */
        public HoodieRecordMerger getRecordMerger() {
            HoodieRecordMerger merger = new CustomMerger();
            return merger;
        }
    }

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$DefaultMerger.class */
    /**
     * Spark record merger that asks for every record to be flushed.
     *
     * <p>NOTE(review): no {@code @Override} is present on {@code shouldFlush}; confirm the
     * signature still matches the one declared by the merger supertype.
     */
    public static class DefaultMerger extends HoodieSparkRecordMerger {
        /**
         * @param hoodieRecord ignored
         * @param schema ignored
         * @param typedProperties ignored
         * @return always {@code true}
         */
        public boolean shouldFlush(HoodieRecord hoodieRecord, Schema schema, TypedProperties typedProperties) {
            final boolean flushEverything = true;
            return flushEverything;
        }
    }

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$DefaultWriteConfig.class */
    /**
     * Write config whose {@link #getRecordMerger()} always hands out a fresh {@link DefaultMerger}.
     *
     * <p>NOTE(review): {@code getRecordMerger} carries no {@code @Override}; it is presumably
     * meant to replace the merger lookup of the parent config — confirm.
     */
    public static class DefaultWriteConfig extends TestHoodieWriteConfig {
        /**
         * @param source config whose settings are forwarded to the parent constructor
         */
        DefaultWriteConfig(HoodieWriteConfig source) {
            super(source);
        }

        /**
         * @return a newly created {@link DefaultMerger} on every call
         */
        public HoodieRecordMerger getRecordMerger() {
            HoodieRecordMerger merger = new DefaultMerger();
            return merger;
        }
    }

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$NoFlushMerger.class */
    /**
     * Spark record merger that never asks for a record to be flushed.
     *
     * <p>NOTE(review): no {@code @Override} is present on {@code shouldFlush}; confirm the
     * signature still matches the one declared by the merger supertype.
     */
    public static class NoFlushMerger extends HoodieSparkRecordMerger {
        /**
         * @param hoodieRecord ignored
         * @param schema ignored
         * @param typedProperties ignored
         * @return always {@code false}
         */
        public boolean shouldFlush(HoodieRecord hoodieRecord, Schema schema, TypedProperties typedProperties) {
            final boolean flushNothing = false;
            return flushNothing;
        }
    }

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$NoFlushWriteConfig.class */
    /**
     * Write config whose {@link #getRecordMerger()} always hands out a fresh {@link NoFlushMerger}.
     *
     * <p>NOTE(review): {@code getRecordMerger} carries no {@code @Override}; it is presumably
     * meant to replace the merger lookup of the parent config — confirm.
     */
    public static class NoFlushWriteConfig extends TestHoodieWriteConfig {
        /**
         * @param source config whose settings are forwarded to the parent constructor
         */
        NoFlushWriteConfig(HoodieWriteConfig source) {
            super(source);
        }

        /**
         * @return a newly created {@link NoFlushMerger} on every call
         */
        public HoodieRecordMerger getRecordMerger() {
            HoodieRecordMerger merger = new NoFlushMerger();
            return merger;
        }
    }

    /* loaded from: input_file:org/apache/hudi/TestHoodieMergeHandleWithSparkMerger$TestHoodieWriteConfig.class */
    /**
     * Common base for the merger-specific write configs used by these tests.
     * It is built from an existing config by copying its engine type and properties.
     */
    public static class TestHoodieWriteConfig extends HoodieWriteConfig {
        /**
         * @param source config supplying the engine type and properties passed to the
         *               {@link HoodieWriteConfig} constructor
         */
        TestHoodieWriteConfig(HoodieWriteConfig source) {
            super(
                source.getEngineType(),
                source.getProps());
        }
    }

    /**
     * Returns the single partition path used by every test in this class.
     *
     * @return the fixed partition path {@code "2023-10-01"}
     */
    public static String getPartitionPath() {
        final String partition = "2023-10-01";
        return partition;
    }

    /**
     * Creates a MERGE_ON_READ table at {@code basePath()} before each test and keeps its meta client.
     *
     * <p>The table properties set here are the default base file format and the ordering field,
     * which is pointed at the record-key metadata field.
     *
     * @throws IOException declared for the meta-client creation call
     */
    @BeforeEach
    public void setUp() throws IOException {
        HoodieFileFormat defaultFormat = (HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue();
        String orderingField = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName();

        Properties tableProps = new Properties();
        tableProps.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), defaultFormat.toString());
        tableProps.setProperty("hoodie.payload.ordering.field", orderingField);

        this.metaClient = getHoodieMetaClient(hadoopConf(), basePath(), HoodieTableType.MERGE_ON_READ, tableProps);
    }

    /**
     * Runs the insert/update scenario with {@link DefaultMerger}, which flushes every record.
     *
     * <p>Expected row count after the second commit is 114: 100 initial rows, minus 17 deletes,
     * plus 31 new rows (see {@link #insertAndUpdate}).
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testDefaultMerger() throws Exception {
        DefaultWriteConfig writeConfig = buildDefaultWriteConfig(SCHEMA);

        HoodieRecordMerger merger = writeConfig.getRecordMerger();
        Assertions.assertTrue(merger instanceof DefaultMerger);

        boolean fileGroupReaderEnabled =
            writeConfig.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), false);
        Assertions.assertTrue(fileGroupReaderEnabled);

        insertAndUpdate(writeConfig, 114);
    }

    /**
     * Runs the insert/update scenario with {@link NoFlushMerger}, which never flushes a record.
     *
     * <p>Expected row count after the second commit is 64. That equals 100 - 17 - 19, i.e.
     * presumably neither the 19 updated rows nor the 31 new rows of the second commit survive
     * — TODO confirm against the merge handle.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testNoFlushMerger() throws Exception {
        NoFlushWriteConfig writeConfig = buildNoFlushWriteConfig(SCHEMA);

        HoodieRecordMerger merger = writeConfig.getRecordMerger();
        Assertions.assertTrue(merger instanceof NoFlushMerger);

        boolean fileGroupReaderEnabled =
            writeConfig.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), false);
        Assertions.assertTrue(fileGroupReaderEnabled);

        insertAndUpdate(writeConfig, 64);
    }

    /**
     * Runs the insert/update scenario with {@link CustomMerger}, which refuses to flush rows
     * whose first column equals {@code "001"}.
     *
     * <p>Expected row count after the second commit is 95. That equals 114 - 19, i.e. presumably
     * the 19 updates generated with {@code "001"} are the ones withheld — TODO confirm.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testCustomMerger() throws Exception {
        CustomWriteConfig writeConfig = buildCustomWriteConfig(SCHEMA);

        HoodieRecordMerger merger = writeConfig.getRecordMerger();
        Assertions.assertTrue(merger instanceof CustomMerger);

        boolean fileGroupReaderEnabled =
            writeConfig.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), false);
        Assertions.assertTrue(fileGroupReaderEnabled);

        insertAndUpdate(writeConfig, 95);
    }

    /**
     * Generates {@code i} random rows in the test partition and wraps each as a {@link HoodieSparkRecord}.
     *
     * @param i number of rows to generate
     * @param str value forwarded to the row generator (presumably the commit time — TODO confirm)
     * @return a new mutable list with one record per generated row; the record key is built from
     *         row columns 2 and 3
     * @throws Exception if row generation or conversion fails
     */
    public List<HoodieRecord> generateRecords(int i, String str) throws Exception {
        List<InternalRow> rows = SparkDatasetTestUtils.toInternalRows(
            SparkDatasetTestUtils.getRandomRowsWithCommitTime(new SQLContext(jsc()), i, getPartitionPath(), false, str),
            SparkDatasetTestUtils.ENCODER);

        List<HoodieRecord> records = new ArrayList<>();
        for (InternalRow row : rows) {
            HoodieKey key = new HoodieKey(row.getString(2), row.getString(3));
            records.add(new HoodieSparkRecord(key, row, SparkDatasetTestUtils.STRUCT_TYPE, false));
        }
        return records;
    }

    /**
     * Generates random rows for the given keys and wraps each as a {@link HoodieSparkRecord}.
     *
     * @param list keys for which rows are generated
     * @param str value forwarded to the row generator (presumably the commit time — TODO confirm)
     * @return a new mutable list with one record per generated row; the record key is rebuilt from
     *         row columns 2 and 3
     * @throws Exception if row generation or conversion fails
     */
    public List<HoodieRecord> generateRecordUpdates(List<HoodieKey> list, String str) throws Exception {
        List<InternalRow> rows = SparkDatasetTestUtils.toInternalRows(
            SparkDatasetTestUtils.getRandomRowsWithKeys(new SQLContext(jsc()), list, false, str),
            SparkDatasetTestUtils.ENCODER);

        List<HoodieRecord> updates = new ArrayList<>();
        for (InternalRow row : rows) {
            HoodieKey key = new HoodieKey(row.getString(2), row.getString(3));
            updates.add(new HoodieSparkRecord(key, row, SparkDatasetTestUtils.STRUCT_TYPE, false));
        }
        return updates;
    }

    /**
     * Builds one DELETE-flagged {@link HoodieEmptyRecord} per key.
     *
     * @param list keys to turn into empty records
     * @return a new mutable {@link ArrayList} (callers in this class append to it), in key order;
     *         each record uses ordering value {@code 1} and record type SPARK
     */
    public List<HoodieRecord> generateEmptyRecords(List<HoodieKey> list) {
        List<HoodieRecord> deletes = new ArrayList<>();
        for (HoodieKey key : list) {
            HoodieRecord deleteMarker =
                new HoodieEmptyRecord<>(key, HoodieOperation.DELETE, 1, HoodieRecord.HoodieRecordType.SPARK);
            deletes.add(deleteMarker);
        }
        return deletes;
    }

    /**
     * Extracts the key of every record, preserving order.
     *
     * @param list records to read keys from
     * @return a new list holding {@code getKey()} of each record
     */
    public static List<HoodieKey> getKeys(List<HoodieRecord> list) {
        List<HoodieKey> keys = new ArrayList<>();
        for (HoodieRecord item : list) {
            keys.add(item.getKey());
        }
        return keys;
    }

    /**
     * Converts the shared test struct type into an Avro schema.
     *
     * @param str record name handed to the converter
     * @param str2 namespace handed to the converter
     * @return the Avro schema produced for {@code SparkDatasetTestUtils.STRUCT_TYPE}
     */
    private static Schema getAvroSchema(String str, String str2) {
        Schema avroSchema =
            AvroConversionUtils.convertStructTypeToAvroSchema(SparkDatasetTestUtils.STRUCT_TYPE, str, str2);
        return avroSchema;
    }

    /**
     * Builds the base write config shared by all merger variants.
     *
     * <p>It selects the Spark record merger implementation, parquet log data blocks, the
     * record-key metadata field as ordering field, and turns on the file group reader and
     * record-position writing.
     *
     * @param schema schema whose string form is set on the config
     * @return the built config, rooted at {@code basePath()}
     */
    public HoodieWriteConfig getWriteConfig(Schema schema) {
        String orderingField = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName();

        Properties overrides = new Properties();
        overrides.setProperty(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger");
        overrides.setProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        overrides.setProperty("hoodie.payload.ordering.field", orderingField);
        overrides.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
        overrides.setProperty(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");

        return getConfigBuilder(true)
            .withPath(basePath())
            .withSchema(schema.toString())
            .withProperties(overrides)
            .build();
    }

    /**
     * Wraps the base write config so that it hands out a {@link DefaultMerger}.
     *
     * @param schema schema passed on to {@link #getWriteConfig(Schema)}
     * @return a new {@link DefaultWriteConfig}
     */
    public DefaultWriteConfig buildDefaultWriteConfig(Schema schema) {
        HoodieWriteConfig baseConfig = getWriteConfig(schema);
        return new DefaultWriteConfig(baseConfig);
    }

    /**
     * Wraps the base write config so that it hands out a {@link NoFlushMerger}.
     *
     * @param schema schema passed on to {@link #getWriteConfig(Schema)}
     * @return a new {@link NoFlushWriteConfig}
     */
    public NoFlushWriteConfig buildNoFlushWriteConfig(Schema schema) {
        HoodieWriteConfig baseConfig = getWriteConfig(schema);
        return new NoFlushWriteConfig(baseConfig);
    }

    /**
     * Wraps the base write config so that it hands out a {@link CustomMerger}.
     *
     * @param schema schema passed on to {@link #getWriteConfig(Schema)}
     * @return a new {@link CustomWriteConfig}
     */
    public CustomWriteConfig buildCustomWriteConfig(Schema schema) {
        HoodieWriteConfig baseConfig = getWriteConfig(schema);
        return new CustomWriteConfig(baseConfig);
    }

    /**
     * Creates a new file-system view over the test table's active timeline.
     *
     * <p>A fresh view is constructed on every call from the meta client captured in
     * {@link #setUp()}.
     *
     * @return a new {@link HoodieTableFileSystemView}
     */
    public HoodieTableFileSystemView getFileSystemView() {
        HoodieTableMetaClient client = this.metaClient;
        return new HoodieTableFileSystemView(client, client.getActiveTimeline());
    }

    /**
     * Collects the latest file slices of a partition.
     *
     * @param str partition path
     * @return list of the slices reported by a fresh file-system view
     */
    public List<FileSlice> getLatestFileSlices(String str) {
        HoodieTableFileSystemView view = getFileSystemView();
        return view.getLatestFileSlices(str).collect(Collectors.toList());
    }

    /**
     * Looks up the latest file slice of one file group.
     *
     * @param str partition path
     * @param str2 file id
     * @return whatever a fresh file-system view reports for that partition and file id
     */
    public Option<FileSlice> getLatestFileSlice(String str, String str2) {
        HoodieTableFileSystemView view = getFileSystemView();
        return view.getLatestFileSlice(str, str2);
    }

    /**
     * Looks up the base file of the latest file slice of one file group.
     *
     * <p>The slice's own base-file {@link Option} is returned as is, so a slice that has no base
     * file yields an empty result rather than failing on an unconditional {@code get()}.
     *
     * @param str partition path
     * @param str2 file id
     * @return the base file of the latest slice, or an empty {@link Option} when there is no
     *         slice or the slice has no base file
     */
    public Option<HoodieBaseFile> getLatestBaseFile(String str, String str2) {
        Option<FileSlice> latestSlice = getLatestFileSlice(str, str2);
        if (!latestSlice.isPresent()) {
            return Option.empty();
        }
        return latestSlice.get().getBaseFile();
    }

    /**
     * Collects the log files of the latest file slice of one file group.
     *
     * @param str partition path
     * @param str2 file id
     * @return the slice's log files, or an empty list when no slice exists
     */
    public List<HoodieLogFile> getLatestLogFiles(String str, String str2) {
        Option<FileSlice> latestSlice = getLatestFileSlice(str, str2);
        if (!latestSlice.isPresent()) {
            return Collections.emptyList();
        }
        return latestSlice.get().getLogFiles().collect(Collectors.toList());
    }

    /**
     * Lists the file ids of the latest file slices in a partition.
     *
     * @param str partition path
     * @return one file id per latest slice, in the order the slices are reported
     */
    public List<String> getFileIds(String str) {
        List<String> fileIds = new ArrayList<>();
        for (FileSlice slice : getLatestFileSlices(str)) {
            fileIds.add(slice.getFileId());
        }
        return fileIds;
    }

    /**
     * Reads the test partition back through the Hudi Spark data source and asserts its row count.
     *
     * <p>The read options mirror the write-side settings (merger implementation, parquet log
     * blocks, ordering field, file group reader, record positions) and additionally switch on
     * the new Hudi parquet file format.
     *
     * @param i expected number of rows
     */
    public void checkDataEquality(int i) {
        String orderingField = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName();

        HashMap<String, String> readOptions = new HashMap<>();
        readOptions.put(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger");
        readOptions.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        readOptions.put("hoodie.payload.ordering.field", orderingField);
        readOptions.put(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
        readOptions.put(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true");
        readOptions.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");

        String partitionLocation = basePath() + "/" + getPartitionPath();
        int actualRows = spark().read()
            .options(readOptions)
            .format("org.apache.hudi")
            .load(partitionLocation)
            .collectAsList()
            .size();
        Assertions.assertEquals(i, actualRows);
    }

    /**
     * Drives a three-commit scenario against the MERGE_ON_READ test table and checks the
     * readable row count after each commit.
     *
     * <ol>
     *   <li>Commit {@code "001"}: insert 100 rows; expect one file group with a base file,
     *       no log files, and 100 readable rows.</li>
     *   <li>Commit {@code "002"}: 17 deletes (keys 0..16), 19 updates (keys 17..35, generated
     *       with {@code "001"}) and 31 new rows (generated with {@code "002"}); expect
     *       {@code i} readable rows.</li>
     *   <li>Commit {@code "003"}: 9 deletes (keys 50..58); expect {@code i - 9} readable rows.</li>
     * </ol>
     *
     * <p>The write client is managed by try-with-resources, so it is closed on both the success
     * and the failure path.
     *
     * @param hoodieWriteConfig write config (and thereby merger) under test
     * @param i expected number of readable rows after the second commit
     * @throws Exception if any write, read or close step fails
     */
    public void insertAndUpdate(HoodieWriteConfig hoodieWriteConfig, int i) throws Exception {
        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(this.metaClient);
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assertions.assertEquals(
            HoodieTableType.MERGE_ON_READ,
            HoodieSparkTable.create(hoodieWriteConfig, context(), reloadedMetaClient).getMetaClient().getTableType());

        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(hoodieWriteConfig)) {
            // ---- Commit 001: initial insert of 100 rows ----
            writeClient.startCommitWithTime("001");
            List<HoodieRecord> initialRecords = generateRecords(100, "001");
            Assertions.assertTrue(
                insertRecordsToMORTable(reloadedMetaClient, initialRecords, writeClient, hoodieWriteConfig, "001")
                    .findAny()
                    .isPresent());

            Option<HoodieInstant> lastDeltaCommit =
                reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
            Assertions.assertTrue(lastDeltaCommit.isPresent());
            Assertions.assertEquals("001", lastDeltaCommit.get().getTimestamp(), "Delta commit should be specified value");

            List<String> fileIdsAfterInsert = getFileIds(getPartitionPath());
            Assertions.assertEquals(1, fileIdsAfterInsert.size());
            Assertions.assertTrue(getLatestBaseFile(getPartitionPath(), fileIdsAfterInsert.get(0)).isPresent());
            Assertions.assertTrue(getLatestLogFiles(getPartitionPath(), fileIdsAfterInsert.get(0)).isEmpty());
            checkDataEquality(100);

            // ---- Commit 002: 17 deletes + 19 updates + 31 inserts ----
            writeClient.startCommitWithTime("002");
            List<HoodieRecord> secondBatch = generateEmptyRecords(getKeys(initialRecords).subList(0, 17));
            List<HoodieRecord> updates = generateRecordUpdates(getKeys(initialRecords).subList(17, 36), "001");
            List<HoodieRecord> freshInserts = generateRecords(31, "002");
            secondBatch.addAll(updates);
            secondBatch.addAll(freshInserts);
            Assertions.assertEquals(67, secondBatch.size());
            updateRecordsInMORTable(reloadedMetaClient, secondBatch, writeClient, hoodieWriteConfig, "002", false);
            Assertions.assertTrue(
                reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant().isPresent());

            List<String> fileIdsAfterUpdate = getFileIds(getPartitionPath());
            Assertions.assertFalse(fileIdsAfterUpdate.isEmpty());
            Assertions.assertEquals(1, fileIdsAfterUpdate.size());
            Assertions.assertTrue(getLatestBaseFile(getPartitionPath(), fileIdsAfterUpdate.get(0)).isPresent());
            checkDataEquality(i);

            // ---- Commit 003: 9 more deletes ----
            writeClient.startCommitWithTime("003");
            List<HoodieRecord> thirdBatch = generateEmptyRecords(getKeys(initialRecords).subList(50, 59));
            Assertions.assertEquals(9, thirdBatch.size());
            updateRecordsInMORTable(reloadedMetaClient, thirdBatch, writeClient, hoodieWriteConfig, "003", false);
            checkDataEquality(i - 9);
        }
    }
}
