package org.apache.hudi.client.functional;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieSparkIOFactory;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

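/**
 * Functional tests for {@link HoodieBackedTableMetadata}: partition and file listings served by the
 * metadata table, concurrent readers, the layout of metadata records in log and base files, and
 * repeated clean actions.
 */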
public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieBackedTableMetadata.class);

    /**
     * Runs the insert/upsert helper plus one further write on a copy-on-write table and then
     * compares the metadata table's partition and file listings with the test table.
     *
     * @param z forwarded to {@link #verifyBaseMetadataTable(boolean)}, which passes it as the last
     *          argument of the {@code HoodieBackedTableMetadata} constructor
     * @throws Exception if table setup, any write, or the verification fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testTableOperations(boolean z) throws Exception {
        final HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
        init(tableType);

        // The helper writes at instants 0000001 and 0000002; 0000003 is written explicitly.
        doWriteInsertAndUpsert(testTable);
        doWriteOperation(testTable, "0000003");

        verifyBaseMetadataTable(z);
    }

    /**
     * Lists the files of one partition from a single shared {@link HoodieBackedTableMetadata}
     * instance on several threads at once and expects every reader to see exactly one file.
     *
     * <p>All reader tasks are released together through a latch so that the lookups overlap.
     *
     * @param z passed as the last argument of the {@code HoodieBackedTableMetadata} constructor
     * @throws Exception if table setup or the initial write fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testMultiReaderForHoodieBackedTableMetadata(boolean z) throws Exception {
        final int taskNumber = 3;
        init(HoodieTableType.COPY_ON_WRITE);
        testTable.doWriteOperation("000001", WriteOperationType.INSERT, Collections.emptyList(), Arrays.asList("p1"), 1);

        final HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(
            this.context, this.storage, this.writeConfig.getMetadataConfig(), this.writeConfig.getBasePath(), z);
        Assertions.assertTrue(tableMetadata.enabled());
        final String partitionPath = this.basePath + "/" + tableMetadata.getAllPartitionPaths().get(0);

        final AtomicBoolean failed = new AtomicBoolean(false);
        final CountDownLatch startLatch = new CountDownLatch(taskNumber);
        final AtomicInteger filesNumber = new AtomicInteger(0);

        final ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
        try {
            for (int i = 0; i < taskNumber; i++) {
                executors.submit(() -> {
                    try {
                        // Each task checks in and then waits for the others, so all reads start together.
                        startLatch.countDown();
                        startLatch.await();
                        List<StoragePathInfo> files = tableMetadata.getAllFilesInPartition(new StoragePath(partitionPath));
                        if (files.size() == 1) {
                            filesNumber.addAndGet(files.size());
                        } else {
                            LOG.warn("Miss match data file numbers.");
                            failed.set(true);
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool thread can observe the interruption.
                        Thread.currentThread().interrupt();
                        LOG.warn("Catch Exception while reading data files from MDT.", e);
                        failed.set(true);
                    } catch (Exception e) {
                        LOG.warn("Catch Exception while reading data files from MDT.", e);
                        failed.set(true);
                    }
                });
            }
            executors.shutdown();
            // A timeout must fail the test instead of silently falling through to the assertions.
            Assertions.assertTrue(executors.awaitTermination(5L, TimeUnit.MINUTES),
                "Reader tasks did not finish within 5 minutes");
        } finally {
            // No-op after a clean termination; interrupts stragglers after a failure or timeout.
            executors.shutdownNow();
        }

        Assertions.assertFalse(failed.get());
        Assertions.assertEquals(taskNumber, filesNumber.get());
    }

    /**
     * Convenience overload that delegates to the four-argument {@code doWriteInsertAndUpsert} with
     * the fixed instants {@code 0000001} and {@code 0000002} and {@code false} as the last argument.
     *
     * @param hoodieTestTable test table to write to
     * @throws Exception if the delegated call fails
     */
    private void doWriteInsertAndUpsert(HoodieTestTable hoodieTestTable) throws Exception {
        final String firstInstant = "0000001";
        final String secondInstant = "0000002";
        doWriteInsertAndUpsert(hoodieTestTable, firstInstant, secondInstant, false);
    }

    /**
     * Checks that the metadata table reports the same partitions as the test table and then
     * validates the files of every partition.
     *
     * @param z passed as the last argument of the {@code HoodieBackedTableMetadata} constructor
     * @throws IOException if reading partitions or files from the metadata table fails
     */
    private void verifyBaseMetadataTable(boolean z) throws IOException {
        HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(
            this.context, this.storage, this.writeConfig.getMetadataConfig(), this.writeConfig.getBasePath(), z);
        Assertions.assertTrue(tableMetadata.enabled());

        // Partition names as seen by the test table: the last path element of each partition path.
        List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
        List<String> fsPartitions = fsPartitionPaths.stream()
            .map(path -> path.getFileName().toString())
            .sorted()
            .collect(Collectors.toList());

        // Sort a copy so the list handed out by the metadata reader is left untouched.
        List<String> metadataPartitions = new ArrayList<>(tableMetadata.getAllPartitionPaths());
        Collections.sort(metadataPartitions);

        Assertions.assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
        Assertions.assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

        SyncableFileSystemView tableView = HoodieSparkTable.create(this.writeConfig, this.context).getHoodieView();
        List<String> fullPartitionPaths = fsPartitions.stream()
            .map(partition -> this.basePath + "/" + partition)
            .collect(Collectors.toList());
        Map<String, List<StoragePathInfo>> partitionToFiles = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
        Assertions.assertEquals(fsPartitions.size(), partitionToFiles.size());

        for (String partition : fsPartitions) {
            try {
                validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFiles, partition);
            } catch (IOException e) {
                // Pass the exception as the cause so its stack trace shows up in the test report.
                Assertions.fail("Exception should not be raised: " + e, e);
            }
        }
    }

    /**
     * Verifies that the key generator class recorded in the metadata table's table config is
     * {@link HoodieTableMetadataKeyGenerator}.
     *
     * @param hoodieTableType table type the data table is initialised with
     * @throws Exception if table setup fails
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testMetadataTableKeyGenerator(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);

        final String expectedKeyGenerator = HoodieTableMetadataKeyGenerator.class.getCanonicalName();
        HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(
            this.context, this.storage, this.writeConfig.getMetadataConfig(), this.writeConfig.getBasePath(), false);
        final String actualKeyGenerator =
            tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName();

        Assertions.assertEquals(expectedKeyGenerator, actualKeyGenerator);
    }

    /**
     * Verifies that listing the files of a path that was never written yields an empty result
     * rather than an error.
     *
     * @param hoodieTableType table type the data table is initialised with
     * @throws Exception if table setup or the listing fails
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testNotExistPartition(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);
        HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(
            this.context, this.storage, this.writeConfig.getMetadataConfig(), this.writeConfig.getBasePath(), false);

        // NOTE(review): "dummy" is appended without a path separator, so this is not a child of
        // the base path unless the base path already ends in one — confirm that is intended.
        StoragePath missingPartition = new StoragePath(this.writeConfig.getBasePath() + "dummy");
        List<StoragePathInfo> files = tableMetadata.getAllFilesInPartition(missingPartition);

        // JUnit 5 takes the expected value first; the order matters for the failure message.
        Assertions.assertEquals(0, files.size());
    }

    /**
     * Writes a series of commits (and one clean) with the metadata table configured via
     * {@code withMaxNumDeltaCommitsBeforeCompaction(3)} and, at three checkpoints, verifies the
     * metadata records stored in the metadata table's log files and base file.
     *
     * @param hoodieTableType table type the data table is initialised with
     * @throws Exception if setup, any write, or the final metadata validation fails
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testMetadataRecordKeyExcludeFromPayload(HoodieTableType hoodieTableType) throws Exception {
        final String logFilesMessage = "Metadata table should have valid log files!";
        final String baseFileMessage = "Metadata table should have a valid base file!";

        initPath();
        HoodieMetadataConfig metadataConfig =
            HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(3).build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        init(hoodieTableType, this.writeConfig);

        // Checkpoint 1: after the first insert.
        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);
        HoodieTableMetaClient metadataMetaClient = createMetaClient(this.metadataTableBasePath);
        HoodieWriteConfig metadataWriteConfig = getMetadataWriteConfig(this.writeConfig);
        metadataMetaClient.reloadActiveTimeline();
        HoodieSparkTable metadataTable = HoodieSparkTable.create(metadataWriteConfig, this.context, metadataMetaClient);
        Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadLogFiles(metadataTable, metadataMetaClient, "0000001"),
            logFilesMessage);
        verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(metadataTable);

        // Checkpoint 2: after two more upserts.
        doWriteOperation(testTable, "0000002", WriteOperationType.UPSERT);
        doWriteOperation(testTable, "0000004", WriteOperationType.UPSERT);
        Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadLogFiles(metadataTable, metadataMetaClient, "0000002"),
            logFilesMessage);
        Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(metadataTable),
            baseFileMessage);

        // Checkpoint 3: after another upsert, a clean of 0000004, and a final upsert.
        doWriteOperation(testTable, "0000005", WriteOperationType.UPSERT);
        doClean(testTable, "0000006", Arrays.asList("0000004"));
        doWriteOperation(testTable, "0000007", WriteOperationType.UPSERT);
        Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadLogFiles(metadataTable, metadataMetaClient, "7"),
            logFilesMessage);
        Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(metadataTable),
            baseFileMessage);

        validateMetadata(testTable);
    }

    /**
     * Bulk-inserts into partition {@code p1} until the metadata table shows one compaction,
     * clusters the files of the first two commits, cleans those commits, and then repeats the same
     * clean. The repeated clean must not add a compaction or change the partition's file listing.
     *
     * @param hoodieTableType table type the data table is initialised with
     * @throws Exception if setup or any table operation fails
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testRepeatedCleanActionsWithMetadataTableEnabled(HoodieTableType hoodieTableType) throws Exception {
        final String partition = "p1";

        initPath();
        HoodieMetadataConfig metadataConfig =
            HoodieMetadataConfig.newBuilder().enable(true).withMaxNumDeltaCommitsBeforeCompaction(4).build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        init(hoodieTableType, this.writeConfig);

        // Two initial commits whose files are later clustered and cleaned.
        String firstInstant = HoodieActiveTimeline.createNewInstantTime();
        HoodieCommitMetadata firstCommit = testTable.doWriteOperation(
            firstInstant, WriteOperationType.BULK_INSERT, Collections.emptyList(), Arrays.asList(partition), 1);
        String secondInstant = HoodieActiveTimeline.createNewInstantTime();
        HoodieCommitMetadata secondCommit = testTable.doWriteOperation(
            secondInstant, WriteOperationType.BULK_INSERT, Collections.emptyList(), Arrays.asList(partition), 1);

        // Keep writing until the metadata table timeline contains a compaction.
        HoodieTableMetaClient metadataMetaClient = createMetaClient(this.metadataTableBasePath);
        while (getNumCompactions(metadataMetaClient) == 0) {
            testTable.doWriteOperation(HoodieActiveTimeline.createNewInstantTime(),
                WriteOperationType.BULK_INSERT, Collections.emptyList(), Arrays.asList(partition), 1);
            metadataMetaClient.reloadActiveTimeline();
        }
        Assertions.assertEquals(1, getNumCompactions(metadataMetaClient));

        List<String> fileIdsToCluster = new ArrayList<>();
        fileIdsToCluster.addAll(firstCommit.getFileIdAndRelativePaths().keySet());
        fileIdsToCluster.addAll(secondCommit.getFileIdAndRelativePaths().keySet());
        testTable.doCluster(HoodieActiveTimeline.createNewInstantTime(),
            Collections.singletonMap(partition, fileIdsToCluster), Arrays.asList(partition), 1);
        Set<String> filesBeforeClean = getFilePathsInPartition(partition);

        // First clean of the two initial commits.
        String cleanInstant = HoodieActiveTimeline.createNewInstantTime();
        HoodieCleanMetadata cleanMetadata =
            testTable.doCleanBasedOnCommits(cleanInstant, Arrays.asList(firstInstant, secondInstant));
        HoodieCleanPartitionMetadata partitionCleanMetadata = cleanMetadata.getPartitionMetadata().get(partition);
        List<String> deletedFiles = partitionCleanMetadata.getDeletePathPatterns();
        Assertions.assertTrue(deletedFiles.size() > 0);
        validateFilesAfterCleaning(deletedFiles, filesBeforeClean, getFilePathsInPartition(partition));

        // Repeat the same clean under a new instant time; nothing else should change.
        this.metaClient.reloadActiveTimeline();
        HoodieInstant requestedClean = new HoodieInstant(HoodieInstant.State.REQUESTED, "clean", cleanInstant);
        testTable.repeatClean(HoodieActiveTimeline.createNewInstantTime(),
            CleanerUtils.getCleanerPlan(this.metaClient, requestedClean), cleanMetadata);
        Assertions.assertEquals(1, getNumCompactions(metadataMetaClient));
        validateFilesAfterCleaning(deletedFiles, filesBeforeClean, getFilePathsInPartition(partition));
    }

    /**
     * Counts the instants on the given meta client's active timeline whose action is
     * {@code "commit"} and whose commit metadata has operation type
     * {@link WriteOperationType#COMPACT}.
     *
     * @param hoodieTableMetaClient meta client whose active timeline is inspected
     * @return number of matching instants
     * @throws RuntimeException wrapping the {@link IOException} if commit metadata cannot be parsed
     */
    private int getNumCompactions(HoodieTableMetaClient hoodieTableMetaClient) {
        final HoodieActiveTimeline timeline = hoodieTableMetaClient.getActiveTimeline();
        return timeline.filter(instant -> {
            // Only "commit" instants are parsed; everything else is skipped up front.
            if (!instant.getAction().equals("commit")) {
                return false;
            }
            try {
                byte[] details = timeline.getInstantDetails(instant).get();
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class);
                return commitMetadata.getOperationType().equals(WriteOperationType.COMPACT);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).countInstants();
    }

    /**
     * Returns the names of all files that a freshly created {@link HoodieBackedTableMetadata}
     * reports for the given partition of the data table.
     *
     * @param str partition path relative to the table base path
     * @return file names (last path element only) in that partition
     * @throws IOException if the listing fails
     */
    private Set<String> getFilePathsInPartition(String str) throws IOException {
        HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(this.storageConf);
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
        HoodieBackedTableMetadata tableMetadata =
            new HoodieBackedTableMetadata(engineContext, this.storage, metadataConfig, this.basePath);

        List<StoragePathInfo> files = tableMetadata.getAllFilesInPartition(new StoragePath(this.basePath, str));
        return files.stream()
            .map(pathInfo -> pathInfo.getPath().getName())
            .collect(Collectors.toSet());
    }

    /**
     * Asserts that a clean removed exactly the expected files from a partition.
     *
     * @param list files reported as deleted by the clean
     * @param set  files present in the partition before the clean
     * @param set2 files present in the partition after the clean
     */
    private void validateFilesAfterCleaning(List<String> list, Set<String> set, Set<String> set2) {
        // The listing must have shrunk by exactly the number of deleted files.
        final int removedCount = set.size() - set2.size();
        Assertions.assertEquals(list.size(), removedCount);

        // No deleted file may still be listed.
        for (String deletedFile : list) {
            Assertions.assertFalse(set2.contains(deletedFile));
        }

        // Every file still listed must already have been there before the clean.
        for (String remainingFile : set2) {
            Assertions.assertTrue(set.contains(remainingFile));
        }
    }

    /**
     * Verifies the metadata records in the log files of the first latest file slice of the
     * {@code FILES} partition, both as raw log records and as merged records.
     *
     * @param hoodieTable           metadata table whose file-system view is synced and queried
     * @param hoodieTableMetaClient meta client of the metadata table
     * @param str                   latest instant time passed to the merged-record reader
     * @throws IOException           if reading a log file fails
     * @throws IllegalStateException if the partition has no file slices
     */
    private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable hoodieTable, HoodieTableMetaClient hoodieTableMetaClient, String str) throws IOException {
        hoodieTable.getHoodieView().sync();
        List<FileSlice> fileSlices = hoodieTable.getSliceView()
            .getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath())
            .collect(Collectors.toList());
        if (fileSlices.isEmpty()) {
            throw new IllegalStateException("LogFile slices are not available!");
        }

        // Only the first file slice is inspected.
        FileSlice firstSlice = fileSlices.get(0);
        List<HoodieLogFile> logFiles = firstSlice.getLogFiles().collect(Collectors.toList());
        List<String> logFilePaths = new ArrayList<>(logFiles.size());
        for (HoodieLogFile logFile : logFiles) {
            logFilePaths.add(logFile.getPath().toString());
        }

        verifyMetadataRawRecords(hoodieTable, logFiles);
        verifyMetadataMergedRecords(hoodieTableMetaClient, logFilePaths, str);
    }

    /**
     * Reads every data block of the given metadata log files and asserts that each record has no
     * record-key or commit-time meta field set and carries a non-empty {@code key} field.
     *
     * <p>Log files for which no schema can be resolved are skipped.
     *
     * @param hoodieTable metadata table (not used by this check; kept for the existing signature)
     * @param list        log files to inspect
     * @throws IOException if listing, opening, or reading a log file fails
     */
    private void verifyMetadataRawRecords(HoodieTable hoodieTable, List<HoodieLogFile> list) throws IOException {
        for (HoodieLogFile logFile : list) {
            List<StoragePathInfo> pathInfos = this.storage.listDirectEntries(logFile.getPath());
            Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(this.storage, logFile.getPath());
            if (writerSchema == null) {
                // No schema could be read from this log file, so there is nothing to decode.
                continue;
            }

            // try-with-resources closes the reader and each record iterator even when an assertion
            // inside the loop throws.
            try (HoodieLogFormat.Reader logReader = HoodieLogFormat.newReader(
                this.storage, new HoodieLogFile(pathInfos.get(0).getPath()), writerSchema)) {
                while (logReader.hasNext()) {
                    HoodieLogBlock logBlock = logReader.next();
                    if (!(logBlock instanceof HoodieDataBlock)) {
                        continue;
                    }
                    try (ClosableIterator<HoodieRecord<Object>> records =
                             ((HoodieDataBlock) logBlock).getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
                        records.forEachRemaining(hoodieRecord -> {
                            GenericRecord genericRecord = (GenericRecord) hoodieRecord.getData();
                            Assertions.assertNull(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
                            Assertions.assertNull(genericRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
                            Assertions.assertFalse(String.valueOf(genericRecord.get("key")).isEmpty());
                        });
                    }
                }
            }
        }
    }

    /**
     * Reads the merged records of the given metadata log files through
     * {@link HoodieMetadataLogRecordReader} and asserts that every record has a non-empty record
     * key that matches the record key of its {@code HoodieKey}.
     *
     * @param hoodieTableMetaClient meta client of the metadata table (supplies storage and base path)
     * @param list                  paths of the log files to read
     * @param str                   latest instant time handed to the reader
     */
    private void verifyMetadataMergedRecords(HoodieTableMetaClient hoodieTableMetaClient, List<String> list, String str) {
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
        HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
            .withStorage(hoodieTableMetaClient.getStorage())
            .withBasePath(hoodieTableMetaClient.getBasePath())
            .withLogFilePaths(list)
            .withLatestInstantTime(str)
            .withPartition(MetadataPartitionType.FILES.getPartitionPath())
            .withReaderSchema(readerSchema)
            .withMaxMemorySizeInBytes(100000L)
            .withBufferSize(4096)
            .withSpillableMapBasePath(this.tempDir.toString())
            .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
            .build();

        for (HoodieRecord<?> mergedRecord : logRecordReader.getRecords()) {
            Assertions.assertFalse(mergedRecord.getRecordKey().isEmpty());
            Assertions.assertEquals(mergedRecord.getKey().getRecordKey(), mergedRecord.getRecordKey());
        }
    }

    /**
     * Reads all records of the base file of the first latest file slice of the {@code FILES}
     * partition and asserts that each record has no record-key meta field set and carries a
     * non-empty {@code key} field.
     *
     * @param hoodieTable metadata table whose file-system view is synced and queried
     * @throws IOException           if the base file cannot be read
     * @throws IllegalStateException if the partition has no file slice or the first slice has no
     *                               base file
     */
    private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable hoodieTable) throws IOException {
        hoodieTable.getHoodieView().sync();
        List<FileSlice> fileSlices = hoodieTable.getSliceView()
            .getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath())
            .collect(Collectors.toList());
        // Guard the empty case too, so a missing slice fails with the same descriptive exception
        // as a missing base file instead of an IndexOutOfBoundsException.
        if (fileSlices.isEmpty() || !fileSlices.get(0).getBaseFile().isPresent()) {
            throw new IllegalStateException("Base file not available!");
        }
        HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();

        // NOTE(review): the reader returned by getFileReader is never closed here — confirm
        // whether it holds resources that should be released.
        HoodieAvroHFileReaderImplBase.readAllRecords(HoodieSparkIOFactory.getHoodieSparkIOFactory(this.storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieTable.getConfig(), new StoragePath(baseFile.getPath()))).forEach(indexedRecord -> {
            GenericRecord genericRecord = (GenericRecord) indexedRecord;
            Assertions.assertNull(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
            Assertions.assertFalse(((String) genericRecord.get("key")).isEmpty());
        });
    }
}
