/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utils;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.data.writer.BinaryWriter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;

public class TestData {
    /** Eight rows built with insertRow: keys id1..id8, epoch-millis timestamps 1..8, two rows in each of partitions par1..par4. */
    public static List<RowData> DATA_SET_INSERT = Arrays.asList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 23, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id2"), StringData.fromString((String)"Stephen"), 33, TimestampData.fromEpochMillis((long)2L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 53, TimestampData.fromEpochMillis((long)3L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id4"), StringData.fromString((String)"Fabian"), 31, TimestampData.fromEpochMillis((long)4L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id5"), StringData.fromString((String)"Sophia"), 18, TimestampData.fromEpochMillis((long)5L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id6"), StringData.fromString((String)"Emma"), 20, TimestampData.fromEpochMillis((long)6L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id7"), StringData.fromString((String)"Bob"), 44, TimestampData.fromEpochMillis((long)7L), StringData.fromString((String)"par4")), TestData.insertRow(StringData.fromString((String)"id8"), StringData.fromString((String)"Han"), 56, TimestampData.fromEpochMillis((long)8L), StringData.fromString((String)"par4")));
    /**
     * Eight rows that overlap with DATA_SET_INSERT on keys id1..id5 (ages of id1..id4 are one
     * higher there; id5 is identical) and add the keys id9, id10 and id11.
     */
    public static List<RowData> DATA_SET_UPDATE_INSERT = Arrays.asList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 24, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id2"), StringData.fromString((String)"Stephen"), 34, TimestampData.fromEpochMillis((long)2L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 54, TimestampData.fromEpochMillis((long)3L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id4"), StringData.fromString((String)"Fabian"), 32, TimestampData.fromEpochMillis((long)4L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id5"), StringData.fromString((String)"Sophia"), 18, TimestampData.fromEpochMillis((long)5L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id9"), StringData.fromString((String)"Jane"), 19, TimestampData.fromEpochMillis((long)6L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id10"), StringData.fromString((String)"Ella"), 38, TimestampData.fromEpochMillis((long)7L), StringData.fromString((String)"par4")), TestData.insertRow(StringData.fromString((String)"id11"), StringData.fromString((String)"Phoebe"), 52, TimestampData.fromEpochMillis((long)8L), StringData.fromString((String)"par4")));
    /** Four rows with keys id12..id15 and timestamps 9..12, two rows in each of partitions par5 and par6. */
    public static List<RowData> DATA_SET_INSERT_SEPARATE_PARTITION = Arrays.asList(TestData.insertRow(StringData.fromString((String)"id12"), StringData.fromString((String)"Monica"), 27, TimestampData.fromEpochMillis((long)9L), StringData.fromString((String)"par5")), TestData.insertRow(StringData.fromString((String)"id13"), StringData.fromString((String)"Phoebe"), 31, TimestampData.fromEpochMillis((long)10L), StringData.fromString((String)"par5")), TestData.insertRow(StringData.fromString((String)"id14"), StringData.fromString((String)"Rachel"), 52, TimestampData.fromEpochMillis((long)11L), StringData.fromString((String)"par6")), TestData.insertRow(StringData.fromString((String)"id15"), StringData.fromString((String)"Ross"), 29, TimestampData.fromEpochMillis((long)12L), StringData.fromString((String)"par6")));
    /** Starts empty; the static initializer of this class appends five identical id1 rows (timestamp 1, partition par1). */
    public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<RowData>();
    /** Assigned in the static initializer: five id1 rows whose timestamps are epoch millis 0..4. */
    public static List<RowData> DATA_SET_INSERT_SAME_KEY;
    /** Assigned in the static initializer: rows id1..id8 with timestamps 1000..8000 epoch millis over partitions par1..par4. */
    public static List<RowData> DATA_SET_SOURCE_INSERT;
    /** Assigned in the static initializer: the id1..id4 rows, with the same values they have in DATA_SET_SOURCE_INSERT. */
    public static List<RowData> DATA_SET_SOURCE_INSERT_FIRST_COMMIT;
    /** Assigned in the static initializer: the id5..id8 rows, with the same values they have in DATA_SET_SOURCE_INSERT. */
    public static List<RowData> DATA_SET_SOURCE_INSERT_LATEST_COMMIT;
    /** Assigned in the static initializer: rows id1..id11; ages of id1..id4 are 24, 34, 54 and 32. */
    public static List<RowData> DATA_SET_SOURCE_MERGED;
    /** Assigned in the static initializer: rows id1..id8; only the ages of id1 and id2 (24, 34) differ from DATA_SET_SOURCE_INSERT. */
    public static List<RowData> DATA_SET_SOURCE_INSERT_OVERWRITE;
    /** Assigned in the static initializer: insertRow rows for id1 and id2 followed by deleteRow rows for id3, id5 and id9. */
    public static List<RowData> DATA_SET_UPDATE_DELETE;
    /** NOTE(review): assigned outside the portion of the file reviewed here - presumably in the static initializer; confirm contents there. */
    public static List<RowData> DATA_SET_INSERT_UPDATE_DELETE;
    /** NOTE(review): assigned outside the portion of the file reviewed here - presumably in the static initializer; confirm contents there. */
    public static List<RowData> DATA_SET_SINGLE_INSERT;
    /** NOTE(review): assigned outside the portion of the file reviewed here - presumably in the static initializer; confirm contents there. */
    public static List<RowData> DATA_SET_DISORDER_INSERT;
    /** NOTE(review): assigned outside the portion of the file reviewed here - presumably in the static initializer; confirm contents there. */
    public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE;

    /**
     * Builds one row per given id.
     *
     * <p>Every row has the key {@code "id" + id}, name {@code "Danny"}, age 23, the id itself
     * as epoch-millis timestamp and partition {@code "par1"}.
     *
     * @param ids numeric suffixes of the record keys to generate
     * @return a new mutable list with the rows in the order the ids were given
     */
    public static List<RowData> dataSetInsert(int ... ids) {
        List<RowData> generated = new ArrayList<>(ids.length);
        for (int id : ids) {
            generated.add(TestData.insertRow(
                StringData.fromString("id" + id),
                StringData.fromString("Danny"),
                23,
                TimestampData.fromEpochMillis(id),
                StringData.fromString("par1")));
        }
        return generated;
    }

    /**
     * Returns the rows located at odd zero-based positions (1, 3, 5, ...) of the input.
     *
     * @param rows the rows to filter
     * @return a new list with the selected rows in their original order
     */
    public static List<RowData> filterOddRows(List<RowData> rows) {
        Predicate<Integer> isOddIndex = index -> index % 2 != 0;
        return TestData.filterRowsByIndexPredicate(rows, isOddIndex);
    }

    /**
     * Returns the rows located at even zero-based positions (0, 2, 4, ...) of the input.
     *
     * @param rows the rows to filter
     * @return a new list with the selected rows in their original order
     */
    public static List<RowData> filterEvenRows(List<RowData> rows) {
        Predicate<Integer> isEvenIndex = index -> index % 2 == 0;
        return TestData.filterRowsByIndexPredicate(rows, isEvenIndex);
    }

    /**
     * Selects the rows whose zero-based position satisfies the given predicate.
     *
     * @param rows      the rows to filter
     * @param predicate test applied to each position, not to the row itself
     * @return a new {@link ArrayList} with the selected rows in their original order
     */
    private static List<RowData> filterRowsByIndexPredicate(List<RowData> rows, Predicate<Integer> predicate) {
        return IntStream.range(0, rows.size())
            .filter(position -> predicate.test(position))
            .mapToObj(rows::get)
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Extracts the numeric suffix of a record key of the form {@code "id<number>"}.
     *
     * <p>The result is used as a sort key, so every input that cannot be interpreted maps to
     * {@code -1} rather than failing: {@code null}, text without the {@code "id"} prefix, and
     * text whose suffix is not a valid integer (for example {@code "id"} or {@code "idabc"}).
     *
     * @param id the key object; its {@code toString()} form is parsed, may be {@code null}
     * @return the parsed number, or {@code -1} when no number can be extracted
     */
    private static Integer toIdSafely(Object id) {
        if (id == null) {
            return -1;
        }
        final String idStr = id.toString();
        if (!idStr.startsWith("id")) {
            return -1;
        }
        try {
            return Integer.parseInt(idStr.substring(2));
        } catch (NumberFormatException ignored) {
            // A non-numeric suffix is treated like any other unparsable key.
            return -1;
        }
    }

    /**
     * Renders the rows as the string form of a list, ordered by the numeric id parsed from
     * string field 0.
     *
     * <p>Each row is converted with the converter for {@code TestConfigurations.ROW_DATA_TYPE}
     * and the converted object's {@code toString()} is used.
     *
     * @param rows the rows to render; the list itself is not modified
     * @return the {@code toString()} of the list of rendered rows
     */
    public static String rowDataToString(List<RowData> rows) {
        DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
        List<RowData> ordered = new ArrayList<>(rows);
        ordered.sort(Comparator.comparing((RowData row) -> TestData.toIdSafely(row.getString(0))));
        List<String> rendered = new ArrayList<>(ordered.size());
        for (RowData row : ordered) {
            rendered.add(converter.toExternal(row).toString());
        }
        return rendered.toString();
    }

    /**
     * Writes the rows to the table at {@code FlinkOptions.PATH} through a
     * {@link StreamWriteFunctionWrapper}, using a single checkpoint with id 1.
     *
     * @param dataBuffer the rows to write, invoked on the wrapper in list order
     * @param conf       configuration handed to the wrapper; also supplies the table path
     * @throws Exception if any step of the wrapper fails
     */
    public static void writeData(List<RowData> dataBuffer, Configuration conf) throws Exception {
        final String tablePath = conf.getString(FlinkOptions.PATH);
        final StreamWriteFunctionWrapper<RowData> writer = new StreamWriteFunctionWrapper<>(tablePath, conf);
        writer.openFunction();
        for (RowData row : dataBuffer) {
            writer.invoke(row);
        }
        // Checkpoint first, then pass the next operator event to the coordinator before
        // marking the checkpoint complete.
        writer.checkpointFunction(1L);
        final OperatorEvent writeEvent = writer.getNextEvent();
        writer.getCoordinator().handleEventFromOperator(0, writeEvent);
        writer.checkpointComplete(1L);
        writer.close();
    }

    /**
     * Writes the rows to the table at {@code FlinkOptions.PATH} through a
     * {@link StreamWriteFunctionWrapper}, signalling end of input instead of checkpointing.
     *
     * @param dataBuffer the rows to write, invoked on the wrapper in list order
     * @param conf       configuration handed to the wrapper; also supplies the table path
     * @throws Exception if any step of the wrapper fails
     */
    public static void writeDataAsBatch(List<RowData> dataBuffer, Configuration conf) throws Exception {
        final String tablePath = conf.getString(FlinkOptions.PATH);
        final StreamWriteFunctionWrapper<RowData> writer = new StreamWriteFunctionWrapper<>(tablePath, conf);
        writer.openFunction();
        for (RowData row : dataBuffer) {
            writer.invoke(row);
        }
        // End of input, then pass the next operator event to the coordinator.
        writer.endInput();
        final OperatorEvent writeEvent = writer.getNextEvent();
        writer.getCoordinator().handleEventFromOperator(0, writeEvent);
        writer.close();
    }

    /**
     * Null-tolerant {@code toString()}.
     *
     * @param obj any object, may be {@code null}
     * @return the text {@code "null"} for a {@code null} argument, otherwise {@code obj.toString()}
     */
    private static String toStringSafely(Object obj) {
        // String.valueOf(Object) yields "null" for null and obj.toString() otherwise.
        return String.valueOf(obj);
    }

    /**
     * Asserts that the rows, sorted by the string form of field 0, render to the expected
     * text; the change flag of each row is not included.
     *
     * @param rows     the actual rows
     * @param expected the expected string form of the sorted row list
     */
    public static void assertRowsEquals(List<Row> rows, String expected) {
        final boolean withChangeFlag = false;
        TestData.assertRowsEquals(rows, expected, withChangeFlag);
    }

    /**
     * Asserts that the rows, sorted by the string form of field 0, render to the expected text.
     *
     * @param rows           the actual rows; the list itself is not modified
     * @param expected       the expected string form of the sorted, rendered row list
     * @param withChangeFlag when {@code true} each row is rendered as
     *                       {@code <kind short string>(<row>)}, otherwise as the row's plain
     *                       {@code toString()}
     */
    public static void assertRowsEquals(List<Row> rows, String expected, boolean withChangeFlag) {
        List<Row> ordered = new ArrayList<>(rows);
        ordered.sort(Comparator.comparing((Row row) -> TestData.toStringSafely(row.getField(0))));
        List<String> rendered = new ArrayList<>(ordered.size());
        for (Row row : ordered) {
            String plain = row.toString();
            rendered.add(withChangeFlag ? row.getKind().shortString() + "(" + plain + ")" : plain);
        }
        MatcherAssert.assertThat(rendered.toString(), CoreMatchers.is(expected));
    }

    /**
     * Asserts that the rows, sorted by the string form of the field at {@code orderingPos},
     * render to the expected text.
     *
     * @param rows        the actual rows
     * @param expected    the expected string form of the sorted row list
     * @param orderingPos position of the field whose string form defines the sort order
     */
    public static void assertRowsEquals(List<Row> rows, String expected, int orderingPos) {
        Comparator<Row> byOrderingField = Comparator.comparing(row -> TestData.toStringSafely(row.getField(orderingPos)));
        List<Row> ordered = rows.stream().sorted(byOrderingField).collect(Collectors.toList());
        MatcherAssert.assertThat(ordered.toString(), CoreMatchers.is(expected));
    }

    /**
     * Asserts that the rows, sorted by the numeric id parsed from field 0, have the same
     * string form as the expected rows rendered by {@code rowDataToString}.
     *
     * @param rows     the actual rows; the list itself is not modified
     * @param expected the expected rows
     */
    public static void assertRowsEquals(List<Row> rows, List<RowData> expected) {
        Comparator<Row> byNumericId = Comparator.comparing(row -> TestData.toIdSafely(row.getField(0)));
        List<Row> ordered = new ArrayList<>(rows);
        ordered.sort(byNumericId);
        String actualText = ordered.toString();
        MatcherAssert.assertThat(actualText, CoreMatchers.is(TestData.rowDataToString(expected)));
    }

    /**
     * Asserts that the rows, rendered by {@code rowDataToString}, equal the expected text.
     *
     * @param rows     the actual rows
     * @param expected the expected rendered form
     */
    public static void assertRowDataEquals(List<RowData> rows, String expected) {
        MatcherAssert.assertThat(TestData.rowDataToString(rows), CoreMatchers.is(expected));
    }

    /**
     * Asserts that two row lists have the same form when rendered by {@code rowDataToString}.
     *
     * @param rows     the actual rows
     * @param expected the expected rows
     */
    public static void assertRowDataEquals(List<RowData> rows, List<RowData> expected) {
        final String actualText = TestData.rowDataToString(rows);
        final String expectedText = TestData.rowDataToString(expected);
        MatcherAssert.assertThat(actualText, CoreMatchers.is(expectedText));
    }

    /**
     * Asserts that both collections contain the same distinct rows; order and duplicates are
     * ignored because each side is compared as a {@link HashSet}.
     *
     * @param expected the expected rows
     * @param actual   the actual rows
     */
    public static void assertRowsEqualsUnordered(Collection<Row> expected, Collection<Row> actual) {
        HashSet<Row> expectedRows = new HashSet<>(expected);
        HashSet<Row> actualRows = new HashSet<>(actual);
        TestCase.assertEquals(expectedRows, actualRows);
    }

    /**
     * Checks the written data under the base path, expecting exactly 4 partition directories.
     *
     * @param baseFile the table base directory
     * @param expected expected rendered records keyed by partition directory name
     * @throws IOException if reading a data file fails
     */
    public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
        final int defaultPartitions = 4;
        TestData.checkWrittenData(baseFile, expected, defaultPartitions);
    }

    /**
     * Checks the written data under the base path, rendering each record with
     * {@code filterOutVariables}.
     *
     * @param baseFile   the table base directory
     * @param expected   expected rendered records keyed by partition directory name
     * @param partitions the expected number of partition directories
     * @throws IOException if reading a data file fails
     */
    public static void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws IOException {
        Function<GenericRecord, String> defaultExtractor = TestData::filterOutVariables;
        TestData.checkWrittenData(baseFile, expected, partitions, defaultExtractor);
    }

    /**
     * Checks the records of the latest data file in every partition directory.
     *
     * <p>Entries whose name starts with {@code "."} are ignored at both directory levels. In
     * each partition directory the file with the greatest commit time (as returned by
     * {@code FSUtils.getCommitTime} for the file name) is read as parquet, each record is
     * rendered with {@code extractor}, the rendered records are sorted in natural order and the
     * list's string form is compared with {@code expected.get(<partition directory name>)}.
     *
     * @param baseFile   the table base directory
     * @param expected   expected rendered records keyed by partition directory name
     * @param partitions the expected number of partition directories
     * @param extractor  renders one record to the string that is compared
     * @throws IOException if reading or closing a data file fails
     */
    public static void checkWrittenData(File baseFile, Map<String, String> expected, int partitions, Function<GenericRecord, String> extractor) throws IOException {
        assert (baseFile.isDirectory());
        FileFilter filter = file -> !file.getName().startsWith(".");
        File[] partitionDirs = baseFile.listFiles(filter);
        Assertions.assertNotNull(partitionDirs);
        MatcherAssert.assertThat(partitionDirs.length, CoreMatchers.is(partitions));
        for (File partitionDir : partitionDirs) {
            File[] dataFiles = partitionDir.listFiles(filter);
            Assertions.assertNotNull(dataFiles);
            // Fail with a descriptive error instead of indexing into an empty array.
            File latestDataFile = Arrays.stream(dataFiles)
                .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))
                .orElseThrow(() -> new AssertionError("No data files under partition directory: " + partitionDir));
            List<String> readBuffer = new ArrayList<>();
            // try-with-resources so the reader is closed even when an assertion or read fails.
            try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build()) {
                for (GenericRecord nextRecord = reader.read(); nextRecord != null; nextRecord = reader.read()) {
                    readBuffer.add(extractor.apply(nextRecord));
                }
            }
            readBuffer.sort(Comparator.naturalOrder());
            MatcherAssert.assertThat(readBuffer.toString(), CoreMatchers.is(expected.get(partitionDir.getName())));
        }
    }

    /**
     * Checks the records of all data files in every partition directory.
     *
     * <p>Entries whose name starts with {@code "."} are ignored at both directory levels. Every
     * remaining file of a partition directory is read as parquet, each record is rendered with
     * {@code filterOutVariables}, the rendered records are sorted in natural order and the
     * list's string form is compared with {@code expected.get(<partition directory name>)}.
     *
     * @param baseFile   the table base directory
     * @param expected   expected rendered records keyed by partition directory name
     * @param partitions the expected number of partition directories
     * @throws IOException if reading or closing a data file fails
     */
    public static void checkWrittenAllData(File baseFile, Map<String, String> expected, int partitions) throws IOException {
        assert (baseFile.isDirectory());
        FileFilter filter = file -> !file.getName().startsWith(".");
        File[] partitionDirs = baseFile.listFiles(filter);
        Assertions.assertNotNull(partitionDirs);
        MatcherAssert.assertThat(partitionDirs.length, CoreMatchers.is(partitions));
        for (File partitionDir : partitionDirs) {
            File[] dataFiles = partitionDir.listFiles(filter);
            Assertions.assertNotNull(dataFiles);
            List<String> readBuffer = new ArrayList<>();
            for (File dataFile : dataFiles) {
                // One reader per file; try-with-resources closes each before the next is opened.
                try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build()) {
                    for (GenericRecord nextRecord = reader.read(); nextRecord != null; nextRecord = reader.read()) {
                        readBuffer.add(TestData.filterOutVariables(nextRecord));
                    }
                }
            }
            readBuffer.sort(Comparator.naturalOrder());
            MatcherAssert.assertThat(readBuffer.toString(), CoreMatchers.is(expected.get(partitionDir.getName())));
        }
    }

    /**
     * Checks the latest base files of the given partitions, rendering each record with
     * {@code filterOutVariables}.
     *
     * @param basePath the table base directory
     * @param expected expected rendered records keyed by partition
     * @throws IOException if the table meta client cannot be initialized
     */
    public static void checkWrittenDataCOW(File basePath, Map<String, List<String>> expected) throws IOException {
        Function<GenericRecord, String> defaultExtractor = TestData::filterOutVariables;
        TestData.checkWrittenDataCOW(basePath, expected, defaultExtractor);
    }

    /**
     * Checks the records found in the latest base files of each expected partition.
     *
     * <p>For every entry of {@code expected}, all latest base files of that partition (as
     * reported by the table's base-file-only view) are read as parquet and each record is
     * rendered with {@code extractor}. The number of rendered records must equal the size of
     * the expected list and every rendered record must be contained in it; order is not
     * checked.
     *
     * @param basePath  the table base directory
     * @param expected  expected rendered records keyed by partition
     * @param extractor renders one record to the string that is compared
     * @throws IOException      if the table meta client cannot be initialized
     * @throws RuntimeException wrapping an {@link IOException} raised while reading or closing
     *                          a base file
     */
    public static void checkWrittenDataCOW(File basePath, Map<String, List<String>> expected, Function<GenericRecord, String> extractor) throws IOException {
        HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toURI().toString());
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.toURI().toString()).build();
        HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
        expected.forEach((partition, partitionDataSet) -> {
            List<String> readBuffer = new ArrayList<>();
            table.getBaseFileOnlyView().getLatestBaseFiles(partition).forEach(baseFile -> {
                String path = baseFile.getPath();
                // try-with-resources so every base-file reader is closed, also on failure.
                try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build()) {
                    for (GenericRecord nextRecord = reader.read(); nextRecord != null; nextRecord = reader.read()) {
                        readBuffer.add(extractor.apply(nextRecord));
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            MatcherAssert.assertThat("Unexpected records number under partition: " + partition, readBuffer.size(), CoreMatchers.is(partitionDataSet.size()));
            for (String record : readBuffer) {
                Assertions.assertTrue(partitionDataSet.contains(record), "Unexpected record: " + record);
            }
        });
    }

    /**
     * Checks the merged view (base file plus log files) of every partition of a
     * MERGE_ON_READ table.
     *
     * <p>For each latest merged file slice up to the last completed instant:
     * <ul>
     *   <li>records of the base file are read; when the log scanner holds a record with the
     *       same key, the two are combined via {@code combineAndGetUpdateValue} and the result
     *       is kept only if present, otherwise the base record is kept as is;</li>
     *   <li>log records whose key did not appear in the base file are added via
     *       {@code getInsertValue} when that value is present.</li>
     * </ul>
     * The records are rendered with {@code filterOutVariables}, sorted, and the list's string
     * form is compared with {@code expected.get(<partition directory name>)}.
     *
     * @param fs         file system handed to the log record scanner
     * @param baseFile   the table base directory; must contain {@code .hoodie/hoodie.properties}
     * @param expected   expected rendered records keyed by partition directory name
     * @param partitions the expected number of partition directories
     * @throws Exception if table metadata, a base file or a log file cannot be read
     */
    public static void checkWrittenDataMOR(FileSystem fs, File baseFile, Map<String, String> expected, int partitions) throws Exception {
        assert (baseFile.isDirectory()) : "Base path should be a directory";
        String basePath = baseFile.getAbsolutePath();
        File hoodiePropertiesFile = new File(baseFile + "/" + ".hoodie" + "/" + "hoodie.properties");
        assert (hoodiePropertiesFile.exists());
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().fromFile(hoodiePropertiesFile).withPath(basePath).build();
        HoodieTableMetaClient metaClient = HoodieTestUtils.init((String)basePath, (HoodieTableType)HoodieTableType.MERGE_ON_READ, (Properties)config.getProps());
        HoodieFlinkTable table = HoodieFlinkTable.create((HoodieWriteConfig)config, (HoodieFlinkEngineContext)HoodieFlinkEngineContext.DEFAULT, (HoodieTableMetaClient)metaClient);
        Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
        String latestInstant = (String)metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
        Assertions.assertNotNull((Object)latestInstant, (String)("No completed commit under table path" + basePath));
        File[] partitionDirs = baseFile.listFiles(file -> !file.getName().startsWith(".") && file.isDirectory());
        Assertions.assertNotNull((Object)partitionDirs);
        MatcherAssert.assertThat("The partitions number should be: " + partitions, partitionDirs.length, CoreMatchers.is(partitions));
        // Project every field of the table schema when rebuilding combined records.
        int[] requiredPos = IntStream.range(0, schema.getFields().size()).toArray();
        for (File partitionDir : partitionDirs) {
            List<String> readBuffer = new ArrayList<>();
            List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
            for (FileSlice fileSlice : fileSlices) {
                // NOTE(review): the scanner is never released here; confirm whether this Hudi
                // version exposes a close() that should be called once the slice is processed.
                HoodieMergedLogRecordScanner scanner = null;
                List<String> logPaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
                if (!logPaths.isEmpty()) {
                    scanner = TestData.getScanner(fs, basePath, logPaths, schema, latestInstant);
                }
                String baseFilePath = (String)fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                // Keys already merged with a base-file record; they must not be added again
                // when the remaining log records are appended below.
                HashSet<String> keyToSkip = new HashSet<>();
                if (baseFilePath != null) {
                    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
                    // try-with-resources so the base-file reader is closed, also on failure.
                    try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(baseFilePath)).build()) {
                        GenericRecord currentRecord = reader.read();
                        while (currentRecord != null) {
                            // Position 2 is used as the record key; presumably the
                            // _hoodie_record_key meta column - TODO confirm against the schema.
                            String curKey = currentRecord.get(2).toString();
                            if (scanner != null && scanner.getRecords().containsKey(curKey)) {
                                keyToSkip.add(curKey);
                                HoodieAvroRecord record = (HoodieAvroRecord)scanner.getRecords().get(curKey);
                                Option combineResult = record.getData().combineAndGetUpdateValue((IndexedRecord)currentRecord, schema, (Properties)config.getProps());
                                if (combineResult.isPresent()) {
                                    GenericRecord avroRecord = FormatUtils.buildAvroRecordBySchema((IndexedRecord)((IndexedRecord)combineResult.get()), (Schema)schema, (int[])requiredPos, (GenericRecordBuilder)recordBuilder);
                                    readBuffer.add(TestData.filterOutVariables(avroRecord));
                                }
                            } else {
                                readBuffer.add(TestData.filterOutVariables(currentRecord));
                            }
                            currentRecord = reader.read();
                        }
                    }
                }
                if (scanner == null) {
                    continue;
                }
                for (String curKey : scanner.getRecords().keySet()) {
                    if (keyToSkip.contains(curKey)) {
                        continue;
                    }
                    Option insertValue = ((HoodieRecordPayload)((HoodieRecord)scanner.getRecords().get(curKey)).getData()).getInsertValue(schema, (Properties)config.getProps());
                    if (insertValue.isPresent()) {
                        readBuffer.add(TestData.filterOutVariables((GenericRecord)insertValue.get()));
                    }
                }
            }
            readBuffer.sort(String::compareTo);
            MatcherAssert.assertThat(readBuffer.toString(), CoreMatchers.is(expected.get(partitionDir.getName())));
        }
    }

    /**
     * Builds a merged log record scanner over the given log files.
     *
     * <p>Fixed settings: non-lazy block reading, forward reading, a 16 MiB buffer, a 1 MiB
     * memory limit, {@code "/tmp/"} as spillable map base path, and the default disk map type
     * and BitCask compression flag from {@code HoodieCommonConfig}.
     *
     * @param fs         file system used to read the log files
     * @param basePath   the table base path
     * @param logPaths   paths of the log files to scan
     * @param readSchema schema used to read the records
     * @param instant    latest instant time passed to the scanner
     * @return the configured scanner
     */
    private static HoodieMergedLogRecordScanner getScanner(FileSystem fs, String basePath, List<String> logPaths, Schema readSchema, String instant) {
        return HoodieMergedLogRecordScanner.newBuilder()
            .withFileSystem(fs)
            .withBasePath(basePath)
            .withLogFilePaths(logPaths)
            .withReaderSchema(readSchema)
            .withLatestInstantTime(instant)
            .withReadBlocksLazily(false)
            .withReverseReader(false)
            .withBufferSize(16 * 1024 * 1024)
            .withMaxMemorySizeInBytes(Long.valueOf(1024 * 1024L))
            .withSpillableMapBasePath("/tmp/")
            .withDiskMapType((ExternalSpillableMap.DiskMapType)HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
            .withBitCaskDiskMapCompressionEnabled(((Boolean)HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()).booleanValue())
            .build();
    }

    /**
     * Renders a record as the comma-joined string forms of a fixed set of fields, in this
     * order: {@code _hoodie_record_key}, {@code _hoodie_partition_path}, {@code uuid},
     * {@code name}, {@code age}, {@code ts}, {@code partition}.
     *
     * <p>A {@code null} value in any of these fields results in a
     * {@link NullPointerException}, because {@code toString()} is invoked on it directly.
     *
     * @param genericRecord the record to render
     * @return the selected field values joined by {@code ","}
     */
    private static String filterOutVariables(GenericRecord genericRecord) {
        final String[] fieldNames = {
            "_hoodie_record_key",
            "_hoodie_partition_path",
            "uuid",
            "name",
            "age",
            "ts",
            "partition"
        };
        return Arrays.stream(fieldNames)
            .map(fieldName -> genericRecord.get(fieldName).toString())
            .collect(Collectors.joining(","));
    }

    /**
     * Builds a binary row laid out according to {@code TestConfigurations.ROW_TYPE}.
     *
     * @param fields the field values in row-type order; {@code null} marks a null field
     * @return the populated row
     */
    public static BinaryRowData insertRow(Object ... fields) {
        final RowType defaultRowType = TestConfigurations.ROW_TYPE;
        return TestData.insertRow(defaultRowType, fields);
    }

    /**
     * Builds a binary row for the given row type.
     *
     * <p>The number of values must equal the number of fields of the row type. A {@code null}
     * value sets the field to null; any other value is written with the serializer created for
     * the field's logical type.
     *
     * @param rowType the row type that supplies the logical type of each field
     * @param fields  the field values in row-type order
     * @return the populated row
     */
    public static BinaryRowData insertRow(RowType rowType, Object ... fields) {
        List<RowType.RowField> rowFields = rowType.getFields();
        LogicalType[] fieldTypes = new LogicalType[rowFields.size()];
        for (int pos = 0; pos < fieldTypes.length; pos++) {
            fieldTypes[pos] = rowFields.get(pos).getType();
        }
        TestCase.assertEquals("Filed count inconsistent with type information", fields.length, fieldTypes.length);
        BinaryRowData binaryRow = new BinaryRowData(fields.length);
        BinaryRowWriter rowWriter = new BinaryRowWriter(binaryRow);
        rowWriter.reset();
        for (int pos = 0; pos < fields.length; pos++) {
            Object value = fields[pos];
            if (value != null) {
                LogicalType fieldType = fieldTypes[pos];
                BinaryWriter.write(rowWriter, pos, value, fieldType, InternalSerializers.create(fieldType));
            } else {
                rowWriter.setNullAt(pos);
            }
        }
        rowWriter.complete();
        return binaryRow;
    }

    /**
     * Builds a row like {@code insertRow(Object...)} and marks it with {@code RowKind.DELETE}.
     *
     * @param fields the field values in row-type order
     * @return the populated row with kind DELETE
     */
    private static BinaryRowData deleteRow(Object ... fields) {
        final BinaryRowData deleted = TestData.insertRow(fields);
        deleted.setRowKind(RowKind.DELETE);
        return deleted;
    }

    /**
     * Builds a row like {@code insertRow(Object...)} and marks it with
     * {@code RowKind.UPDATE_BEFORE}.
     *
     * @param fields the field values in row-type order
     * @return the populated row with kind UPDATE_BEFORE
     */
    private static BinaryRowData updateBeforeRow(Object ... fields) {
        final BinaryRowData before = TestData.insertRow(fields);
        before.setRowKind(RowKind.UPDATE_BEFORE);
        return before;
    }

    /**
     * Builds a row like {@code insertRow(Object...)} and marks it with
     * {@code RowKind.UPDATE_AFTER}.
     *
     * @param fields the field values in row-type order
     * @return the populated row with kind UPDATE_AFTER
     */
    private static BinaryRowData updateAfterRow(Object ... fields) {
        final BinaryRowData after = TestData.insertRow(fields);
        after.setRowKind(RowKind.UPDATE_AFTER);
        return after;
    }

    /**
     * Shorthand for {@code Row.of(values)}.
     *
     * @param values the field values of the row
     * @return a row holding the given values
     */
    public static Row row(Object ... values) {
        final Object[] fieldValues = values;
        return Row.of(fieldValues);
    }

    /**
     * Returns the varargs array itself, giving call sites a compact array literal.
     *
     * @param values the elements
     * @param <T>    the element type
     * @return the very array that holds the arguments (no copy is made)
     */
    @SafeVarargs
    public static <T> T[] array(T ... values) {
        final T[] elements = values;
        return elements;
    }

    /**
     * Builds a mutable map from alternating key/value arguments:
     * {@code map(k1, v1, k2, v2, ...)}.
     *
     * <p>The arguments are not type-checked against {@code K} and {@code V}; the caller is
     * responsible for passing keys and values of the types it declares. When a key is given
     * more than once, the last value wins.
     *
     * @param kwargs keys and values in alternating order; the count must be even
     * @param <K>    the key type expected by the caller
     * @param <V>    the value type expected by the caller
     * @return a new {@link HashMap} holding the given pairs
     * @throws IllegalArgumentException if an odd number of arguments is given
     */
    public static <K, V> Map<K, V> map(Object ... kwargs) {
        if (kwargs.length % 2 != 0) {
            throw new IllegalArgumentException("Expected an even number of key/value arguments but got " + kwargs.length);
        }
        HashMap<Object, Object> map = new HashMap<>();
        for (int i = 0; i < kwargs.length; i += 2) {
            map.put(kwargs[i], kwargs[i + 1]);
        }
        // Unchecked by design: entries are trusted to match the K/V types the caller declares.
        @SuppressWarnings("unchecked")
        Map<K, V> resultMap = (Map<K, V>) (Map<?, ?>) map;
        return resultMap;
    }

    static {
        IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add((RowData)TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 23, TimestampData.fromEpochMillis((long)1L), StringData.fromString((String)"par1"))));
        DATA_SET_INSERT_SAME_KEY = new ArrayList<RowData>();
        IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add((RowData)TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 23, TimestampData.fromEpochMillis((long)i), StringData.fromString((String)"par1"))));
        DATA_SET_SOURCE_INSERT = Arrays.asList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 23, TimestampData.fromEpochMillis((long)1000L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id2"), StringData.fromString((String)"Stephen"), 33, TimestampData.fromEpochMillis((long)2000L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 53, TimestampData.fromEpochMillis((long)3000L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id4"), StringData.fromString((String)"Fabian"), 31, TimestampData.fromEpochMillis((long)4000L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id5"), StringData.fromString((String)"Sophia"), 18, TimestampData.fromEpochMillis((long)5000L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id6"), StringData.fromString((String)"Emma"), 20, TimestampData.fromEpochMillis((long)6000L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id7"), StringData.fromString((String)"Bob"), 44, TimestampData.fromEpochMillis((long)7000L), StringData.fromString((String)"par4")), TestData.insertRow(StringData.fromString((String)"id8"), StringData.fromString((String)"Han"), 56, TimestampData.fromEpochMillis((long)8000L), StringData.fromString((String)"par4")));
        DATA_SET_SOURCE_INSERT_FIRST_COMMIT = Arrays.asList(TestData.insertRow(StringData.fromString((String)"id1"), StringData.fromString((String)"Danny"), 23, TimestampData.fromEpochMillis((long)1000L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id2"), StringData.fromString((String)"Stephen"), 33, TimestampData.fromEpochMillis((long)2000L), StringData.fromString((String)"par1")), TestData.insertRow(StringData.fromString((String)"id3"), StringData.fromString((String)"Julian"), 53, TimestampData.fromEpochMillis((long)3000L), StringData.fromString((String)"par2")), TestData.insertRow(StringData.fromString((String)"id4"), StringData.fromString((String)"Fabian"), 31, TimestampData.fromEpochMillis((long)4000L), StringData.fromString((String)"par2")));
        DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(TestData.insertRow(StringData.fromString((String)"id5"), StringData.fromString((String)"Sophia"), 18, TimestampData.fromEpochMillis((long)5000L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id6"), StringData.fromString((String)"Emma"), 20, TimestampData.fromEpochMillis((long)6000L), StringData.fromString((String)"par3")), TestData.insertRow(StringData.fromString((String)"id7"), StringData.fromString((String)"Bob"), 44, TimestampData.fromEpochMillis((long)7000L), StringData.fromString((String)"par4")), TestData.insertRow(StringData.fromString((String)"id8"), StringData.fromString((String)"Han"), 56, TimestampData.fromEpochMillis((long)8000L), StringData.fromString((String)"par4")));
        // Eleven rows built with insertRow: keys id1..id11 spread over partitions par1..par4.
        // Each row is (id, name, age, epoch-millis timestamp, partition).
        DATA_SET_SOURCE_MERGED = Arrays.asList(
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
                        TimestampData.fromEpochMillis(1000L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
                        TimestampData.fromEpochMillis(2000L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
                        TimestampData.fromEpochMillis(3000L), StringData.fromString("par2")),
                TestData.insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
                        TimestampData.fromEpochMillis(4000L), StringData.fromString("par2")),
                TestData.insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
                        TimestampData.fromEpochMillis(5000L), StringData.fromString("par3")),
                TestData.insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
                        TimestampData.fromEpochMillis(6000L), StringData.fromString("par3")),
                TestData.insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
                        TimestampData.fromEpochMillis(7000L), StringData.fromString("par4")),
                TestData.insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
                        TimestampData.fromEpochMillis(8000L), StringData.fromString("par4")),
                TestData.insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
                        TimestampData.fromEpochMillis(6000L), StringData.fromString("par3")),
                TestData.insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
                        TimestampData.fromEpochMillis(7000L), StringData.fromString("par4")),
                TestData.insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
                        TimestampData.fromEpochMillis(8000L), StringData.fromString("par4")));
        // Eight rows built with insertRow: keys id1..id8, two per partition par1..par4.
        DATA_SET_SOURCE_INSERT_OVERWRITE = Arrays.asList(
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
                        TimestampData.fromEpochMillis(1000L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
                        TimestampData.fromEpochMillis(2000L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
                        TimestampData.fromEpochMillis(3000L), StringData.fromString("par2")),
                TestData.insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
                        TimestampData.fromEpochMillis(4000L), StringData.fromString("par2")),
                TestData.insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
                        TimestampData.fromEpochMillis(5000L), StringData.fromString("par3")),
                TestData.insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
                        TimestampData.fromEpochMillis(6000L), StringData.fromString("par3")),
                TestData.insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
                        TimestampData.fromEpochMillis(7000L), StringData.fromString("par4")),
                TestData.insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
                        TimestampData.fromEpochMillis(8000L), StringData.fromString("par4")));
        // Two insertRow entries (id1, id2) followed by three deleteRow entries (id3, id5, id9).
        DATA_SET_UPDATE_DELETE = Arrays.asList(
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
                        TimestampData.fromEpochMillis(1L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")),
                TestData.deleteRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
                        TimestampData.fromEpochMillis(3L), StringData.fromString("par2")),
                TestData.deleteRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
                        TimestampData.fromEpochMillis(5L), StringData.fromString("par3")),
                TestData.deleteRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
                        TimestampData.fromEpochMillis(6L), StringData.fromString("par3")));
        // Change sequence for the single key id1: one insert, three update-before/update-after
        // pairs stepping the age 19 -> 20 -> 21 -> 22, then a final delete.
        DATA_SET_INSERT_UPDATE_DELETE = Arrays.asList(
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
                        TimestampData.fromEpochMillis(1L), StringData.fromString("par1")),
                TestData.updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
                        TimestampData.fromEpochMillis(1L), StringData.fromString("par1")),
                TestData.updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")),
                TestData.updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")),
                TestData.updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
                        TimestampData.fromEpochMillis(3L), StringData.fromString("par1")),
                TestData.updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
                        TimestampData.fromEpochMillis(3L), StringData.fromString("par1")),
                TestData.updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
                        TimestampData.fromEpochMillis(4L), StringData.fromString("par1")),
                TestData.deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
                        TimestampData.fromEpochMillis(5L), StringData.fromString("par1")));
        // Exactly one row built with insertRow, for key id1 in partition par1.
        DATA_SET_SINGLE_INSERT = Collections.singletonList(
                TestData.insertRow(
                        StringData.fromString("id1"),
                        StringData.fromString("Danny"),
                        23,
                        TimestampData.fromEpochMillis(1L),
                        StringData.fromString("par1")));
        // Four insertRow entries for the same key id1 whose timestamps (3, 4, 2, 1) are
        // deliberately not in ascending order; only the timestamp-4 row carries age 22.
        DATA_SET_DISORDER_INSERT = Arrays.asList(
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
                        TimestampData.fromEpochMillis(3L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
                        TimestampData.fromEpochMillis(4L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")),
                TestData.insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
                        TimestampData.fromEpochMillis(1L), StringData.fromString("par1")));
        // Update-before/update-after rows for key id1 listed out of timestamp order,
        // followed by a deleteRow stamped with timestamp 2.
        DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
                TestData.updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
                        TimestampData.fromEpochMillis(3L), StringData.fromString("par1")),
                TestData.updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")),
                TestData.updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
                        TimestampData.fromEpochMillis(1L), StringData.fromString("par1")),
                TestData.updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")),
                TestData.updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
                        TimestampData.fromEpochMillis(4L), StringData.fromString("par1")),
                TestData.updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
                        TimestampData.fromEpochMillis(3L), StringData.fromString("par1")),
                TestData.deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
                        TimestampData.fromEpochMillis(2L), StringData.fromString("par1")));
    }
}

