package org.apache.hudi.common.functional;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HadoopMapRedUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/common/functional/TestHoodieLogFormat.class */
public class TestHoodieLogFormat extends HoodieCommonTestHarness {
    /** Block type used by the tests in this class that are not parameterized over block types. */
    private static final HoodieLogBlock.HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
    /** Buffer size passed to the merged log record scanner builder. */
    private static final int BUFFER_SIZE = 4096;
    /** HDFS test service; only assigned when the tests do not run against an external HDFS. */
    private static HdfsTestService hdfsTestService;
    /** Storage shared by all tests; assigned once in setUpClass. */
    private static HoodieStorage storage;
    /** Partition directory below the per-test base path; created in setUp. */
    private StoragePath partitionPath;
    /** Directory handed to scanners as the spillable-map base path; assigned in setUp. */
    private String spillableBasePath;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.common.functional.TestHoodieLogFormat$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/common/functional/TestHoodieLogFormat$3.class */
    // Holds an int table indexed by HoodieLogBlockType.ordinal(). The static initializer assigns
    // CDC_DATA_BLOCK -> 1, AVRO_DATA_BLOCK -> 2, HFILE_DATA_BLOCK -> 3, PARQUET_DATA_BLOCK -> 4;
    // every other slot keeps the array default of 0.
    // NOTE(review): this looks like the switch-map helper a Java compiler emits for a switch over
    // an enum, recovered by the decompiler - confirm before editing by hand.
    public static /* synthetic */ class AnonymousClass3 {
        // One slot per enum constant known at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType = new int[HoodieLogBlock.HoodieLogBlockType.values().length];

        static {
            // Each assignment is guarded separately so that a NoSuchFieldError raised for one
            // constant does not prevent the remaining entries from being filled in.
            try {
                $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[HoodieLogBlock.HoodieLogBlockType.CDC_DATA_BLOCK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot for this constant is simply not populated.
            }
            try {
                $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored, see above.
            }
            try {
                $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored, see above.
            }
            try {
                $SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored, see above.
            }
        }
    }

    /**
     * Initialises the storage shared by every test in this class: an external HDFS when the test
     * utilities ask for it, otherwise the file system of a freshly started {@link HdfsTestService}.
     *
     * @throws IOException if the storage cannot be set up
     */
    @BeforeAll
    public static void setUpClass() throws IOException {
        if (!HoodieTestUtils.shouldUseExternalHdfs()) {
            // No external cluster requested: start our own test service and remember it so that
            // tearDownClass can stop it again.
            hdfsTestService = new HdfsTestService();
            storage = new HoodieHadoopStorage(hdfsTestService.start(true).getFileSystem());
            return;
        }
        storage = new HoodieHadoopStorage(HoodieTestUtils.useExternalHdfs());
    }

    /** Stops the HDFS test service if this class started one. */
    @AfterAll
    public static void tearDownClass() {
        if (hdfsTestService == null) {
            // Nothing was started (external HDFS run), so there is nothing to stop.
            return;
        }
        hdfsTestService.stop();
    }

    /**
     * Prepares a per-test directory layout below the file system's working directory and
     * initialises a MERGE_ON_READ table at the base path.
     *
     * @param testInfo supplies the display name that is combined with the current time to build
     *                 the base path
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws IOException, InterruptedException {
        FileSystem fileSystem = (FileSystem) storage.getFileSystem();
        Path workingDir = fileSystem.getWorkingDirectory();

        String testDirName = testInfo.getDisplayName() + System.currentTimeMillis();
        this.basePath = new StoragePath(workingDir.toString(), testDirName).toString();
        this.partitionPath = new StoragePath(this.basePath, "partition_path");
        this.spillableBasePath = new StoragePath(workingDir.toString(), ".spillable_path").toString();

        boolean partitionCreated = storage.createDirectory(this.partitionPath);
        Assertions.assertTrue(partitionCreated);
        HoodieTestUtils.init(storage.getConf().newInstance(), this.basePath, HoodieTableType.MERGE_ON_READ);
    }

    /** Deletes the base path, the partition path and the spillable path used by the test. */
    @AfterEach
    public void tearDown() throws IOException {
        StoragePath baseDir = new StoragePath(this.basePath);
        storage.deleteDirectory(baseDir);

        storage.deleteDirectory(this.partitionPath);

        StoragePath spillableDir = new StoragePath(this.spillableBasePath);
        storage.deleteDirectory(spillableDir);
    }

    /**
     * Checks a freshly built writer: reported size 0, a log file name starting with a dot and
     * log version 1.
     */
    @Test
    public void testEmptyLog() throws IOException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();

        long initialSize = writer.getCurrentSize();
        Assertions.assertEquals(0L, initialSize, "Just created this log, size should be 0");

        String fileName = writer.getLogFile().getFileName();
        Assertions.assertTrue(fileName.startsWith("."), "Check all log files should start with a .");

        int logVersion = writer.getLogFile().getLogVersion();
        Assertions.assertEquals(1, logVersion, "Version should be 1 for new log created");

        writer.close();
    }

    /**
     * Appends one data block of the given type and checks that the writer's size, the length
     * reported by storage and the returned {@link AppendResult} agree with each other.
     *
     * @param hoodieLogBlockType data block type under test
     */
    @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    @ParameterizedTest
    public void testBasicAppend(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType) throws IOException, InterruptedException, URISyntaxException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        // The result is not used; the call is kept so the sequence of writer calls is unchanged.
        writer.getCurrentSize();
        HoodieDataBlock dataBlock = getDataBlock(hoodieLogBlockType, records, header);
        AppendResult result = writer.appendBlock(dataBlock);

        long sizeAfterAppend = writer.getCurrentSize();
        Assertions.assertTrue(sizeAfterAppend > 0, "We just wrote a block - size should be > 0");
        long lengthOnStorage = storage.getPathInfo(writer.getLogFile().getPath()).getLength();
        Assertions.assertEquals(sizeAfterAppend, lengthOnStorage, "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        Assertions.assertEquals(sizeAfterAppend, result.size());
        Assertions.assertEquals(writer.getLogFile(), result.logFile());
        Assertions.assertEquals(0L, result.offset());
        writer.close();
    }

    /**
     * Writes one block, then reopens the log with a size threshold one byte below the size
     * already written. The test expects the next append to land in the first log file at a
     * non-zero offset, the writer to then point at version 2 (a file that does not exist yet),
     * and the append after that to start a different log file at offset 0.
     */
    @Test
    public void testRollover() throws IOException, InterruptedException, URISyntaxException {
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> initialRecords = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        AppendResult firstAppend = firstWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, initialRecords, header));
        long writtenSize = firstWriter.getCurrentSize();
        firstWriter.close();

        Assertions.assertEquals(0L, firstAppend.offset());
        Assertions.assertEquals(writtenSize, firstAppend.size());

        // Threshold just below what is already on disk.
        HoodieLogFormat.Writer rollingWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .withSizeThreshold(writtenSize - 1)
            .build();
        HoodieDataBlock secondBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header);
        AppendResult secondAppend = rollingWriter.appendBlock(secondBlock);

        Assertions.assertEquals(firstAppend.logFile(), secondAppend.logFile());
        Assertions.assertNotEquals(0L, secondAppend.offset());
        Assertions.assertEquals(0L, rollingWriter.getCurrentSize(), "This should be a new log file and hence size should be 0");
        Assertions.assertEquals(2, rollingWriter.getLogFile().getLogVersion(), "Version should be rolled to 2");
        StoragePath rolledLogPath = rollingWriter.getLogFile().getPath();
        Assertions.assertFalse(storage.exists(rolledLogPath), "Path (" + rolledLogPath + ") must not exist");

        HoodieDataBlock thirdBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header);
        AppendResult thirdAppend = rollingWriter.appendBlock(thirdBlock);

        Assertions.assertNotEquals(secondAppend.logFile(), thirdAppend.logFile());
        Assertions.assertEquals(0L, thirdAppend.offset());
        rollingWriter.close();
    }

    /**
     * Runs the shared concurrent-append scenario with the flags {@code (true, false)}; going by
     * the test name this is the "existing log file, no write token" variant.
     */
    @Test
    public void testConcurrentAppendOnExistingLogFileWithoutWriteToken() throws Exception {
        this.testConcurrentAppend(true, false);
    }

    /**
     * Runs the shared concurrent-append scenario with the flags {@code (true, true)}; going by
     * the test name this is the "existing log file, with write token" variant.
     */
    @Test
    public void testConcurrentAppendOnExistingLogFileWithWriteToken() throws Exception {
        this.testConcurrentAppend(true, true);
    }

    /**
     * Runs the shared concurrent-append scenario with the flags {@code (false, true)}; going by
     * the test name this is the "first log file version" variant.
     */
    @Test
    public void testConcurrentAppendOnFirstLogFileVersion() throws Exception {
        this.testConcurrentAppend(false, true);
    }

    /**
     * Shared scenario for the concurrent-append tests: two writers built from identically
     * configured builders append the same block, and the second writer is expected to end up
     * exactly one log version after the first.
     *
     * @param z  together with {@code z2} selects the log version given to the builders: the base
     *           version is used only when {@code z2} is set and {@code z} is not, otherwise 1
     * @param z2 when set, the second builder also receives the log version and write token;
     *           otherwise only the first builder does
     */
    private void testConcurrentAppend(boolean z, boolean z2) throws Exception {
        HoodieLogFormat.WriterBuilder firstBuilder = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage);
        HoodieLogFormat.WriterBuilder secondBuilder = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage);

        // The first builder is always pinned to a version and write token; the second one only
        // when z2 is set.
        int logVersion = (z2 && !z) ? HoodieLogFile.LOGFILE_BASE_VERSION.intValue() : 1;
        firstBuilder = firstBuilder.withLogVersion(logVersion).withRolloverLogWriteToken("1-0-1");
        if (z2) {
            secondBuilder = secondBuilder.withLogVersion(logVersion).withRolloverLogWriteToken("1-0-1");
        }

        HoodieLogFormat.Writer firstWriter = firstBuilder.build();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock block = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
        firstWriter.appendBlock(block);

        // The second writer is only built after the first one has already appended.
        HoodieLogFormat.Writer secondWriter = secondBuilder.build();
        secondWriter.appendBlock(block);

        HoodieLogFile firstLogFile = firstWriter.getLogFile();
        HoodieLogFile secondLogFile = secondWriter.getLogFile();
        firstWriter.close();
        secondWriter.close();

        Assertions.assertNotNull(firstLogFile.getLogWriteToken());
        Assertions.assertEquals(firstLogFile.getLogVersion(), secondLogFile.getLogVersion() - 1, "Log Files must have different versions");
    }

    /**
     * Appends three blocks through three successively opened writers and checks that the
     * reported size grows each time, matches the length on storage, and that asking a closed
     * writer for its size throws {@link IllegalStateException}.
     *
     * @param hoodieLogBlockType data block type under test
     */
    @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    @ParameterizedTest
    public void testMultipleAppend(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType) throws IOException, URISyntaxException, InterruptedException {
        // First writer / first block.
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> firstRecords = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        firstWriter.appendBlock(getDataBlock(hoodieLogBlockType, firstRecords, header));
        long firstSize = firstWriter.getCurrentSize();
        firstWriter.close();

        // Second writer / second block.
        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> secondRecords = SchemaTestUtil.generateTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        secondWriter.appendBlock(getDataBlock(hoodieLogBlockType, secondRecords, header));
        long secondSize = secondWriter.getCurrentSize();
        Assertions.assertTrue(secondSize > firstSize, "We just wrote a new block - size2 should be > size1");
        long secondLengthOnStorage = storage.getPathInfo(secondWriter.getLogFile().getPath()).getLength();
        Assertions.assertEquals(secondSize, secondLengthOnStorage, "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        secondWriter.close();

        // Third writer / third block.
        HoodieLogFormat.Writer thirdWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> thirdRecords = SchemaTestUtil.generateTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        thirdWriter.appendBlock(getDataBlock(hoodieLogBlockType, thirdRecords, header));
        long thirdSize = thirdWriter.getCurrentSize();
        Assertions.assertTrue(thirdSize > secondSize, "We just wrote a new block - size3 should be > size2");
        long thirdLengthOnStorage = storage.getPathInfo(thirdWriter.getLogFile().getPath()).getLength();
        Assertions.assertEquals(thirdSize, thirdLengthOnStorage, "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
        thirdWriter.close();

        Assertions.assertThrows(
            IllegalStateException.class,
            () -> thirdWriter.getCurrentSize(),
            "getCurrentSize should fail after the logAppender is closed");
    }

    /**
     * Builds two writers one after the other against the same directory on a
     * {@link LocalFileSystem}, each appending one block, and expects the directory to contain
     * two entries afterwards.
     *
     * @param path temporary directory supplied by JUnit
     */
    @Test
    public void testAppendNotSupported(@TempDir java.nio.file.Path path) throws IOException, URISyntaxException, InterruptedException {
        StoragePath tempRoot = new StoragePath(path.toUri().toString());
        HoodieStorage localStorage = HoodieStorageUtils.getStorage(tempRoot.toString(), HoodieTestUtils.getDefaultStorageConf());
        Assertions.assertTrue(localStorage.getFileSystem() instanceof LocalFileSystem);

        StoragePath appendDir = new StoragePath(tempRoot, "append_test");
        localStorage.createDirectory(appendDir);

        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 5);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        HoodieDataBlock block = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);

        int remainingWriters = 2;
        while (remainingWriters-- > 0) {
            HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(appendDir)
                .withFileExtension(".archive")
                .withFileId("commits")
                .overBaseCommit("")
                .withStorage(localStorage)
                .build();
            writer.appendBlock(block);
            writer.close();
        }

        int entryCount = localStorage.listDirectEntries(appendDir).size();
        Assertions.assertEquals(2, entryCount);
    }

    /**
     * Writes a single data block of 100 records and verifies that a log reader returns one
     * block of the default type whose records equal the written ones, in the same order.
     */
    @Test
    public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
        // Expected result: every generated record rewritten against the simple schema.
        List<IndexedRecord> expectedRecords = records.stream()
            .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
            .collect(Collectors.toList());

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header));
        writer.close();

        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
        HoodieLogBlock nextBlock = (HoodieLogBlock) reader.next();
        Assertions.assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
        // Downcast only after the block type has been asserted, so that an unexpected block
        // produces an assertion failure rather than a ClassCastException.
        HoodieDataBlock dataBlock = (HoodieDataBlock) nextBlock;
        List<IndexedRecord> recordsRead = getRecords(dataBlock);
        Assertions.assertEquals(expectedRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size");
        Assertions.assertEquals(expectedRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)");
        reader.close();
    }

    /**
     * Appends the same pre-serialised Avro block to a single log file (size threshold 3 GiB)
     * until at least {@link Integer#MAX_VALUE} bytes have been written, then reads the file
     * back and checks the first block's records as well as the total number of blocks.
     */
    @Test
    public void testHugeLogFileWrite() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .withSizeThreshold(3L * 1024 * 1024 * 1024)
            .build();
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 1000);
        List<IndexedRecord> expectedRecords = records.stream()
            .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
            .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        // Serialise the block once; the bytes feed both the block content and the length of its
        // content location, so they need a named local.
        byte[] contentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(storage);
        HoodieLogBlock.HoodieLogBlockContentLocation contentLocation = new HoodieLogBlock.HoodieLogBlockContentLocation(
            HoodieTestUtils.getStorage(this.basePath), (HoodieLogFile) null, 0L, contentBytes.length, 0L);
        HoodieAvroDataBlock reusableBlock = new HoodieAvroDataBlock(
            (Supplier) null,
            Option.ofNullable(contentBytes),
            false,
            contentLocation,
            Option.ofNullable(SchemaTestUtil.getSimpleSchema()),
            header,
            new HashMap<>(),
            HoodieRecord.RECORD_KEY_METADATA_FIELD);

        long writtenBytes = 0;
        int blocksWritten = 0;
        while (writtenBytes < Integer.MAX_VALUE) {
            AppendResult result = writer.appendBlock(reusableBlock);
            Assertions.assertTrue(result.size() > 0);
            writtenBytes += result.size();
            blocksWritten++;
        }
        writer.close();

        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true);
        Assertions.assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
        HoodieLogBlock nextBlock = (HoodieLogBlock) reader.next();
        Assertions.assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
        // Downcast only after the block type has been asserted.
        HoodieDataBlock dataBlock = (HoodieDataBlock) nextBlock;
        List<IndexedRecord> recordsRead = getRecords(dataBlock);
        Assertions.assertEquals(expectedRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size");
        Assertions.assertEquals(expectedRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)");

        // The first block has already been consumed above, hence the count starts at 1.
        int blocksRead = 1;
        while (reader.hasNext()) {
            reader.next();
            blocksRead++;
        }
        Assertions.assertEquals(blocksWritten, blocksRead, "All written log should be correctly found");
        reader.close();
    }

    /**
     * Appends three blocks of the given type through three successively opened writers and
     * verifies that a reader returns them in write order with identical records.
     *
     * @param hoodieLogBlockType data block type under test
     */
    @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    @ParameterizedTest
    public void testBasicAppendAndRead(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType) throws IOException, URISyntaxException, InterruptedException {
        // First block.
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> firstRecords = SchemaTestUtil.generateTestRecords(0, 100);
        Schema schema = SchemaTestUtil.getSimpleSchema();
        List<IndexedRecord> firstExpected = firstRecords.stream()
            .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
            .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        firstWriter.appendBlock(getDataBlock(hoodieLogBlockType, firstRecords, header));
        firstWriter.close();

        // Second block.
        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> secondRecords = SchemaTestUtil.generateTestRecords(0, 100);
        List<IndexedRecord> secondExpected = secondRecords.stream()
            .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
            .collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        secondWriter.appendBlock(getDataBlock(hoodieLogBlockType, secondRecords, header));
        secondWriter.close();

        // Third block.
        HoodieLogFormat.Writer thirdWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List<IndexedRecord> thirdRecords = SchemaTestUtil.generateTestRecords(0, 100);
        List<IndexedRecord> thirdExpected = thirdRecords.stream()
            .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
            .collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        thirdWriter.appendBlock(getDataBlock(hoodieLogBlockType, thirdRecords, header));
        thirdWriter.close();

        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, thirdWriter.getLogFile(), SchemaTestUtil.getSimpleSchema());

        Assertions.assertTrue(reader.hasNext(), "First block should be available");
        // The reader yields log blocks; an explicit downcast is needed to reach the data-block API.
        HoodieDataBlock firstBlock = (HoodieDataBlock) reader.next();
        List<IndexedRecord> firstRead = getRecords(firstBlock);
        Assertions.assertEquals(firstExpected.size(), firstRead.size(), "Read records size should be equal to the written records size");
        Assertions.assertEquals(firstExpected, firstRead, "Both records lists should be the same. (ordering guaranteed)");
        Assertions.assertEquals(SchemaTestUtil.getSimpleSchema(), firstBlock.getSchema());

        // Assert availability explicitly instead of discarding hasNext(), so that a missing
        // block is reported by the assertion rather than by whatever next() does.
        Assertions.assertTrue(reader.hasNext(), "Second block should be available");
        List<IndexedRecord> secondRead = getRecords((HoodieDataBlock) reader.next());
        Assertions.assertEquals(secondExpected.size(), secondRead.size(), "Read records size should be equal to the written records size");
        Assertions.assertEquals(secondExpected, secondRead, "Both records lists should be the same. (ordering guaranteed)");

        Assertions.assertTrue(reader.hasNext(), "Third block should be available");
        List<IndexedRecord> thirdRead = getRecords((HoodieDataBlock) reader.next());
        Assertions.assertEquals(thirdExpected.size(), thirdRead.size(), "Read records size should be equal to the written records size");
        Assertions.assertEquals(thirdExpected, thirdRead, "Both records lists should be the same. (ordering guaranteed)");
        reader.close();
    }

    /**
     * Writes one CDC data block holding an insert ("i"), an update ("u") and a delete ("d")
     * record, reads it back and checks the before/after images of each operation.
     */
    @Test
    public void testCDCBlock() throws IOException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();

        Schema dataSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
                + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]},"
                + "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}]}");
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER, dataSchema);

        GenericData.Record insertedRecord = new GenericData.Record(dataSchema);
        insertedRecord.put("uuid", 1);
        insertedRecord.put("name", "apple");
        insertedRecord.put("ts", 1100L);

        GenericData.Record updateBefore = new GenericData.Record(dataSchema);
        updateBefore.put("uuid", 2);
        updateBefore.put("name", "banana");
        updateBefore.put("ts", 1000L);
        GenericData.Record updateAfter = new GenericData.Record(dataSchema);
        updateAfter.put("uuid", 2);
        updateAfter.put("name", "blueberry");
        updateAfter.put("ts", 1100L);

        GenericData.Record deletedRecord = new GenericData.Record(dataSchema);
        deletedRecord.put("uuid", 3);
        deletedRecord.put("name", "cherry");
        deletedRecord.put("ts", 1000L);

        List<IndexedRecord> cdcRecords = new ArrayList<>(Arrays.asList(
            HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100", (GenericRecord) null, insertedRecord),
            HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100", updateBefore, updateAfter),
            HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100", deletedRecord, (GenericRecord) null)));

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());
        writer.appendBlock(getDataBlock(HoodieLogBlock.HoodieLogBlockType.CDC_DATA_BLOCK, cdcRecords, header));
        writer.close();

        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), cdcSchema);
        Assertions.assertTrue(reader.hasNext());
        // The reader yields log blocks; an explicit downcast is needed to reach the data-block API.
        HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next();
        List<IndexedRecord> recordsRead = getRecords(dataBlock);
        Assertions.assertEquals(3, recordsRead.size(), "Read records size should be equal to the written records size");
        Assertions.assertEquals(cdcSchema, dataBlock.getSchema());

        // Field 0 of a CDC record carries the operation code written above; the stream element
        // type is IndexedRecord, so each lookup is cast to GenericRecord for name-based access.
        GenericRecord insert = (GenericRecord) recordsRead.stream()
            .filter(record -> record.get(0).toString().equals("i"))
            .findFirst()
            .get();
        Assertions.assertNull(insert.get("before"));
        Assertions.assertNotNull(insert.get("after"));
        Assertions.assertEquals("apple", ((GenericRecord) insert.get("after")).get("name").toString());

        GenericRecord update = (GenericRecord) recordsRead.stream()
            .filter(record -> record.get(0).toString().equals("u"))
            .findFirst()
            .get();
        Assertions.assertNotNull(update.get("before"));
        Assertions.assertNotNull(update.get("after"));
        GenericRecord beforeImage = (GenericRecord) update.get("before");
        GenericRecord afterImage = (GenericRecord) update.get("after");
        Assertions.assertEquals("banana", String.valueOf(beforeImage.get("name")));
        Assertions.assertEquals(1000L, Long.parseLong(beforeImage.get("ts").toString()));
        Assertions.assertEquals("blueberry", String.valueOf(afterImage.get("name")));
        Assertions.assertEquals(1100L, Long.parseLong(afterImage.get("ts").toString()));

        GenericRecord delete = (GenericRecord) recordsRead.stream()
            .filter(record -> record.get(0).toString().equals("d"))
            .findFirst()
            .get();
        Assertions.assertNotNull(delete.get("before"));
        Assertions.assertNull(delete.get("after"));
        Assertions.assertEquals("cherry", ((GenericRecord) delete.get("before")).get("name").toString());
        reader.close();
    }

    /**
     * Writes 400 generated records across several log files via {@code writeLogFiles}, scans
     * them with a {@link HoodieMergedLogRecordScanner} and checks that the sorted scanned
     * records equal the sorted written records.
     *
     * @param diskMapType disk map type handed to the scanner builder
     * @param z           forwarded to {@code withBitCaskDiskMapCompressionEnabled}
     * @param z2          forwarded to {@code withOptimizedLogBlocksScan}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        List<IndexedRecord> writtenRecords = new SchemaTestUtil().generateHoodieTestRecords(0, 400);
        Set<HoodieLogFile> logFiles = writeLogFiles(this.partitionPath, schema, writtenRecords, 4);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(storage)
            .withBasePath(this.basePath)
            .withLogFilePaths(logFilePaths)
            .withReaderSchema(schema)
            .withLatestInstantTime("100")
            .withMaxMemorySizeInBytes(10240L)
            .withReverseReader(false)
            .withBufferSize(BUFFER_SIZE)
            .withSpillableMapBasePath(this.spillableBasePath)
            .withDiskMapType(diskMapType)
            .withBitCaskDiskMapCompressionEnabled(z)
            .withOptimizedLogBlocksScan(z2)
            .build();

        List<IndexedRecord> scannedRecords = new ArrayList<>();
        Iterator<?> iterator = scanner.iterator();
        while (iterator.hasNext()) {
            HoodieRecord hoodieRecord = (HoodieRecord) iterator.next();
            // getData() on the raw record type is only known as Object; cast to the payload
            // interface so that getInsertValue can be resolved.
            HoodieRecordPayload payload = (HoodieRecordPayload) hoodieRecord.getData();
            scannedRecords.add((IndexedRecord) payload.getInsertValue(schema).get());
        }
        Assertions.assertEquals(sort(writtenRecords), sort(scannedRecords), "Scanner records count should be the same as appended records");
        scanner.close();
    }

    /**
     * Appends 300 generated records through {@code writeLogFiles}, then scans the log files by a
     * fixed list of full record keys (scanner built with {@code withForceFullScan(false)}).
     * Verifies that the scan returns the matching appended records and that repeating the same
     * scan hands back the very same record instances.
     *
     * @param diskMapType spillable disk map implementation handed to the scanner
     * @param z whether BitCask disk map compression is enabled on the scanner
     * @param z2 whether the optimized log blocks scan is enabled on the scanner
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        List appendedRecords = new SchemaTestUtil().generateHoodieTestRecords(0, 300);
        Set<HoodieLogFile> logFiles = writeLogFiles(this.partitionPath, schema, appendedRecords, 3);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());

        List<String> lookupKeys = Arrays.asList("b190b1fb-392b-4ceb-932d-a72c906127c2", "409e9ad3-5def-45e7-9180-ef579c1c220b", "e6b31f1c-60a8-4577-acf5-7e8ea318b08b", "0c477a9e-e602-4642-8e96-1cfd357b4ba0", "ea076c17-32ae-4659-8caf-6ad538b4dd8d", "7a943e09-3856-4874-83a1-8ee93e158f94", "9cbff584-d8a4-4b05-868b-dc917d6cf841", "bda0b0d8-0c56-43b0-89f9-e090d924586b", "ee118fb3-69cb-4705-a8c4-88a18e8aa1b7", "cb1fbe4d-06c3-4c9c-aea7-2665ffa8b205");
        List expectedRecords = (List) appendedRecords.stream()
            .filter(r -> lookupKeys.contains(((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
            .collect(Collectors.toList());

        // try-with-resources: the scanner is closed even when one of the assertions fails.
        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schema)
                .withLatestInstantTime("100")
                .withMaxMemorySizeInBytes(10240L)
                .withReverseReader(false)
                .withBufferSize(BUFFER_SIZE)
                .withSpillableMapBasePath(this.spillableBasePath)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .withOptimizedLogBlocksScan(z2)
                .withForceFullScan(false)
                .build()) {
            // First pass: collect both the record objects and their Avro payloads.
            scanner.scanByFullKeys(lookupKeys);
            List<HoodieRecord> firstPass = new ArrayList<>();
            List<IndexedRecord> firstPassAvro = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                firstPass.add(record);
                HoodieRecordPayload payload = (HoodieRecordPayload) record.getData();
                firstPassAvro.add((IndexedRecord) payload.getInsertValue(schema).get());
            }
            Assertions.assertEquals(sort(expectedRecords), sort(firstPassAvro));

            // Second pass with the same keys: must return the identical instances, in the same order.
            scanner.scanByFullKeys(lookupKeys);
            List<HoodieRecord> secondPass = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                secondPass.add(record);
            }
            Assertions.assertEquals(firstPass.size(), secondPass.size());
            for (int i = 0; i < firstPass.size(); i++) {
                Assertions.assertSame(firstPass.get(i), secondPass.get(i), "Objects have to be identical");
            }
        }
    }

    /**
     * Appends 300 generated records through {@code writeLogFiles}, then scans the log files by the
     * key prefix {@code "00"} (scanner built with {@code withForceFullScan(false)}). Verifies that
     * the scan returns the appended records with the four expected keys and that repeating the
     * same scan hands back the very same record instances.
     *
     * @param diskMapType spillable disk map implementation handed to the scanner
     * @param z whether BitCask disk map compression is enabled on the scanner
     * @param z2 whether the optimized log blocks scan is enabled on the scanner
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        List appendedRecords = new SchemaTestUtil().generateHoodieTestRecords(0, 300);
        Set<HoodieLogFile> logFiles = writeLogFiles(this.partitionPath, schema, appendedRecords, 3);
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());

        List<String> expectedKeys = Arrays.asList("00509b14-3d1a-4283-9a8c-c72b971a9d06", "006b2f57-9bf7-4634-910c-c91542ea61e5", "007fc45d-7ce2-45be-8765-0b9082412518", "00826e50-73b4-4cb0-9d5a-375554d5e0f7");
        List expectedRecords = (List) appendedRecords.stream()
            .filter(r -> expectedKeys.contains(((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
            .collect(Collectors.toList());
        List<String> keyPrefixes = Collections.singletonList("00");

        // try-with-resources: the scanner is closed even when one of the assertions fails.
        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schema)
                .withLatestInstantTime("100")
                .withMaxMemorySizeInBytes(10240L)
                .withReverseReader(false)
                .withBufferSize(BUFFER_SIZE)
                .withSpillableMapBasePath(this.spillableBasePath)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .withOptimizedLogBlocksScan(z2)
                .withForceFullScan(false)
                .build()) {
            // First pass: collect both the record objects and their Avro payloads.
            scanner.scanByKeyPrefixes(keyPrefixes);
            List<HoodieRecord> firstPass = new ArrayList<>();
            List<IndexedRecord> firstPassAvro = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                firstPass.add(record);
                HoodieRecordPayload payload = (HoodieRecordPayload) record.getData();
                firstPassAvro.add((IndexedRecord) payload.getInsertValue(schema).get());
            }
            Assertions.assertEquals(sort(expectedRecords), sort(firstPassAvro));

            // Second pass with the same prefix: must return the identical instances, in the same order.
            scanner.scanByKeyPrefixes(keyPrefixes);
            List<HoodieRecord> secondPass = new ArrayList<>();
            for (HoodieRecord record : scanner) {
                secondPass.add(record);
            }
            Assertions.assertEquals(firstPass.size(), secondPass.size());
            for (int i = 0; i < firstPass.size(); i++) {
                Assertions.assertSame(firstPass.get(i), secondPass.get(i), "Objects have to be identical");
            }
        }
    }

    /**
     * Interleaves valid blocks (written via {@code addValidBlock}) with hand-written garbage that
     * starts with the log magic bytes, and checks that the reader surfaces each garbage section as
     * a {@code CORRUPT_BLOCK} while still returning the valid blocks around it.
     *
     * <p>Every stream and reader is opened in try-with-resources so that a failing assertion does
     * not leave file handles open.
     */
    @Test
    public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
        // valid block (100 records) followed by the first garbage section
        HoodieLogFile logFileAfterFirstBlock = addValidBlock("test-fileId1", "100", 100);
        try (FSDataOutputStream out = storage.append(logFileAfterFirstBlock.getPath())) {
            out.write(HoodieLogFormat.MAGIC);
            // NOTE(review): 474 / 400 look like declared lengths that do not match the bytes
            // actually written below - confirm against the log format layout.
            out.writeLong(474L);
            out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            out.writeInt(1);
            out.writeLong(400L);
            out.write(StringUtils.getUTF8Bytes("something-random"));
            out.flush();
        }

        // second valid block (10 records); expect valid, corrupt, valid
        HoodieLogFile logFileAfterSecondBlock = addValidBlock("test-fileId1", "100", 10);
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, logFileAfterSecondBlock, SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, ((HoodieLogBlock) reader.next()).getBlockType(), "The read block should be a corrupt block");
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "There should be no more block left");
        }

        // second garbage section appended to the same file
        try (FSDataOutputStream out = storage.append(logFileAfterSecondBlock.getPath())) {
            out.write(HoodieLogFormat.MAGIC);
            out.writeLong(1000L);
            out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            out.writeInt(1);
            out.writeLong(500L);
            out.write(StringUtils.getUTF8Bytes("something-else-random"));
            out.flush();
        }

        // third valid block (100 records); expect valid, corrupt, valid, corrupt, valid
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, addValidBlock("test-fileId1", "100", 100), SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should get the 2nd corrupted block next");
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, ((HoodieLogBlock) reader.next()).getBlockType(), "The read block should be a corrupt block");
            Assertions.assertTrue(reader.hasNext(), "We should get the last block next");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "We should have no more blocks left");
        }
    }

    /**
     * Reads two corrupted files produced by {@code createCorruptedFile}. With the regular storage
     * the first block is reported as {@code CORRUPT_BLOCK}. For the second file the reader's
     * private {@code storage} field is swapped (via reflection) for a mock whose scheme is
     * {@code "gs"}; reading then fails with an {@link IllegalArgumentException} mentioning
     * "Invalid block byte type found".
     *
     * <p>Both readers are opened in try-with-resources so they are closed even if an assertion
     * fails.
     */
    @Test
    public void testSkipCorruptedCheck() throws Exception {
        try (HoodieLogFormat.Reader reader = createCorruptedFile("test-fileid1")) {
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, ((HoodieLogBlock) reader.next()).getBlockType(), "The read block should be a corrupt block");
        }

        try (HoodieLogFormat.Reader reader = createCorruptedFile("test-fileid2")) {
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");

            // Replace the reader's storage with a mock that reports the "gs" scheme.
            Field storageField = reader.getClass().getDeclaredField("storage");
            storageField.setAccessible(true);
            HoodieStorage mockedStorage = Mockito.mock(HoodieStorage.class);
            Mockito.when(mockedStorage.getScheme()).thenReturn("gs");
            storageField.set(reader, mockedStorage);

            IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> {
                reader.next();
            });
            Assertions.assertTrue(thrown.getMessage().contains("Invalid block byte type found"));
        }
    }

    /**
     * Appends nothing but the log magic bytes after a valid block, then writes another valid
     * block, and checks that the reader yields: valid block, {@code CORRUPT_BLOCK}, valid block.
     *
     * <p>The stream and the reader are opened in try-with-resources so they are closed even if an
     * assertion fails.
     */
    @Test
    public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFile logFileAfterFirstBlock = addValidBlock("test-fileId1", "100", 100);
        try (FSDataOutputStream out = storage.append(logFileAfterFirstBlock.getPath())) {
            // magic bytes only - no header, no content
            out.write(HoodieLogFormat.MAGIC);
            out.flush();
        }

        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, addValidBlock("test-fileId1", "100", 10), SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, ((HoodieLogBlock) reader.next()).getBlockType(), "The read block should be a corrupt block");
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "There should be no more block left");
        }
    }

    /**
     * Appends one data block of generated test records to the log file identified by the given
     * file id and base commit, and returns the log file reported by the writer after closing it.
     *
     * <p>The block header always carries instant time {@code "100"} and the simple test schema,
     * independent of {@code str2}.
     *
     * @param str file id of the log file
     * @param str2 base commit the log file is written over
     * @param i number of test records to generate for the block
     * @return the writer's log file
     */
    private HoodieLogFile addValidBlock(String str, String str2, int i) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId(str)
            .overBaseCommit(str2)
            .withStorage(storage)
            .build();
        List records = SchemaTestUtil.generateTestRecords(0, i);

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());

        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header));
        writer.close();
        return writer.getLogFile();
    }

    /**
     * Writes a valid block, a hand-written garbage section and another valid block, then checks
     * that the reader reports the garbage as a {@code CORRUPT_BLOCK} whose content location ends
     * exactly at the stream position recorded right after the garbage was written.
     *
     * <p>The raw output stream and the reader are opened in try-with-resources so they are closed
     * even if an assertion fails.
     */
    @Test
    public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
        // first valid block: 100 records
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List firstRecords = SchemaTestUtil.generateTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
        firstWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstRecords, header));
        firstWriter.close();

        // garbage section; remember where it ends
        long corruptBlockEndPos;
        try (FSDataOutputStream out = storage.append(firstWriter.getLogFile().getPath())) {
            out.write(HoodieLogFormat.MAGIC);
            out.writeLong(474L);
            out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            out.writeInt(1);
            out.writeLong(400L);
            out.write(StringUtils.getUTF8Bytes("something-random"));
            corruptBlockEndPos = out.getPos();
            out.flush();
        }

        // second valid block: 10 records, same header (instant time and schema are unchanged)
        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        List secondRecords = SchemaTestUtil.generateTestRecords(0, 10);
        secondWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondRecords, header));
        secondWriter.close();

        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, secondWriter.getLogFile(), SchemaTestUtil.getSimpleSchema())) {
            Assertions.assertTrue(reader.hasNext(), "First block should be available");
            reader.next();
            Assertions.assertTrue(reader.hasNext(), "We should have corrupted block next");
            HoodieLogBlock corruptBlock = (HoodieLogBlock) reader.next();
            Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK, corruptBlock.getBlockType(), "The read block should be a corrupt block");
            Assertions.assertEquals(corruptBlockEndPos, ((HoodieLogBlock.HoodieLogBlockContentLocation) corruptBlock.getBlockContentLocation().get()).getBlockEndPos());
            Assertions.assertTrue(reader.hasNext(), "Third block should be available");
            reader.next();
            Assertions.assertFalse(reader.hasNext(), "There should be no more block left");
        }
    }

    /**
     * Writes two data blocks of 100 generated records each under instant {@code "100"} (writer
     * size threshold 500) and delegates to {@code checkLogBlocksAndKeys} with the record keys of
     * both batches as the expected key set.
     *
     * @param diskMapType passed through to {@code checkLogBlocksAndKeys}
     * @param z passed through to {@code checkLogBlocksAndKeys}
     * @param z2 passed through to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .withSizeThreshold(500L)
            .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        // first batch
        List firstBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List expectedRecords = (List) firstBatch.stream()
            .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
            .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));

        // second batch, same instant
        List secondBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List secondBatchCopy = (List) secondBatch.stream()
            .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
            .collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));
        writer.close();

        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        expectedRecords.addAll(secondBatchCopy);
        Set expectedKeys = (Set) expectedRecords.stream()
            .map(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        checkLogBlocksAndKeys("100", schema, diskMapType, z, z2, 200, 200, Option.of(expectedKeys));
    }

    /**
     * Writes, in order: a data block at instant {@code "100"}, a data block at instant
     * {@code "101"}, a rollback command block (instant {@code "102"}, target {@code "101"}) and a
     * data block at instant {@code "102"}. Delta commits are created for {@code "100"} and
     * {@code "102"} only. Delegates to {@code checkLogBlocksAndKeys} expecting the keys of the
     * first and the last data block.
     *
     * @param diskMapType passed through to {@code checkLogBlocksAndKeys}
     * @param z passed through to {@code checkLogBlocksAndKeys}
     * @param z2 passed through to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        // data block @ 100
        List committedBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List expectedRecords = (List) committedBatch.stream()
            .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
            .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> committedHeader = new HashMap<>();
        committedHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        committedHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, committedBatch, committedHeader));

        // data block @ 101 (target of the rollback below)
        Map<HoodieLogBlock.HeaderMetadataType, String> rolledBackHeader = new HashMap<>();
        rolledBackHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        List rolledBackBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        rolledBackHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, rolledBackBatch, rolledBackHeader));

        // rollback command block @ 102 targeting 101
        Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new HashMap<>();
        rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        rollbackHeader.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        writer.appendBlock(new HoodieCommandBlock(rollbackHeader));

        // data block @ 102
        Map<HoodieLogBlock.HeaderMetadataType, String> latestHeader = new HashMap<>();
        latestHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        List latestBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List latestBatchCopy = (List) latestBatch.stream()
            .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
            .collect(Collectors.toList());
        latestHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, latestBatch, latestHeader));
        writer.close();

        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);

        expectedRecords.addAll(latestBatchCopy);
        Set expectedKeys = (Set) expectedRecords.stream()
            .map(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        checkLogBlocksAndKeys("102", schema, diskMapType, z, z2, 200, 200, Option.of(expectedKeys));
    }

    /**
     * Writes a data block at instant {@code "100"}, then hand-writes a partial block (magic bytes,
     * header metadata for instant {@code "101"} and a few random content bytes) straight into the
     * log file, then writes a data block at instant {@code "103"}. Delta commits are created for
     * {@code "100"} and {@code "103"}. Delegates to {@code checkLogBlocksAndKeys} expecting the
     * keys of the two regular data blocks.
     *
     * @param diskMapType passed through to {@code checkLogBlocksAndKeys}
     * @param z passed through to {@code checkLogBlocksAndKeys}
     * @param z2 passed through to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        SchemaTestUtil recordGenerator = new SchemaTestUtil();

        // data block @ 100
        List firstBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List expectedRecords = (List) firstBatch.stream()
            .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
            .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        firstWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));
        firstWriter.close();

        // hand-written partial block @ 101; the write order below is significant
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        FSDataOutputStream rawOut = storage.append(firstWriter.getLogFile().getPath());
        rawOut.write(HoodieLogFormat.MAGIC);
        rawOut.writeLong(1000L);
        rawOut.writeInt(1);
        rawOut.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        rawOut.write(HoodieLogBlock.getLogMetadataBytes(header));
        byte[] partialContent = StringUtils.getUTF8Bytes("something-random");
        rawOut.writeLong(partialContent.length);
        rawOut.write(partialContent);
        rawOut.flush();
        rawOut.close();

        // data block @ 103
        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder()
            .onParentPath(this.partitionPath)
            .withFileExtension(".log")
            .withFileId("test-fileid1")
            .overBaseCommit("100")
            .withStorage(storage)
            .build();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        List secondBatch = recordGenerator.generateHoodieTestRecords(0, 100);
        List secondBatchCopy = (List) secondBatch.stream()
            .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
            .collect(Collectors.toList());
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        secondWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));
        secondWriter.close();

        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        FileCreateUtils.createDeltaCommit(this.basePath, "103", storage);

        expectedRecords.addAll(secondBatchCopy);
        Set expectedKeys = (Set) expectedRecords.stream()
            .map(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        checkLogBlocksAndKeys("103", schema, diskMapType, z, z2, 200, 200, Option.of(expectedKeys));
    }

    /**
     * Exercises the merged scanner over data, delete and rollback blocks.
     *
     * <ol>
     *   <li>Writes data blocks at instants {@code "100"} and {@code "101"} (100 records each) and a
     *       delete block at {@code "102"} covering the first 50 keys of the first block.</li>
     *   <li>Scans up to {@code "102"}: all 200 keys are returned, 50 of them with an empty
     *       payload.</li>
     *   <li>Appends a rollback block (instant {@code "103"}, target {@code "101"}), removes the
     *       {@code "101"} delta commit and scans up to {@code "103"}: only the 100 keys of the
     *       first block remain, 50 of them with an empty payload.</li>
     * </ol>
     *
     * <p>The writer and both scanners are opened in try-with-resources so they are closed even if
     * an assertion fails.
     *
     * @param diskMapType spillable disk map implementation handed to the scanners
     * @param z whether BitCask disk map compression is enabled on the scanners
     * @param z2 whether the optimized log blocks scan is enabled on the scanners
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            SchemaTestUtil recordGenerator = new SchemaTestUtil();

            // data block @ 100
            List firstBatch = recordGenerator.generateHoodieTestRecords(0, 100);
            List firstBatchCopy = (List) firstBatch.stream()
                .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
                .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));

            // data block @ 101
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List secondBatch = recordGenerator.generateHoodieTestRecords(0, 100);
            List secondBatchCopy = (List) secondBatch.stream()
                .map(r -> HoodieAvroUtils.rewriteRecord((GenericRecord) r, schema))
                .collect(Collectors.toList());
            writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));

            // delete block @ 102: first 50 keys of the first batch
            List deleteRecords = ((List) firstBatchCopy.stream()
                .map(r -> DeleteRecord.create(
                    ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                    ((GenericRecord) r).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                .collect(Collectors.toList())).subList(0, 50);
            secondBatchCopy.addAll(firstBatchCopy);
            List allKeys = (List) secondBatchCopy.stream()
                .map(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            writer.appendBlock(new HoodieDeleteBlock((DeleteRecord[]) deleteRecords.toArray(new DeleteRecord[50]), header));

            List<String> logFilePaths = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
            FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "101", storage);
            FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);

            // scan up to 102
            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(logFilePaths)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("102")
                    .withMaxMemorySizeInBytes(10240L)
                    .withReverseReader(false)
                    .withBufferSize(BUFFER_SIZE)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(z)
                    .withOptimizedLogBlocksScan(z2)
                    .build()) {
                Assertions.assertEquals(200L, scanner.getTotalLogRecords(), "We still would read 200 records");
                List<String> readKeys = new ArrayList<>(200);
                int emptyPayloads = 0;
                for (HoodieRecord record : scanner) {
                    readKeys.add(record.getKey().getRecordKey());
                    if (!((HoodieRecordPayload) record.getData()).getInsertValue(schema).isPresent()) {
                        emptyPayloads++;
                    }
                }
                Assertions.assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
                Assertions.assertEquals(50, emptyPayloads, "Stream collect should return all 50 records with empty payloads");
                // Deleted keys are still returned by the scanner (with an empty payload), so the
                // expected key list is NOT reduced by the deleted records.
                Collections.sort(allKeys);
                Collections.sort(readKeys);
                Assertions.assertEquals(allKeys, readKeys, "Scanner should return the keys of all 200 records from both data blocks, deleted ones included");
            }

            // rollback block @ 103 targeting 101, and drop the 101 delta commit
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
            header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
            writer.appendBlock(new HoodieCommandBlock(header));
            FileCreateUtils.deleteDeltaCommit(this.basePath, "101", storage);

            // scan up to 103
            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(logFilePaths)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("103")
                    .withMaxMemorySizeInBytes(10240L)
                    .withReverseReader(false)
                    .withBufferSize(BUFFER_SIZE)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(z)
                    .withOptimizedLogBlocksScan(z2)
                    .build()) {
                List<String> readKeys = new ArrayList<>();
                int emptyPayloads = 0;
                for (HoodieRecord record : scanner) {
                    readKeys.add(record.getKey().getRecordKey());
                    if (!((HoodieRecordPayload) record.getData()).getInsertValue(schema).isPresent()) {
                        emptyPayloads++;
                    }
                }
                Assertions.assertEquals(100, readKeys.size(), "Stream collect should return 100 records, since 2nd block is rolled back");
                Assertions.assertEquals(50, emptyPayloads, "Stream collect should return all 50 records with empty payloads");
                List firstBatchKeys = (List) firstBatchCopy.stream()
                    .map(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                    .collect(Collectors.toList());
                Collections.sort(firstBatchKeys);
                Collections.sort(readKeys);
                Assertions.assertEquals(firstBatchKeys, readKeys, "Scanner should return only the 100 keys of the first data block after the rollback");
            }
        }
    }

    /**
     * Writes two insert batches (instants "100" and "101"), a delete block for the first 50 keys of
     * the first batch (instant "102"), a rollback command block (instant "103") targeting "102", and
     * then the same deletes again in a new block stamped "102", followed by a delta commit for "102".
     * A merged scan up to "103" is expected to yield all 200 keys, 50 of them with an empty payload.
     *
     * @param diskMapType disk map type handed to the scanner
     * @param z           passed to the scanner as the BitCask disk-map compression flag
     * @param z2          passed to the scanner as the optimized log-block scan flag
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        // try-with-resources so the writer is released even when an assertion below fails.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid111")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            SchemaTestUtil testUtil = new SchemaTestUtil();

            // Instant 100: first batch of inserts.
            List<IndexedRecord> firstBatch = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> firstBatchCopy = firstBatch.stream()
                    .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                    .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            writer.appendBlock(getDataBlock(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, firstBatch, header));

            // Instant 101: second batch of inserts.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> secondBatch = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> allWritten = secondBatch.stream()
                    .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                    .collect(Collectors.toList());
            writer.appendBlock(getDataBlock(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, secondBatch, header));
            allWritten.addAll(firstBatchCopy);

            // Delete records for the first 50 keys of the first batch; reused for both delete blocks.
            List<HoodieKey> deletedKeys = firstBatchCopy.stream()
                    .map(rec -> new HoodieKey(
                            ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                    .collect(Collectors.toList())
                    .subList(0, 50);
            List<DeleteRecord> deleteRecords = deletedKeys.stream()
                    .map(key -> DeleteRecord.create(key.getRecordKey(), key.getPartitionPath()))
                    .collect(Collectors.toList());

            // Instant 102: first delete block.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            writer.appendBlock(new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[0]), header));

            List<String> logFilePaths = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid111", ".log", "100")
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());

            // Instant 103: rollback command block targeting instant 102.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102");
            header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
            writer.appendBlock(new HoodieCommandBlock(header));

            // The same deletes again under instant 102, written after the rollback block.
            Map<HoodieLogBlock.HeaderMetadataType, String> secondDeleteHeader = new HashMap<>();
            secondDeleteHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            writer.appendBlock(new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[0]), secondDeleteHeader));
            FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);

            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(logFilePaths)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("103")
                    .withMaxMemorySizeInBytes(10240L)
                    .withReverseReader(false)
                    .withBufferSize(BUFFER_SIZE)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(z)
                    .withOptimizedLogBlocksScan(z2)
                    .build()) {
                List<String> scannedKeys = new ArrayList<>();
                scanner.forEach(rec -> scannedKeys.add(rec.getKey().getRecordKey()));

                // One marker per scanned record whose payload yields no insert value.
                List<Boolean> emptyPayloads = new ArrayList<>();
                scanner.forEach(rec -> {
                    try {
                        if (!((HoodieRecordPayload) rec.getData()).getInsertValue(schema).isPresent()) {
                            emptyPayloads.add(true);
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                Assertions.assertEquals(200, scannedKeys.size(), "Stream collect should return all 200 records");
                Assertions.assertEquals(50, emptyPayloads.size(), "Stream collect should return 50 records with empty payloads.");

                List<String> expectedKeys = allWritten.stream()
                        .map(rec -> ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                        .collect(Collectors.toList());
                Collections.sort(expectedKeys);
                Collections.sort(scannedKeys);
                // Message corrected: 200 keys are compared here, not 150.
                Assertions.assertEquals(expectedKeys, scannedKeys, "CompositeAvroLogReader should return 200 records from 2 versions");
            }
        }
    }

    /**
     * Writes two insert batches (instants "100" and "101") followed by three delete blocks:
     * records 0-9 without an explicit ordering value (instant "102"), records 10-19 with ordering
     * value -1 (instant "103"), and records 20-29 with ordering value 1 (instant "104").
     * The merged scan is expected to report 200 keys, 20 of them with empty payloads; the expected
     * surviving key set is built by removing only the first and third delete batches.
     *
     * @param diskMapType disk map type handed to the scanner
     * @param z           passed to the scanner as the BitCask disk-map compression flag
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType, boolean z) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        // try-with-resources so the writer is released even when an assertion below fails.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            SchemaTestUtil testUtil = new SchemaTestUtil();

            // Instant 100: first batch of inserts.
            List<IndexedRecord> firstBatch = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> allWritten = firstBatch.stream()
                    .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                    .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));

            // Instant 101: second batch of inserts.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> secondBatch = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> secondBatchCopy = secondBatch.stream()
                    .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                    .collect(Collectors.toList());
            writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));
            allWritten.addAll(secondBatchCopy);

            List<String> expectedKeys = allWritten.stream()
                    .map(rec -> ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                    .collect(Collectors.toList());

            // Instant 102: delete records 0-9 without an explicit ordering value.
            List<DeleteRecord> deletesWithoutOrdering = allWritten.subList(0, 10).stream()
                    .map(rec -> DeleteRecord.create(
                            ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                    .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            writer.appendBlock(new HoodieDeleteBlock(deletesWithoutOrdering.toArray(new DeleteRecord[0]), header));

            // Instant 103: delete records 10-19 with ordering value -1.
            // These keys are NOT removed from expectedKeys below.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            DeleteRecord[] deletesWithLowOrdering = allWritten.subList(10, 20).stream()
                    .map(rec -> DeleteRecord.create(
                            ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(),
                            -1))
                    .toArray(DeleteRecord[]::new);
            writer.appendBlock(new HoodieDeleteBlock(deletesWithLowOrdering, header));

            // Instant 104: delete records 20-29 with ordering value 1.
            List<DeleteRecord> deletesWithHighOrdering = allWritten.subList(20, 30).stream()
                    .map(rec -> DeleteRecord.create(
                            ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(),
                            1))
                    .collect(Collectors.toList());
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
            writer.appendBlock(new HoodieDeleteBlock(deletesWithHighOrdering.toArray(new DeleteRecord[0]), header));

            List<String> logFilePaths = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());

            for (String instant : Arrays.asList("100", "101", "102", "103", "104")) {
                FileCreateUtils.createDeltaCommit(this.basePath, instant, storage);
            }

            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(logFilePaths)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("104")
                    .withMaxMemorySizeInBytes(10240L)
                    .withReverseReader(false)
                    .withBufferSize(BUFFER_SIZE)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(z)
                    .build()) {
                Assertions.assertEquals(200L, scanner.getTotalLogRecords(), "We still would read 200 records");

                List<String> scannedKeys = new ArrayList<>(200);
                List<String> emptyPayloadKeys = new ArrayList<>();
                scanner.forEach(rec -> scannedKeys.add(rec.getRecordKey()));
                scanner.forEach(rec -> {
                    try {
                        if (!((HoodieRecordPayload) rec.getData()).getInsertValue(schema).isPresent()) {
                            emptyPayloadKeys.add(rec.getRecordKey());
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                Assertions.assertEquals(200, scannedKeys.size(), "Stream collect should return all 200 records");
                Assertions.assertEquals(20, emptyPayloadKeys.size(), "Stream collect should return all 20 records with empty payloads");

                // Expected survivors: everything except the first and third delete batches.
                expectedKeys.removeAll(deletesWithoutOrdering.stream()
                        .map(DeleteRecord::getRecordKey)
                        .collect(Collectors.toSet()));
                expectedKeys.removeAll(deletesWithHighOrdering.stream()
                        .map(DeleteRecord::getRecordKey)
                        .collect(Collectors.toSet()));
                scannedKeys.removeAll(emptyPayloadKeys);
                Collections.sort(expectedKeys);
                Collections.sort(scannedKeys);
                Assertions.assertEquals(expectedKeys, scannedKeys, "HoodieMergedLogRecordScanner should return 180 records from 4 versions");
            }
        }
    }

    /**
     * Writes two data blocks and one delete block (first 50 keys) under instant "100", then appends
     * the same rollback command block (target "100") twice: once in an attempt that is followed by
     * a simulated failure, and once as the retry. {@code checkLogBlocksAndKeys} is then invoked
     * with expected counts 0 and 0, and the delta commit for "100" is removed.
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param z           forwarded to {@code checkLogBlocksAndKeys}
     * @param z2          forwarded to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil testUtil = new SchemaTestUtil();

        // First data block.
        List<IndexedRecord> firstBatch = testUtil.generateHoodieTestRecords(0, 100);
        List<IndexedRecord> firstBatchCopy = firstBatch.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));

        // Second data block, same header.
        List<IndexedRecord> secondBatch = testUtil.generateHoodieTestRecords(0, 100);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));

        // Delete block covering the first 50 keys of the first batch.
        List<DeleteRecord> allDeletes = firstBatchCopy.stream()
                .map(rec -> {
                    GenericRecord generic = (GenericRecord) rec;
                    return DeleteRecord.create(
                            generic.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            generic.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString());
                })
                .collect(Collectors.toList());
        writer.appendBlock(new HoodieDeleteBlock(allDeletes.subList(0, 50).toArray(new DeleteRecord[50]), header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);

        // Attempt 1: append the rollback block, then simulate the attempt failing.
        try {
            writer.appendBlock(rollbackBlock);
            throw new Exception("simulating failure");
        } catch (Exception ignored) {
            // Deliberately ignored: the retry below runs whatever went wrong in the first attempt.
        }

        // Attempt 2: the retry appends the rollback block again.
        writer.appendBlock(rollbackBlock);
        writer.close();
        checkLogBlocksAndKeys("100", schema, diskMapType, z, z2, 0, 0, Option.empty());
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", storage);
    }

    /**
     * Writes one data block and one delete block (first 50 keys) under instant "100", then appends
     * the same rollback command block (target "100") twice. {@code checkLogBlocksAndKeys} is
     * invoked with expected counts 0 and 0, after which the delta commit for "100" is removed.
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param z           forwarded to {@code checkLogBlocksAndKeys}
     * @param z2          forwarded to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();

        List<IndexedRecord> inserts = new SchemaTestUtil().generateHoodieTestRecords(0, 100);
        List<IndexedRecord> insertsWithMetadata = inserts.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserts, header));

        // Delete block covering the first 50 inserted keys.
        List<DeleteRecord> allDeletes = insertsWithMetadata.stream()
                .map(rec -> {
                    GenericRecord generic = (GenericRecord) rec;
                    return DeleteRecord.create(
                            generic.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            generic.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString());
                })
                .collect(Collectors.toList());
        writer.appendBlock(new HoodieDeleteBlock(allDeletes.subList(0, 50).toArray(new DeleteRecord[50]), header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // The rollback command block is appended twice on purpose.
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
        writer.appendBlock(rollbackBlock);
        writer.appendBlock(rollbackBlock);
        writer.close();

        checkLogBlocksAndKeys("100", schema, diskMapType, z, z2, 0, 0, Option.empty());
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", storage);
    }

    /**
     * Writes one data block of 100 records under instant "100" and then a rollback command block
     * whose target instant is "101", an instant for which this test writes no block.
     * {@code checkLogBlocksAndKeys} is invoked with expected counts 100 and 100.
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param z           forwarded to {@code checkLogBlocksAndKeys}
     * @param z2          forwarded to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> inserts = new SchemaTestUtil().generateHoodieTestRecords(0, 100);

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserts, header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Rollback block pointing at instant 101 (INSTANT_TIME in the header is still "100").
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        writer.appendBlock(new HoodieCommandBlock(header));
        writer.close();

        checkLogBlocksAndKeys("100", schema, diskMapType, z, z2, 100, 100, Option.empty());
    }

    /**
     * Appends the same data block three times and a delete block (first 50 keys) under instant
     * "100", then a rollback command block (instant "101") targeting "100".
     * {@code checkLogBlocksAndKeys} is invoked for instant "101" with expected counts 0 and 0.
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param z           forwarded to {@code checkLogBlocksAndKeys}
     * @param z2          forwarded to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();

        List<IndexedRecord> inserts = new SchemaTestUtil().generateHoodieTestRecords(0, 100);
        List<IndexedRecord> insertsWithMetadata = inserts.stream()
                .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                .collect(Collectors.toList());

        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

        // The identical data block is written three times.
        HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserts, header);
        for (int attempt = 0; attempt < 3; attempt++) {
            writer.appendBlock(dataBlock);
        }

        // Delete block covering the first 50 inserted keys.
        List<DeleteRecord> allDeletes = insertsWithMetadata.stream()
                .map(rec -> {
                    GenericRecord generic = (GenericRecord) rec;
                    return DeleteRecord.create(
                            generic.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            generic.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString());
                })
                .collect(Collectors.toList());
        writer.appendBlock(new HoodieDeleteBlock(allDeletes.subList(0, 50).toArray(new DeleteRecord[50]), header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Rollback block written as instant 101, targeting instant 100.
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        writer.appendBlock(new HoodieCommandBlock(header));
        writer.close();

        checkLogBlocksAndKeys("101", schema, diskMapType, z, z2, 0, 0, Option.empty());
    }

    /**
     * Writes two insert batches (instants "100" and "101"), a delete block built from pre-serialized
     * bytes loaded from the test resource {@code /format/delete-block-v2-content-10-records.data}
     * (instant "102"), and a delete block built from {@link DeleteRecord}s for the first 60 keys of
     * the second batch (instant "103"). The first batch is generated with 10 fixed record keys mixed
     * in; those keys are removed from the expected survivors, so the resource presumably deletes
     * exactly them (TODO confirm against the resource file).
     *
     * <p>Expected: 200 scanned keys, 70 empty payloads, 130 surviving keys.
     *
     * @param diskMapType disk map type handed to the scanner
     * @param z           passed to the scanner as the BitCask disk-map compression flag
     * @param z2          passed to the scanner as the optimized log-block scan flag
     */
    @Disabled("HUDI-7375")
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        // The writer was never closed before; try-with-resources releases it on every path.
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build()) {
            List<String> fixedKeys = Arrays.asList(
                    "d448e1b8-a0d4-45c0-bf2d-a9e16ff3c8ce",
                    "df3f71cd-5b68-406c-bb70-861179444adb",
                    "cf64885c-af32-463b-8f1b-2f31a39b1afa",
                    "9884e134-0d60-46e8-8a1e-36db0e455c4a",
                    "698544b8-defa-4fa7-ac15-8963f7d0784d",
                    "081c279e-fc6a-4e05-89b7-3136e4cad488",
                    "1041fac7-8a54-47e6-8a2d-d1a650301699",
                    "69c003f8-386d-40a0-9c61-5a903d1d6ac2",
                    "e574d164-f8c4-47cf-b150-264c2364f10e",
                    "d76007d2-9dc8-46ff-bf6f-0789c6ffffc0");
            SchemaTestUtil testUtil = new SchemaTestUtil();

            // Instant 100: first batch, generated with the fixed keys mixed in.
            List<IndexedRecord> firstBatch = testUtil.generateHoodieTestRecords(0, testUtil.genRandomUUID(100, fixedKeys), "0000/00/00", "100");
            List<IndexedRecord> firstBatchCopy = firstBatch.stream()
                    .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                    .collect(Collectors.toList());
            Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
            writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));

            // Instant 101: second batch.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
            List<IndexedRecord> secondBatch = testUtil.generateHoodieTestRecords(0, 100);
            List<IndexedRecord> allWritten = secondBatch.stream()
                    .map(rec -> HoodieAvroUtils.rewriteRecord((GenericRecord) rec, schema))
                    .collect(Collectors.toList());
            writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));

            // Instant 102: delete block from pre-serialized content.
            // Read in a loop because a single read() may return fewer bytes than requested,
            // and close the stream afterwards.
            byte[] serializedDeletes = new byte[605];
            try (java.io.InputStream in = TestHoodieLogFormat.class.getResourceAsStream("/format/delete-block-v2-content-10-records.data")) {
                Assertions.assertNotNull(in, "Test resource /format/delete-block-v2-content-10-records.data is missing");
                int filled = 0;
                while (filled < serializedDeletes.length) {
                    int read = in.read(serializedDeletes, filled, serializedDeletes.length - filled);
                    if (read < 0) {
                        // End of resource: remaining bytes stay zero, as with the previous single read.
                        break;
                    }
                    filled += read;
                }
            }
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
            writer.appendBlock(new HoodieDeleteBlock(Option.of(serializedDeletes), (Supplier) null, true, Option.empty(), header, Collections.emptyMap()));

            // Instant 103: delete block for the first 60 keys of the second batch.
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
            List<DeleteRecord> secondBatchDeletes = allWritten.stream()
                    .map(rec -> DeleteRecord.create(
                            ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
                            ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
                    .collect(Collectors.toList())
                    .subList(0, 60);
            writer.appendBlock(new HoodieDeleteBlock(secondBatchDeletes.toArray(new DeleteRecord[0]), header));

            allWritten.addAll(firstBatchCopy);
            List<String> expectedKeys = allWritten.stream()
                    .map(rec -> ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
                    .collect(Collectors.toList());
            List<String> logFilePaths = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                    .map(logFile -> logFile.getPath().toString())
                    .collect(Collectors.toList());

            for (String instant : Arrays.asList("100", "101", "102", "103")) {
                FileCreateUtils.createDeltaCommit(this.basePath, instant, storage);
            }

            try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                    .withStorage(storage)
                    .withBasePath(this.basePath)
                    .withLogFilePaths(logFilePaths)
                    .withReaderSchema(schema)
                    .withLatestInstantTime("103")
                    .withMaxMemorySizeInBytes(10240L)
                    .withReverseReader(false)
                    .withBufferSize(BUFFER_SIZE)
                    .withSpillableMapBasePath(this.spillableBasePath)
                    .withDiskMapType(diskMapType)
                    .withBitCaskDiskMapCompressionEnabled(z)
                    .withOptimizedLogBlocksScan(z2)
                    .build()) {
                Assertions.assertEquals(200L, scanner.getTotalLogRecords(), "We still would read 200 records");

                List<String> scannedKeys = new ArrayList<>(200);
                List<String> survivingKeys = new ArrayList<>(200);
                List<Boolean> emptyPayloads = new ArrayList<>();
                scanner.forEach(rec -> scannedKeys.add(rec.getKey().getRecordKey()));
                scanner.forEach(rec -> {
                    try {
                        if (((HoodieRecordPayload) rec.getData()).getInsertValue(schema, new Properties()).isPresent()) {
                            survivingKeys.add(rec.getKey().getRecordKey());
                        } else {
                            emptyPayloads.add(true);
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                Assertions.assertEquals(200, scannedKeys.size(), "Stream collect should return all 200 records");
                Assertions.assertEquals(70, emptyPayloads.size(), "Stream collect should return all 70 records with empty payloads");

                Collections.sort(expectedKeys);
                Collections.sort(scannedKeys);
                Assertions.assertEquals(expectedKeys, scannedKeys, "200 records should be scanned regardless of deletes or not");

                // Drop both delete batches from the expectation and compare against the survivors.
                expectedKeys.removeAll(fixedKeys);
                expectedKeys.removeAll(secondBatchDeletes.stream()
                        .map(DeleteRecord::getRecordKey)
                        .collect(Collectors.toList()));
                Collections.sort(expectedKeys);
                Collections.sort(survivingKeys);
                Assertions.assertEquals(expectedKeys, survivingKeys, "Only 130 records should exist after deletion");
            }
        }
    }

    /**
     * Writes a 100-record data block (instant "100") and a 10-record data block (instant "101"),
     * then appends rollback command blocks targeting "100" and afterwards "101".
     * {@code checkLogBlocksAndKeys} is invoked after each stage with expected counts
     * 110/110, then 10/10, then 0/0.
     */
    @Test
    public void testAvroLogRecordReaderWithRollbackOlderBlocks() throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        SchemaTestUtil testUtil = new SchemaTestUtil();
        String rollbackOrdinal = String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal());

        // Instant 100: 100 records.
        List<IndexedRecord> firstBatch = testUtil.generateHoodieTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Instant 101: 10 more records.
        List<IndexedRecord> secondBatch = testUtil.generateHoodieTestRecords(100, 10);
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));
        FileCreateUtils.createDeltaCommit(this.basePath, "101", storage);
        checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 110, 110, Option.empty());

        // Rollback targeting the older instant 100.
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, rollbackOrdinal);
        writer.appendBlock(new HoodieCommandBlock(header));
        checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 10, 10, Option.empty());

        // Rollback targeting instant 101.
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, rollbackOrdinal);
        writer.appendBlock(new HoodieCommandBlock(header));
        writer.close();
        checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 0, 0, Option.empty());
    }

    /**
     * Appends the same data block three times (instant "100"), then writes raw bytes directly to the
     * log file twice, appends the data block once more through a fresh writer, writes the raw bytes
     * a third time, and finally appends a rollback command block (instant "101") targeting "100".
     * {@code checkLogBlocksAndKeys} is invoked for "101" with expected counts 0 and 0.
     *
     * <p>The raw bytes are the log magic followed by a long (1000), the Avro data block type
     * ordinal, an int (1) and a long (100) with no further content; presumably this is read back
     * as a corrupt block (TODO confirm against the log reader).
     *
     * @param diskMapType forwarded to {@code checkLogBlocksAndKeys}
     * @param z           forwarded to {@code checkLogBlocksAndKeys}
     * @param z2          forwarded to {@code checkLogBlocksAndKeys}
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) throws IOException, URISyntaxException, InterruptedException {
        Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        List<IndexedRecord> inserts = new SchemaTestUtil().generateHoodieTestRecords(0, 100);
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
        HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, inserts, header);
        writer.appendBlock(dataBlock);
        writer.appendBlock(dataBlock);
        writer.appendBlock(dataBlock);
        writer.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Two raw writes to the first writer's log file.
        // try-with-resources closes the stream even if a write fails.
        for (int i = 0; i < 2; i++) {
            try (FSDataOutputStream out = storage.append(writer.getLogFile().getPath())) {
                out.write(HoodieLogFormat.MAGIC);
                out.writeLong(1000L);
                out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
                out.writeInt(1);
                out.writeLong(100L);
                out.flush();
            }
        }

        // One more regular data block through a fresh writer.
        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        secondWriter.appendBlock(dataBlock);
        secondWriter.close();

        // Third raw write, to the second writer's log file.
        try (FSDataOutputStream out = storage.append(secondWriter.getLogFile().getPath())) {
            out.write(HoodieLogFormat.MAGIC);
            out.writeLong(1000L);
            out.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            out.writeInt(1);
            out.writeLong(100L);
            out.flush();
        }

        // Rollback block written as instant 101, targeting instant 100.
        HoodieLogFormat.Writer rollbackWriter = HoodieLogFormat.newWriterBuilder()
                .onParentPath(this.partitionPath)
                .withFileExtension(".log")
                .withFileId("test-fileid1")
                .overBaseCommit("100")
                .withStorage(storage)
                .build();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
        rollbackWriter.appendBlock(new HoodieCommandBlock(header));
        rollbackWriter.close();

        // Use the parameterized arguments instead of a hard-coded BITCASK/false/false,
        // so each argument set actually exercises a different reader configuration.
        checkLogBlocksAndKeys("101", schema, diskMapType, z, z2, 0, 0, Option.empty());
        FileCreateUtils.deleteDeltaCommit(this.basePath, "100", storage);
    }

    /**
     * Builds a log with plain data blocks, hand-written raw bytes and blocks carrying a
     * {@code COMPACTED_BLOCK_TIMES} header, then scans it with the optimized log-block scan enabled.
     *
     * <p>Sequence written by the test:
     * <ol>
     *   <li>data blocks for instants 100, 101 and 102 (100 generated records each);</li>
     *   <li>two raw byte sequences appended straight to the log file;</li>
     *   <li>blocks 103 (compacted times "100,101") and 104 (compacted times "103,102");</li>
     *   <li>plain data blocks 105, 106 and 107;</li>
     *   <li>block 108 (compacted times "106,107").</li>
     * </ol>
     * The scanner is expected to report 600 log records, the valid block instants
     * {@code [108, 105, 104]}, and exactly the record keys contained in those three blocks.
     *
     * @param diskMapType disk map type handed to the scanner builder
     * @param z           value passed to {@code withBitCaskDiskMapCompressionEnabled}
     */
    @MethodSource({"testArgumentsWithoutOptimizedScanArg"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType, boolean z) throws IOException, URISyntaxException, InterruptedException {
        Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
        HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        SchemaTestUtil schemaTestUtil = new SchemaTestUtil();
        // Block for instant 100.
        List generateHoodieTestRecords = schemaTestUtil.generateHoodieTestRecords(0, 100);
        Set set = (Set) generateHoodieTestRecords.stream().map(indexedRecord -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, addMetadataFields).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        }).collect(Collectors.toSet());
        HashMap hashMap = new HashMap();
        hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(generateHoodieTestRecords), hashMap));
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
        // Block for instant 101.
        List generateHoodieTestRecords2 = schemaTestUtil.generateHoodieTestRecords(0, 100);
        set.addAll((Collection) generateHoodieTestRecords2.stream().map(indexedRecord2 -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord2, addMetadataFields).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        }).collect(Collectors.toList()));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
        hashMap2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(generateHoodieTestRecords2), hashMap2));
        FileCreateUtils.createDeltaCommit(this.basePath, "101", storage);
        // Block for instant 102.
        List generateHoodieTestRecords3 = schemaTestUtil.generateHoodieTestRecords(0, 100);
        // NOTE(review): the HashSet copy created here is never stored, and `set` is not read again
        // below, so these key sets do not take part in any assertion.
        new HashSet(set).addAll((Collection) generateHoodieTestRecords3.stream().map(indexedRecord3 -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord3, addMetadataFields).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        }).collect(Collectors.toList()));
        HashMap hashMap3 = new HashMap();
        hashMap3.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
        hashMap3.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(generateHoodieTestRecords3), hashMap3));
        build.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "102", storage);
        // Append raw bytes directly to the log file: the log-format magic followed by a few
        // header-like values and no block body. Presumably this simulates a corrupt block
        // (the test name mentions "Corrupts") - confirm against the log format layout.
        FSDataOutputStream append = storage.append(build.getLogFile().getPath());
        append.write(HoodieLogFormat.MAGIC);
        append.writeLong(1000L);
        append.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        append.writeInt(1);
        append.writeLong(100L);
        append.flush();
        append.close();
        // Same raw byte sequence appended a second time.
        FSDataOutputStream append2 = storage.append(build.getLogFile().getPath());
        append2.write(HoodieLogFormat.MAGIC);
        append2.writeLong(1000L);
        append2.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
        append2.writeInt(1);
        append2.writeLong(100L);
        append2.flush();
        append2.close();
        HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        // Block 103 holds the records of blocks 100 and 101 and names them in COMPACTED_BLOCK_TIMES.
        List list = (List) Stream.of((Object[]) new List[]{generateHoodieTestRecords, generateHoodieTestRecords2}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        HashMap hashMap4 = new HashMap();
        hashMap4.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
        hashMap4.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "100,101");
        hashMap4.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(list), hashMap4));
        FileCreateUtils.createDeltaCommit(this.basePath, "103", storage);
        // Block 104 holds the records of block 103 plus those of block 102.
        List list2 = (List) Stream.of((Object[]) new List[]{list, generateHoodieTestRecords3}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        HashMap hashMap5 = new HashMap();
        hashMap5.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
        hashMap5.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "103,102");
        hashMap5.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(list2), hashMap5));
        FileCreateUtils.createDeltaCommit(this.basePath, "104", storage);
        // Plain data blocks for instants 105, 106 and 107.
        List generateHoodieTestRecords4 = schemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap hashMap6 = new HashMap();
        hashMap6.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "105");
        hashMap6.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(generateHoodieTestRecords4), hashMap6));
        FileCreateUtils.createDeltaCommit(this.basePath, "105", storage);
        List generateHoodieTestRecords5 = schemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap hashMap7 = new HashMap();
        hashMap7.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "106");
        hashMap7.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(generateHoodieTestRecords5), hashMap7));
        FileCreateUtils.createDeltaCommit(this.basePath, "106", storage);
        List generateHoodieTestRecords6 = schemaTestUtil.generateHoodieTestRecords(0, 100);
        HashMap hashMap8 = new HashMap();
        hashMap8.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "107");
        hashMap8.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(generateHoodieTestRecords6), hashMap8));
        FileCreateUtils.createDeltaCommit(this.basePath, "107", storage);
        // Block 108 holds the records of blocks 106 and 107.
        List list3 = (List) Stream.of((Object[]) new List[]{generateHoodieTestRecords5, generateHoodieTestRecords6}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        HashMap hashMap9 = new HashMap();
        hashMap9.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "108");
        hashMap9.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "106,107");
        hashMap9.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
        build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, new ArrayList(list3), hashMap9));
        build2.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "108", storage);
        // Scan every log file of the file group up to instant 108; the optimized scan is always on here.
        HoodieMergedLogRecordScanner build3 = HoodieMergedLogRecordScanner.newBuilder().withStorage(storage).withBasePath(this.basePath).withLogFilePaths((List) FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
            return hoodieLogFile.getPath().toString();
        }).collect(Collectors.toList())).withReaderSchema(addMetadataFields).withLatestInstantTime("108").withMaxMemorySizeInBytes(10240L).withReverseReader(false).withBufferSize(BUFFER_SIZE).withSpillableMapBasePath(this.spillableBasePath).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).withOptimizedLogBlocksScan(true).build();
        Assertions.assertEquals(600L, build3.getTotalLogRecords(), "We would read 600 records from scanner");
        ArrayList arrayList = new ArrayList();
        build3.forEach(hoodieRecord -> {
            arrayList.add(hoodieRecord.getKey().getRecordKey());
        });
        // Expected keys: the contents of blocks 104, 105 and 108, sorted for comparison.
        List list4 = (List) Stream.of((Object[]) new List[]{list2, generateHoodieTestRecords4, list3}).flatMap((v0) -> {
            return v0.stream();
        }).map(indexedRecord4 -> {
            return ((GenericRecord) indexedRecord4).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        }).sorted().collect(Collectors.toList());
        Assertions.assertEquals(Arrays.asList("108", "105", "104"), build3.getValidBlockInstants());
        Collections.sort(arrayList);
        Assertions.assertEquals(list4, arrayList, "Record keys read should be exactly same.");
        build3.close();
    }

    /**
     * Writes two data blocks for the same file id and instant "100" through two writers and
     * checks the number of merged records the scanner reports.
     *
     * <p>The first writer appends the first {@code i} of 101 generated records. The second writer
     * is built with a size threshold one byte below the first writer's size (presumably so that its
     * block lands in a new log file - confirm against the writer's rollover logic) and appends the
     * first {@code i2} records of a copy of the same list. The merged record count is expected to be
     * {@code max(i, i2)}.
     *
     * <p>Any exception raised while writing or scanning now fails the test instead of being
     * printed and ignored.
     *
     * @param i           number of records written by the first writer
     * @param i2          number of records written by the second writer
     * @param diskMapType disk map type handed to the scanner builder
     * @param z           value passed to {@code withBitCaskDiskMapCompressionEnabled}
     * @param z2          value passed to {@code withOptimizedLogBlocksScan}
     */
    private void testAvroLogRecordReaderMergingMultipleLogFiles(int i, int i2, ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        try {
            Schema addMetadataFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
            List generateHoodieTestRecords = new SchemaTestUtil().generateHoodieTestRecords(0, 101);
            ArrayList arrayList = new ArrayList(generateHoodieTestRecords);
            HoodieLogFormat.Writer build = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
            HashMap hashMap = new HashMap();
            hashMap.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            hashMap.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
            build.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, generateHoodieTestRecords.subList(0, i), hashMap));
            long currentSize = build.getCurrentSize();
            build.close();
            // Threshold just below the size already written by the first writer.
            HoodieLogFormat.Writer build2 = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).withSizeThreshold(currentSize - 1).build();
            HashMap hashMap2 = new HashMap();
            hashMap2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            hashMap2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, addMetadataFields.toString());
            build2.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, arrayList.subList(0, i2), hashMap2));
            build2.close();
            FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);
            HoodieMergedLogRecordScanner build3 = HoodieMergedLogRecordScanner.newBuilder().withStorage(storage).withBasePath(this.basePath).withLogFilePaths((List) FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100").map(hoodieLogFile -> {
                return hoodieLogFile.getPath().toString();
            }).collect(Collectors.toList())).withReaderSchema(addMetadataFields).withLatestInstantTime("100").withMaxMemorySizeInBytes(10240L).withReverseReader(false).withBufferSize(BUFFER_SIZE).withSpillableMapBasePath(this.spillableBasePath).withDiskMapType(diskMapType).withBitCaskDiskMapCompressionEnabled(z).withOptimizedLogBlocksScan(z2).build();
            try {
                Assertions.assertEquals(Math.max(i, i2), build3.getNumMergedRecordsInLog(), "We would read 100 records");
            } finally {
                // Release the scanner even when the assertion above fails.
                build3.close();
            }
        } catch (Exception e) {
            // Previously the exception was only printed, which let broken setups pass silently.
            throw new AssertionError("Unexpected failure while writing or scanning the log files", e);
        }
    }

    /**
     * Runs the multi-log-file merge scenario with 77 records from the first writer and 100 from
     * the second.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        final int recordsFromFirstWriter = 77;
        final int recordsFromSecondWriter = 100;
        testAvroLogRecordReaderMergingMultipleLogFiles(recordsFromFirstWriter, recordsFromSecondWriter, diskMapType, z, z2);
    }

    /**
     * Runs the multi-log-file merge scenario with 100 records from the first writer and 66 from
     * the second.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        final int recordsFromFirstWriter = 100;
        final int recordsFromSecondWriter = 66;
        testAvroLogRecordReaderMergingMultipleLogFiles(recordsFromFirstWriter, recordsFromSecondWriter, diskMapType, z, z2);
    }

    /**
     * Runs the multi-log-file merge scenario with 100 records from each of the two writers.
     */
    @MethodSource({"testArguments"})
    @ParameterizedTest
    public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2) {
        final int recordsPerWriter = 100;
        testAvroLogRecordReaderMergingMultipleLogFiles(recordsPerWriter, recordsPerWriter, diskMapType, z, z2);
    }

    /**
     * Appends three data blocks of 100 records each through three successive writers for the same
     * file id, then reads the log backwards and checks that the blocks come back last-to-first
     * with the same records that were written.
     *
     * <p>The reader is managed by try-with-resources so it is closed on every path, and a failure
     * in {@code close()} is attached as a suppressed exception rather than hiding the primary one.
     */
    @Test
    public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List firstBatch = SchemaTestUtil.generateTestRecords(0, 100);
        // Snapshot of the expected records, taken before the batch is handed to the writer.
        List expectedFirst = (List) firstBatch.stream().map(indexedRecord -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, simpleSchema);
        }).collect(Collectors.toList());
        HashMap header = new HashMap();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());
        firstWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));
        firstWriter.close();

        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        List secondBatch = SchemaTestUtil.generateTestRecords(0, 100);
        List expectedSecond = (List) secondBatch.stream().map(indexedRecord2 -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord2, simpleSchema);
        }).collect(Collectors.toList());
        secondWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, secondBatch, header));
        secondWriter.close();

        HoodieLogFormat.Writer thirdWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        List thirdBatch = SchemaTestUtil.generateTestRecords(0, 100);
        List expectedThird = (List) thirdBatch.stream().map(indexedRecord3 -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord3, simpleSchema);
        }).collect(Collectors.toList());
        thirdWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, thirdBatch, header));
        thirdWriter.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // The trailing `true` presumably enables reverse reading; the assertions below rely on hasPrev()/prev().
        try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, new HoodieLogFile(thirdWriter.getLogFile().getPath(), storage.getPathInfo(thirdWriter.getLogFile().getPath()).getLength()), SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
            Assertions.assertTrue(reader.hasPrev(), "Last block should be available");
            List<IndexedRecord> thirdRead = getRecords(reader.prev());
            Assertions.assertEquals(expectedThird.size(), thirdRead.size(), "Third records size should be equal to the written records size");
            Assertions.assertEquals(expectedThird, thirdRead, "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(reader.hasPrev(), "Second block should be available");
            List<IndexedRecord> secondRead = getRecords(reader.prev());
            Assertions.assertEquals(expectedSecond.size(), secondRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedSecond, secondRead, "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertTrue(reader.hasPrev(), "First block should be available");
            List<IndexedRecord> firstRead = getRecords(reader.prev());
            Assertions.assertEquals(expectedFirst.size(), firstRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedFirst, firstRead, "Both records lists should be the same. (ordering guaranteed)");

            Assertions.assertFalse(reader.hasPrev());
        }
    }

    /**
     * Writes a data block, appends hand-written raw bytes to the same log file, writes a second
     * data block, and then reads the log backwards.
     *
     * <p>The last block must be readable as a {@link HoodieDataBlock}; the next call to
     * {@code prev()} is expected to throw {@link CorruptedLogFileException}.
     *
     * <p>Both the raw output stream and the reader are managed by try-with-resources so neither
     * leaks when a write or an assertion fails.
     */
    @Test
    public void testAppendAndReadOnCorruptedLogInReverse() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List firstBatch = SchemaTestUtil.generateTestRecords(0, 100);
        HashMap header = new HashMap();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());
        firstWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));
        firstWriter.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        // Raw bytes written behind the valid block: magic, block type ordinal, two ints, the
        // encoded header and an arbitrary payload - the data the reverse reader should reject.
        try (FSDataOutputStream rawOut = storage.append(firstWriter.getLogFile().getPath())) {
            rawOut.write(HoodieLogFormat.MAGIC);
            rawOut.writeInt(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
            rawOut.writeInt(1000);
            rawOut.writeInt(1);
            rawOut.write(HoodieLogBlock.getLogMetadataBytes(header));
            rawOut.write(StringUtils.getUTF8Bytes("something-random"));
            rawOut.flush();
        }

        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        secondWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header));
        secondWriter.close();

        try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, new HoodieLogFile(secondWriter.getLogFile().getPath(), storage.getPathInfo(secondWriter.getLogFile().getPath()).getLength()), simpleSchema, BUFFER_SIZE, true)) {
            Assertions.assertTrue(reader.hasPrev(), "Last block should be available");
            Assertions.assertTrue(reader.prev() instanceof HoodieDataBlock, "Last block should be datablock");
            Assertions.assertTrue(reader.hasPrev(), "Last block should be available");
            Assertions.assertThrows(CorruptedLogFileException.class, () -> {
                reader.prev();
            });
        }
    }

    /**
     * Appends three data blocks of 100 records each through three successive writers, then walks
     * the log backwards: the last two blocks are skipped with {@code moveToPrev()} and only the
     * first block is read and compared with the records originally written.
     *
     * <p>The reader is managed by try-with-resources so it is closed on every path.
     */
    @Test
    public void testBasicAppendAndTraverseInReverse() throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer firstWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List firstBatch = SchemaTestUtil.generateTestRecords(0, 100);
        // Snapshot of the expected records, taken before the batch is handed to the writer.
        List expectedFirst = (List) firstBatch.stream().map(indexedRecord -> {
            return HoodieAvroUtils.rewriteRecord((GenericRecord) indexedRecord, simpleSchema);
        }).collect(Collectors.toList());
        HashMap header = new HashMap();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());
        firstWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, firstBatch, header));
        firstWriter.close();

        HoodieLogFormat.Writer secondWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        secondWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header));
        secondWriter.close();

        HoodieLogFormat.Writer thirdWriter = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        thirdWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, SchemaTestUtil.generateTestRecords(0, 100), header));
        thirdWriter.close();
        FileCreateUtils.createDeltaCommit(this.basePath, "100", storage);

        try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, new HoodieLogFile(thirdWriter.getLogFile().getPath(), storage.getPathInfo(thirdWriter.getLogFile().getPath()).getLength()), SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
            Assertions.assertTrue(reader.hasPrev(), "Third block should be available");
            reader.moveToPrev();
            Assertions.assertTrue(reader.hasPrev(), "Second block should be available");
            reader.moveToPrev();
            Assertions.assertTrue(reader.hasPrev(), "First block should be available");
            List<IndexedRecord> firstRead = getRecords(reader.prev());
            Assertions.assertEquals(expectedFirst.size(), firstRead.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(expectedFirst, firstRead, "Both records lists should be the same. (ordering guaranteed)");
            Assertions.assertFalse(reader.hasPrev());
        }
    }

    /**
     * Serializes 100 generated records with {@code HoodieAvroDataBlock.getBytes(schema)} and reads
     * them back through {@code HoodieAvroDataBlock.getBlock}, once with the writer schema and once
     * with a {@code null} schema, checking block type, record count and per-record equality.
     *
     * <p>Size assertions pass the expected value first so that failure messages label the two
     * numbers correctly.
     */
    @Test
    public void testV0Format() throws IOException, URISyntaxException {
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        List generateTestRecords = SchemaTestUtil.generateTestRecords(0, 100);
        ArrayList arrayList = new ArrayList(generateTestRecords);
        Assertions.assertEquals(100, generateTestRecords.size());
        Assertions.assertEquals(100, arrayList.size());
        byte[] bytes = new HoodieAvroDataBlock((List) generateTestRecords.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), simpleSchema).getBytes(simpleSchema);
        Assertions.assertTrue(bytes.length > 0);

        // Round trip with the writer schema supplied to the reader.
        HoodieAvroDataBlock block = HoodieAvroDataBlock.getBlock(bytes, simpleSchema);
        Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, block.getBlockType());
        List<IndexedRecord> records = getRecords(block);
        Assertions.assertEquals(arrayList.size(), records.size());
        for (int i = 0; i < arrayList.size(); i++) {
            Assertions.assertEquals(arrayList.get(i), records.get(i));
        }

        // Round trip with no reader schema.
        HoodieAvroDataBlock block2 = HoodieAvroDataBlock.getBlock(bytes, (Schema) null);
        Assertions.assertEquals(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, block2.getBlockType());
        List<IndexedRecord> records2 = getRecords(block2);
        Assertions.assertEquals(arrayList.size(), records2.size());
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assertions.assertEquals(arrayList.get(i2), records2.get(i2));
        }
    }

    /**
     * Writes 1000 generic records as a single data block of the given type and reads them back
     * with a schema projected to the {@code name} field.
     *
     * <p>Checks that the records read equal the written records rewritten to the projection
     * schema, that the block reports the projection schema, and that the byte count reported by
     * {@code BenchmarkCounter} matches the per-block-type expectation.
     *
     * @param hoodieLogBlockType data block type under test (Avro, HFile or Parquet)
     */
    @EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK", "PARQUET_DATA_BLOCK"})
    @ParameterizedTest
    public void testDataBlockFormatAppendAndReadWithProjectedSchema(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType) throws IOException, URISyntaxException, InterruptedException {
        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
        List generateTestGenericRecords = SchemaTestUtil.generateTestGenericRecords(0, 1000);
        Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, simpleSchema.toString());
        // The counter is initialised before the block is written, as in the original sequence.
        BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter(), ((FileSystem) storage.getFileSystem()).getConf());
        writer.appendBlock(getDataBlock(hoodieLogBlockType, generateTestGenericRecords, header));
        writer.close();

        Schema generateProjectionSchema = HoodieAvroUtils.generateProjectionSchema(simpleSchema, Collections.singletonList("name"));
        List rewriteRecords = HoodieAvroUtils.rewriteRecords(generateTestGenericRecords, generateProjectionSchema);
        try (HoodieLogFormat.Reader newReader = HoodieLogFormat.newReader(storage, writer.getLogFile(), generateProjectionSchema, false)) {
            Assertions.assertTrue(newReader.hasNext(), "First block should be available");
            // A data block is required here because getSchema() is called on it below.
            HoodieDataBlock hoodieDataBlock = (HoodieDataBlock) newReader.next();

            // Expected BenchmarkCounter byte counts per block type; the Parquet figure depends on
            // the Avro version check and the Java runtime version.
            int expectedParquetBytes;
            if (HoodieAvroUtils.gteqAvro1_9()) {
                expectedParquetBytes = (HoodieTestUtils.getJavaVersion() == 17 || HoodieTestUtils.getJavaVersion() == 11) ? 1803 : 1802;
            } else {
                expectedParquetBytes = 1809;
            }
            Map<HoodieLogBlock.HoodieLogBlockType, Integer> expectedBytesRead = new HashMap<>();
            expectedBytesRead.put(HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK, 0);
            expectedBytesRead.put(HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK, 0);
            expectedBytesRead.put(HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK, Integer.valueOf(expectedParquetBytes));

            List<IndexedRecord> records = getRecords(hoodieDataBlock);
            Assertions.assertEquals(rewriteRecords.size(), records.size(), "Read records size should be equal to the written records size");
            Assertions.assertEquals(rewriteRecords, records, "Both records lists should be the same. (ordering guaranteed)");
            Assertions.assertEquals(generateProjectionSchema, hoodieDataBlock.getSchema());
            Assertions.assertEquals(expectedBytesRead.get(hoodieLogBlockType), (int) BenchmarkCounter.getBytesRead(), "Read bytes have to match");
        }
    }

    /**
     * Builds an empty delete block and, when {@code z} is true, checks that the positions encoded
     * into its {@code RECORD_POSITIONS} header are returned by {@code getRecordPositions()}.
     * When {@code z} is false the block is only constructed, with an empty header.
     *
     * @param z whether to add the record-positions header and verify it
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testGetRecordPositions(boolean z) throws IOException {
        HashMap blockHeader = new HashMap();
        List expectedPositions = new ArrayList();
        if (z) {
            expectedPositions = TestLogReaderUtils.generatePositions();
            blockHeader.put(HoodieLogBlock.HeaderMetadataType.RECORD_POSITIONS, LogReaderUtils.encodePositions(expectedPositions));
        }
        HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[0], blockHeader);
        if (!z) {
            // Nothing to verify without the positions header.
            return;
        }
        TestLogReaderUtils.assertPositionEquals(expectedPositions, deleteBlock.getRecordPositions());
    }

    /**
     * Wraps each Avro record in a {@code HoodieAvroIndexedRecord} and delegates to the four-argument
     * overload, supplying a placeholder storage path of {@code "dummy_path"}.
     *
     * @param hoodieLogBlockType type of data block to create
     * @param list               Avro records to place in the block
     * @param map                block header entries
     * @return the data block produced by the four-argument overload
     */
    public static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType, List<IndexedRecord> list, Map<HoodieLogBlock.HeaderMetadataType, String> map) {
        List wrappedRecords = (List) list.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
        StoragePath placeholderPath = new StoragePath("dummy_path");
        return getDataBlock(hoodieLogBlockType, wrappedRecords, map, placeholderPath);
    }

    /**
     * Creates the concrete data block implementation that corresponds to {@code hoodieLogBlockType}.
     *
     * <p>The dispatch goes through the compiler-generated switch map ({@code AnonymousClass3}),
     * which is declared elsewhere in this file. NOTE(review): judging by the classes instantiated,
     * indices 1-4 presumably stand for the CDC, Avro, HFile and Parquet block types respectively;
     * confirm against the switch-map initializer.
     *
     * @param hoodieLogBlockType the kind of data block to create
     * @param list               the records to place in the block
     * @param map                the block header entries
     * @param storagePath        path handed to the HFile block only; the other block types ignore it
     * @return a new data block of the requested kind
     * @throws RuntimeException if the type does not map to one of the four handled indices
     */
    private static HoodieDataBlock getDataBlock(HoodieLogBlock.HoodieLogBlockType hoodieLogBlockType, List<HoodieRecord> list, Map<HoodieLogBlock.HeaderMetadataType, String> map, StoragePath storagePath) {
        final int blockKind = AnonymousClass3.$SwitchMap$org$apache$hudi$common$table$log$block$HoodieLogBlock$HoodieLogBlockType[hoodieLogBlockType.ordinal()];

        if (blockKind == 1) {
            return new HoodieCDCDataBlock(list, map, HoodieRecord.RECORD_KEY_METADATA_FIELD);
        }
        if (blockKind == 2) {
            return new HoodieAvroDataBlock(list, map, HoodieRecord.RECORD_KEY_METADATA_FIELD);
        }
        if (blockKind == 3) {
            // HFile blocks use the configured default compression algorithm and reader flavour.
            final String hfileCompression = (String) HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue();
            final boolean useNativeReader = ((Boolean) HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()).booleanValue();
            return new HoodieHFileDataBlock(list, map, hfileCompression, storagePath, useNativeReader);
        }
        if (blockKind == 4) {
            final String keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
            final String parquetCodec = (String) HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.defaultValue();
            return new HoodieParquetDataBlock(list, map, keyField, parquetCodec, 0.1d, true);
        }
        throw new RuntimeException("Unknown data block type " + hoodieLogBlockType);
    }

    /**
     * Supplies the argument triples for the parameterized tests: every combination of the two disk
     * map types with two boolean flags, eight combinations in total.
     *
     * <p>NOTE(review): the flags are presumably "compression enabled" and "optimized log block
     * scan enabled", in that order; confirm against the consuming tests.
     *
     * @return the argument sets, in a fixed order with the disk map type varying fastest
     */
    private static Stream<Arguments> testArguments() {
        // Passing the Arguments directly (no Object[] cast) lets Stream.of infer Stream<Arguments>.
        return Stream.of(
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true));
    }

    /**
     * Supplies the argument pairs for the parameterized tests that take only a disk map type and
     * one boolean flag: both disk map types combined with both flag values.
     *
     * <p>NOTE(review): the flag is presumably "compression enabled"; confirm against the consuming
     * tests.
     *
     * @return the argument sets, in a fixed order with the disk map type varying fastest
     */
    private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
        // Passing the Arguments directly (no Object[] cast) lets Stream.of infer Stream<Arguments>.
        return Stream.of(
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.BITCASK, true),
                Arguments.arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true));
    }

    /**
     * Convenience overload of the five-argument {@code writeLogFiles} that passes {@code false} for
     * the trailing boolean.
     *
     * @param storagePath parent path under which the log file(s) are written
     * @param schema      schema recorded in each block header
     * @param list        records to write
     * @param i           number of blocks to split the records into
     * @return the log files reported by the writer while the blocks were appended
     * @throws IOException          if writing fails
     * @throws InterruptedException if the writer is interrupted while appending
     */
    private static Set<HoodieLogFile> writeLogFiles(StoragePath storagePath, Schema schema, List<IndexedRecord> list, int i) throws IOException, InterruptedException {
        final boolean trailingFlag = false;
        return writeLogFiles(storagePath, schema, list, i, trailingFlag);
    }

    /**
     * Splits {@code list} into {@code i} consecutive chunks and appends each chunk as one data block
     * through a single log writer (file id {@code "test-fileid1"}, base commit {@code "100"},
     * size threshold 1024).
     *
     * <p>Every chunk holds {@code list.size() / i} records, except the last one, which takes all
     * remaining records so that nothing is dropped when the size is not evenly divisible. The
     * writer is closed even when an append fails.
     *
     * @param storagePath parent path under which the log file(s) are written
     * @param schema      schema recorded in each block header
     * @param list        records to write
     * @param i           number of blocks to write; {@code 0} fails with an
     *                    {@link ArithmeticException}, a negative value writes nothing
     * @param z           currently unused by this method
     * @return the distinct log files the writer reported just before each append
     * @throws IOException          if writing or closing the writer fails
     * @throws InterruptedException if the writer is interrupted while appending
     */
    private static Set<HoodieLogFile> writeLogFiles(StoragePath storagePath, Schema schema, List<IndexedRecord> list, int i, boolean z) throws IOException, InterruptedException {
        final Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

        final Set<HoodieLogFile> logFiles = new HashSet<>();
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(storagePath).withFileExtension(".log").withSizeThreshold(1024L).withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build()) {
            final int totalRecords = list.size();
            final int chunkSize = totalRecords / i;
            for (int chunk = 0; chunk < i; chunk++) {
                final int from = chunk * chunkSize;
                // The last chunk absorbs the remainder; the others are exactly chunkSize long.
                final int to = (chunk == i - 1) ? totalRecords : from + chunkSize;
                final List<IndexedRecord> chunkRecords = list.subList(from, to);
                // Record the target file before appending, since the writer may switch files.
                logFiles.add(writer.getLogFile());
                writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, chunkRecords, header));
            }
        }
        return logFiles;
    }

    /**
     * Reads every record of a data block through its Avro record iterator and returns the record
     * payloads in iteration order.
     *
     * <p>The iterator is closed once it has been drained, also when iteration fails.
     *
     * @param hoodieDataBlock the block to read
     * @return a new mutable list holding the data of each record in the block
     */
    private static List<IndexedRecord> getRecords(HoodieDataBlock hoodieDataBlock) {
        final List<IndexedRecord> records = new ArrayList<>();
        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator = hoodieDataBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
            recordIterator.forEachRemaining(hoodieRecord -> records.add(hoodieRecord.getData()));
        }
        return records;
    }

    /**
     * Returns a sorted copy of {@code list}, ordered by the string form of each record's
     * {@code HoodieRecord.RECORD_KEY_METADATA_FIELD} value. The input list is left untouched.
     *
     * @param list records to sort; each element is cast to {@link GenericRecord}
     * @return a new list containing the same records in ascending key order
     */
    private static List<IndexedRecord> sort(List<IndexedRecord> list) {
        final Comparator<IndexedRecord> byRecordKey = Comparator.comparing(
                entry -> ((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString());
        final List<IndexedRecord> sortedCopy = new ArrayList<>(list);
        Collections.sort(sortedCopy, byRecordKey);
        return sortedCopy;
    }

    /**
     * Writes a log file containing one valid data block followed by a hand-crafted, inconsistent
     * block, then opens a reader on it and consumes the first (valid) block.
     *
     * <p>The writer and the raw output stream are both closed even if an intermediate call throws.
     *
     * @param str file id used for the log file
     * @return a reader positioned after the first block; the caller is responsible for closing it
     * @throws Exception if writing the file or opening the reader fails
     */
    private HoodieLogFormat.Reader createCorruptedFile(String str) throws Exception {
        final HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(this.partitionPath).withFileExtension(".log").withFileId(str).overBaseCommit("100").withStorage(storage).build();
        // Separate resource variable: the writer is still queried for its log file after closing.
        try (HoodieLogFormat.Writer closingWriter = writer) {
            final List<IndexedRecord> validRecords = SchemaTestUtil.generateTestRecords(0, 100);
            final Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, SchemaTestUtil.getSimpleSchema().toString());
            closingWriter.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, validRecords, header));
        }

        // Append raw bytes after the valid block. NOTE(review): the fields presumably mimic a
        // block header (magic, length, format version, block type, length) whose declared lengths
        // do not match the 16 bytes of content that follow - confirm against the log format.
        try (FSDataOutputStream append = storage.append(writer.getLogFile().getPath())) {
            append.write(HoodieLogFormat.MAGIC);
            append.writeLong(473L);
            append.writeInt(1);
            append.writeInt(10000);
            append.writeLong(400L);
            append.write(StringUtils.getUTF8Bytes("something-random"));
            append.flush();
        }

        final HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
        Assertions.assertTrue(reader.hasNext(), "First block should be available");
        reader.next();
        return reader;
    }

    /**
     * Scans all log files of file id {@code "test-fileid1"} (base commit {@code "100"}) under the
     * test partition with a merged log record scanner and verifies the record count and keys read.
     *
     * <p>The scanner is closed when the checks finish, whether they pass or fail.
     *
     * @param str         latest instant time handed to the scanner
     * @param schema      reader schema
     * @param diskMapType disk map implementation used by the scanner
     * @param z           whether BitCask disk map compression is enabled
     * @param z2          whether the optimized log block scan is enabled
     * @param i           expected value of the scanner's total log record count
     * @param i2          expected number of distinct record keys read
     * @param option      when present, the exact set of record keys expected
     * @throws IOException if listing the log files or scanning them fails
     */
    private void checkLogBlocksAndKeys(String str, Schema schema, ExternalSpillableMap.DiskMapType diskMapType, boolean z, boolean z2, int i, int i2, Option<Set<String>> option) throws IOException {
        final List<String> logFilePaths = FSUtils.getAllLogFiles(storage, this.partitionPath, "test-fileid1", ".log", "100")
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());

        try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(this.basePath)
                .withLogFilePaths(logFilePaths)
                .withReaderSchema(schema)
                .withLatestInstantTime(str)
                .withMaxMemorySizeInBytes(10240L)
                .withReverseReader(false)
                .withBufferSize(BUFFER_SIZE)
                .withSpillableMapBasePath(this.spillableBasePath)
                .withDiskMapType(diskMapType)
                .withBitCaskDiskMapCompressionEnabled(z)
                .withOptimizedLogBlocksScan(z2)
                .build()) {
            Assertions.assertEquals(i, scanner.getTotalLogRecords(), "There should be " + i + " records");

            // Collect the distinct record keys produced by the scanner.
            final Set<String> readKeys = new HashSet<>();
            scanner.forEach(hoodieRecord -> readKeys.add(hoodieRecord.getKey().getRecordKey()));

            Assertions.assertEquals(i2, readKeys.size(), "Read should return return all " + i2 + " keys");
            if (option.isPresent()) {
                Assertions.assertEquals(option.get(), readKeys, "Keys read from log file should match written keys");
            }
        }
    }
}
