package org.apache.flink.table.store.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
import org.apache.flink.table.store.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.flink.table.store.table.system.AuditLogTable;
import org.apache.flink.table.store.utils.CompatibilityTestUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.class */
public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
    protected static final RowType COMPATIBILITY_ROW_TYPE = RowType.of(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.BIGINT().getLogicalType(), DataTypes.BINARY(1).getLogicalType(), DataTypes.VARBINARY(1).getLogicalType()}, new String[]{"pt", "a", "b", "c", "d"});
    protected static final Function<RowData, String> COMPATIBILITY_BATCH_ROW_TO_STRING = rowData -> {
        return rowData.getInt(0) + "|" + rowData.getInt(1) + "|" + rowData.getLong(2) + "|" + new String(rowData.getBinary(3)) + "|" + new String(rowData.getBinary(4));
    };
    protected static final Function<RowData, String> COMPATIBILITY_CHANGELOG_ROW_TO_STRING = rowData -> {
        return rowData.getRowKind().shortString() + " " + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
    };

    @Test
    public void testSequenceNumber() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(configuration -> {
            configuration.set(CoreOptions.SEQUENCE_FIELD, "b");
        });
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 200L));
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(1, 11, 101L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 11, 55L));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.close();
        Assertions.assertThat(getResult(createFileStoreTable.newRead(), createFileStoreTable.newScan().plan().splits(), binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEqualTo(Arrays.asList("1|10|200|binary|varbinary|mapKey:mapVal|multiset", "1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testBatchReadWrite() throws Exception {
        writeData();
        FileStoreTable createFileStoreTable = createFileStoreTable();
        List<Split> splits = createFileStoreTable.newScan().plan().splits();
        TableRead newRead = createFileStoreTable.newRead();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEqualTo(Collections.singletonList("1|10|1000|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(newRead, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING)).isEqualTo(Arrays.asList("2|21|20001|binary|varbinary|mapKey:mapVal|multiset", "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testBatchProjection() throws Exception {
        writeData();
        FileStoreTable createFileStoreTable = createFileStoreTable();
        List<Split> splits = createFileStoreTable.newScan().plan().splits();
        TableRead withProjection = createFileStoreTable.newRead().withProjection(PROJECTION);
        Assertions.assertThat(getResult(withProjection, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING)).isEqualTo(Collections.singletonList("1000|10"));
        Assertions.assertThat(getResult(withProjection, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING)).isEqualTo(Arrays.asList("20001|21", "202|22"));
    }

    @Test
    public void testBatchFilter() throws Exception {
        writeData();
        FileStoreTable createFileStoreTable = createFileStoreTable();
        PredicateBuilder predicateBuilder = new PredicateBuilder(createFileStoreTable.schema().logicalRowType());
        List<Split> splits = createFileStoreTable.newScan().withFilter(PredicateBuilder.and(new Predicate[]{predicateBuilder.equal(2, 201L), predicateBuilder.equal(1, 21)})).plan().splits();
        TableRead newRead = createFileStoreTable.newRead();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING)).isEqualTo(Arrays.asList("2|21|20001|binary|varbinary|mapKey:mapVal|multiset", "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testStreamingReadWrite() throws Exception {
        writeData();
        FileStoreTable createFileStoreTable = createFileStoreTable();
        List<Split> splits = createFileStoreTable.newScan().withKind(ScanKind.DELTA).plan().splits();
        TableRead newRead = createFileStoreTable.newRead();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEqualTo(Collections.singletonList("-1|11|1001|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(newRead, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEqualTo(Arrays.asList("-2|20|200|binary|varbinary|mapKey:mapVal|multiset", "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset", "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testStreamingProjection() throws Exception {
        writeData();
        FileStoreTable createFileStoreTable = createFileStoreTable();
        List<Split> splits = createFileStoreTable.newScan().withKind(ScanKind.DELTA).plan().splits();
        TableRead withProjection = createFileStoreTable.newRead().withProjection(PROJECTION);
        Assertions.assertThat(getResult(withProjection, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING)).isEqualTo(Collections.singletonList("-1001|11"));
        Assertions.assertThat(getResult(withProjection, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING)).isEqualTo(Arrays.asList("-200|20", "+20001|21", "+202|22"));
    }

    @Test
    public void testStreamingFilter() throws Exception {
        writeData();
        FileStoreTable createFileStoreTable = createFileStoreTable();
        PredicateBuilder predicateBuilder = new PredicateBuilder(createFileStoreTable.schema().logicalRowType());
        List<Split> splits = createFileStoreTable.newScan().withKind(ScanKind.DELTA).withFilter(PredicateBuilder.and(new Predicate[]{predicateBuilder.equal(2, 201L), predicateBuilder.equal(1, 21)})).plan().splits();
        TableRead newRead = createFileStoreTable.newRead();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEqualTo(Arrays.asList("-2|20|200|binary|varbinary|mapKey:mapVal|multiset", "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset", "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testStreamingInputChangelog() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(configuration -> {
            configuration.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
        });
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(1, 20, 200L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
        newWrite.write(rowData(1, 10, 101L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 20, 200L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 20, 201L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.close();
        Assertions.assertThat(getResult(createFileStoreTable.newRead(), createFileStoreTable.newScan().withKind(ScanKind.CHANGELOG).plan().splits(), binaryRow(1), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"+I 1|10|100|binary|varbinary|mapKey:mapVal|multiset", "+I 1|20|200|binary|varbinary|mapKey:mapVal|multiset", "-D 1|10|100|binary|varbinary|mapKey:mapVal|multiset", "+I 1|10|101|binary|varbinary|mapKey:mapVal|multiset", "-U 1|20|200|binary|varbinary|mapKey:mapVal|multiset", "+U 1|20|201|binary|varbinary|mapKey:mapVal|multiset", "-U 1|10|101|binary|varbinary|mapKey:mapVal|multiset", "+U 1|10|102|binary|varbinary|mapKey:mapVal|multiset"});
    }

    @Test
    public void testStreamingFullChangelog() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(configuration -> {
            configuration.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
        });
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 110L));
        newWrite.write(rowData(1, 20, 120L));
        newWrite.write(rowData(2, 10, 210L));
        newWrite.write(rowData(2, 20, 220L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 2, 10, 210L));
        newWrite.compact(binaryRow(1), 0, true);
        newWrite.compact(binaryRow(2), 0, true);
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        List<Split> splits = createFileStoreTable.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
        TableRead newRead = createFileStoreTable.newRead();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"+I 1|10|110|binary|varbinary|mapKey:mapVal|multiset", "+I 1|20|120|binary|varbinary|mapKey:mapVal|multiset"});
        Assertions.assertThat(getResult(newRead, splits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"+I 2|20|220|binary|varbinary|mapKey:mapVal|multiset"});
        newWrite.write(rowData(1, 30, 130L));
        newWrite.write(rowData(1, 40, 140L));
        newWrite.write(rowData(2, 30, 230L));
        newWrite.write(rowData(2, 40, 240L));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 1, 40, 140L));
        newWrite.write(rowData(2, 40, 241L));
        newWrite.compact(binaryRow(1), 0, true);
        newWrite.compact(binaryRow(2), 0, true);
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        List<Split> splits2 = createFileStoreTable.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
        Assertions.assertThat(getResult(newRead, splits2, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"+I 1|30|130|binary|varbinary|mapKey:mapVal|multiset"});
        Assertions.assertThat(getResult(newRead, splits2, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"+I 2|30|230|binary|varbinary|mapKey:mapVal|multiset", "+I 2|40|241|binary|varbinary|mapKey:mapVal|multiset"});
        newWrite.write(rowData(1, 20, 121L));
        newWrite.write(rowData(1, 30, 131L));
        newWrite.write(rowData(2, 30, 231L));
        newCommit.commit(3L, newWrite.prepareCommit(true, 3L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 1, 20, 121L));
        newWrite.write(rowData(1, 30, 132L));
        newWrite.write(rowData(1, 40, 141L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 2, 20, 220L));
        newWrite.write(rowData(2, 20, 221L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 2, 20, 221L));
        newWrite.write(rowData(2, 40, 242L));
        newWrite.compact(binaryRow(1), 0, true);
        newWrite.compact(binaryRow(2), 0, true);
        newCommit.commit(4L, newWrite.prepareCommit(true, 4L));
        List<Split> splits3 = createFileStoreTable.newScan().withKind(ScanKind.CHANGELOG).plan().splits();
        Assertions.assertThat(getResult(newRead, splits3, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset", "-U 1|30|130|binary|varbinary|mapKey:mapVal|multiset", "+U 1|30|132|binary|varbinary|mapKey:mapVal|multiset", "+I 1|40|141|binary|varbinary|mapKey:mapVal|multiset"});
        Assertions.assertThat(getResult(newRead, splits3, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING)).containsExactlyInAnyOrder(new String[]{"-D 2|20|220|binary|varbinary|mapKey:mapVal|multiset", "-U 2|30|230|binary|varbinary|mapKey:mapVal|multiset", "+U 2|30|231|binary|varbinary|mapKey:mapVal|multiset", "-U 2|40|241|binary|varbinary|mapKey:mapVal|multiset", "+U 2|40|242|binary|varbinary|mapKey:mapVal|multiset"});
    }

    @Test
    public void testStreamingChangelogCompatibility02() throws Exception {
        CompatibilityTestUtils.unzip("compatibility/table-changelog-0.2.zip", this.tablePath.getPath());
        FileStoreTable createFileStoreTable = createFileStoreTable(configuration -> {
            configuration.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
        }, COMPATIBILITY_ROW_TYPE);
        List asList = Arrays.asList(Arrays.asList(Arrays.asList("+I 1|10|100|binary|varbinary", "+I 1|20|200|binary|varbinary", "-D 1|10|100|binary|varbinary", "+I 1|10|101|binary|varbinary", "-U 1|10|101|binary|varbinary", "+U 1|10|102|binary|varbinary"), Collections.singletonList("+I 2|10|300|binary|varbinary")), Arrays.asList(Collections.singletonList("-D 1|20|200|binary|varbinary"), Arrays.asList("-U 2|10|300|binary|varbinary", "+U 2|10|301|binary|varbinary", "+I 2|20|400|binary|varbinary")), Arrays.asList(Arrays.asList("-U 1|10|102|binary|varbinary", "+U 1|10|103|binary|varbinary", "+I 1|20|201|binary|varbinary"), Collections.singletonList("-D 2|10|301|binary|varbinary")));
        ContinuousDataFileSnapshotEnumerator continuousDataFileSnapshotEnumerator = new ContinuousDataFileSnapshotEnumerator(this.tablePath, createFileStoreTable.newScan(), new FullStartingScanner(), new InputChangelogFollowUpScanner(), 1L);
        FunctionWithException functionWithException = num -> {
            DataTableScan.DataFilePlan enumerate = continuousDataFileSnapshotEnumerator.enumerate();
            Assertions.assertThat(enumerate).isNotNull();
            List<Split> splits = enumerate.splits();
            TableRead newRead = createFileStoreTable.newRead();
            for (int i = 0; i < 2; i++) {
                Assertions.assertThat(getResult(newRead, splits, binaryRow(i + 1), 0, COMPATIBILITY_CHANGELOG_ROW_TO_STRING)).isEqualTo(((List) asList.get(num.intValue())).get(i));
            }
            return null;
        };
        for (int i = 0; i < 2; i++) {
            functionWithException.apply(Integer.valueOf(i));
        }
        Assertions.assertThat(continuousDataFileSnapshotEnumerator.enumerate()).isNull();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
        newWrite.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        newWrite.close();
        functionWithException.apply(2);
        Assertions.assertThat(continuousDataFileSnapshotEnumerator.enumerate()).isNull();
    }

    private void writeData() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(2, 20, 200L));
        newWrite.write(rowData(1, 11, 101L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 10, 1000L));
        newWrite.write(rowData(2, 21, 201L));
        newWrite.write(rowData(2, 21, 2001L));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowData(1, 11, 1001L));
        newWrite.write(rowData(2, 21, 20001L));
        newWrite.write(rowData(2, 22, 202L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        newWrite.close();
    }

    @Override // org.apache.flink.table.store.table.FileStoreTableTestBase
    @Test
    public void testReadFilter() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(1, 20, 200L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(1, 30, 300L));
        newWrite.write(rowData(1, 40, 400L));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.write(rowData(1, 50, 500L));
        newWrite.write(rowData(1, 60, 600L));
        newCommit.commit(2L, newWrite.prepareCommit(true, 2L));
        PredicateBuilder predicateBuilder = new PredicateBuilder(ROW_TYPE);
        List<Split> splits = createFileStoreTable.newScan().plan().splits();
        Assertions.assertThat(getResult(createFileStoreTable.newRead().withFilter(predicateBuilder.equal(1, 30)), splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).hasSameElementsAs(Arrays.asList("1|30|300|binary|varbinary|mapKey:mapVal|multiset", "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(createFileStoreTable.newRead().withFilter(predicateBuilder.equal(2, 300L)), splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).hasSameElementsAs(Arrays.asList("1|30|300|binary|varbinary|mapKey:mapVal|multiset", "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(createFileStoreTable.newRead().withFilter(PredicateBuilder.or(new Predicate[]{predicateBuilder.equal(1, 10), predicateBuilder.equal(2, 300L)})), splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).hasSameElementsAs(Arrays.asList("1|10|100|binary|varbinary|mapKey:mapVal|multiset", "1|20|200|binary|varbinary|mapKey:mapVal|multiset", "1|30|300|binary|varbinary|mapKey:mapVal|multiset", "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
        newWrite.write(rowData(1, 60, 500L));
        newWrite.write(rowData(1, 10, 10L));
        newCommit.commit(3L, newWrite.prepareCommit(true, 3L));
        newWrite.close();
        Assertions.assertThat(getResult(createFileStoreTable.newRead().withFilter(predicateBuilder.equal(2, 600L)), createFileStoreTable.newScan().plan().splits(), binaryRow(1), 0, BATCH_ROW_TO_STRING)).hasSameElementsAs(Arrays.asList("1|10|10|binary|varbinary|mapKey:mapVal|multiset", "1|20|200|binary|varbinary|mapKey:mapVal|multiset", "1|30|300|binary|varbinary|mapKey:mapVal|multiset", "1|40|400|binary|varbinary|mapKey:mapVal|multiset", "1|50|500|binary|varbinary|mapKey:mapVal|multiset", "1|60|500|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testPartialUpdateIgnoreDelete() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(configuration -> {
            configuration.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE);
            configuration.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
        });
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 200L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 100L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 1, 20, 400L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.close();
        Assertions.assertThat(getResult(createFileStoreTable.newRead(), createFileStoreTable.newScan().plan().splits(), binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEqualTo(Collections.singletonList("1|10|200|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testSlowCommit() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(1, 20, 200L));
        List prepareCommit = newWrite.prepareCommit(false, 0L);
        newWrite.write(rowData(2, 10, 300L));
        List prepareCommit2 = newWrite.prepareCommit(false, 1L);
        newWrite.write(rowData(1, 20, 201L));
        List prepareCommit3 = newWrite.prepareCommit(true, 2L);
        newCommit.commit(0L, prepareCommit);
        newCommit.commit(1L, prepareCommit2);
        newCommit.commit(2L, prepareCommit3);
        newWrite.close();
        List<Split> splits = createFileStoreTable.newScan().plan().splits();
        TableRead newRead = createFileStoreTable.newRead();
        Assertions.assertThat(getResult(newRead, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEqualTo(Arrays.asList("1|10|100|binary|varbinary|mapKey:mapVal|multiset", "1|20|201|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(newRead, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING)).isEqualTo(Collections.singletonList("2|10|300|binary|varbinary|mapKey:mapVal|multiset"));
    }

    @Test
    public void testIncrementalScanOverwrite() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(1, 20, 200L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        DataTableScan withKind = createFileStoreTable.newScan().withKind(ScanKind.DELTA);
        List list = withKind.plan().splits;
        Assertions.assertThat(list).hasSize(1);
        Assertions.assertThat(((DataSplit) list.get(0)).files()).hasSize(1);
        newWrite.write(rowData(1, 10, 1000L));
        newWrite.write(rowData(1, 20, 2000L));
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "1");
        newCommit.withOverwritePartition(hashMap);
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        List list2 = withKind.plan().splits;
        Assertions.assertThat(list2).hasSize(1);
        Assertions.assertThat(((DataSplit) list2.get(0)).files()).hasSize(1);
        Assertions.assertThat(((DataFileMeta) ((DataSplit) list2.get(0)).files().get(0)).fileName()).isNotEqualTo(((DataFileMeta) ((DataSplit) list.get(0)).files().get(0)).fileName());
    }

    @Test
    public void testAuditLog() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(configuration -> {
            configuration.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
        });
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
        newWrite.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 30, 300L));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.close();
        newCommit.close();
        AuditLogTable auditLogTable = new AuditLogTable(createFileStoreTable);
        RowRowConverter create = RowRowConverter.create(DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}));
        Function<RowData, String> function = rowData -> {
            return create.toExternal(rowData).toString();
        };
        PredicateBuilder predicateBuilder = new PredicateBuilder(auditLogTable.rowType());
        Assertions.assertThat(getResult(auditLogTable.newRead(), auditLogTable.newScan().plan().splits(), function)).containsExactlyInAnyOrder(new String[]{"+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]"});
        Predicate equal = predicateBuilder.equal(0, StringData.fromString("+I"));
        Assertions.assertThat(getResult(auditLogTable.newRead().withFilter(equal), auditLogTable.newScan().withFilter(equal).plan().splits(), function)).containsExactlyInAnyOrder(new String[]{"+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]"});
        Assertions.assertThat(getResult(auditLogTable.newRead().withFilter(predicateBuilder.equal(2, 10)), auditLogTable.newScan().withFilter(predicateBuilder.equal(2, 10)).plan().splits(), function)).containsExactlyInAnyOrder(new String[]{"+I[+I, 1, 10, 100]"});
        DataTableScan newScan = auditLogTable.newScan();
        TableRead withProjection = auditLogTable.newRead().withProjection(new int[]{2, 0, 1});
        RowRowConverter create2 = RowRowConverter.create(DataTypes.ROW(new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()}));
        Assertions.assertThat(getResult(withProjection, newScan.plan().splits(), rowData2 -> {
            return create2.toExternal(rowData2).toString();
        })).containsExactlyInAnyOrder(new String[]{"+I[20, +I, 2]", "+I[30, +U, 1]", "+I[10, +I, 1]"});
        DataTableScan newScan2 = auditLogTable.newScan();
        TableRead withProjection2 = auditLogTable.newRead().withProjection(new int[]{2, 1});
        RowRowConverter create3 = RowRowConverter.create(DataTypes.ROW(new DataType[]{DataTypes.INT(), DataTypes.INT()}));
        Assertions.assertThat(getResult(withProjection2, newScan2.plan().splits(), rowData3 -> {
            return create3.toExternal(rowData3).toString();
        })).containsExactlyInAnyOrder(new String[]{"+I[20, 2]", "+I[30, 1]", "+I[10, 1]"});
    }

    @Override // org.apache.flink.table.store.table.FileStoreTableTestBase
    protected FileStoreTable createFileStoreTable(Consumer<Configuration> consumer) throws Exception {
        return createFileStoreTable(consumer, ROW_TYPE);
    }

    private FileStoreTable createFileStoreTable(Consumer<Configuration> consumer, RowType rowType) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.PATH, this.tablePath.toString());
        configuration.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
        consumer.accept(configuration);
        return new ChangelogWithKeyFileStoreTable(this.tablePath, new SchemaManager(this.tablePath).commitNewVersion(new UpdateSchema(rowType, Collections.singletonList("pt"), Arrays.asList("pt", "a"), configuration.toMap(), "")));
    }
}
