package org.apache.paimon.table;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.WriteMode;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.system.AuditLogTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CompatibilityTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.class */
public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
    /** Five-column row type ({@code pt, a, b, c, d}) used by the compatibility tests. */
    protected static final RowType COMPATIBILITY_ROW_TYPE =
            RowType.of(
                    new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BINARY(1), DataTypes.VARBINARY(1)},
                    new String[]{"pt", "a", "b", "c", "d"});

    /**
     * Renders a row of {@link #COMPATIBILITY_ROW_TYPE} as {@code pt|a|b|c|d}.
     *
     * <p>The two binary columns are decoded explicitly as UTF-8 so the rendered text does not
     * depend on the platform default charset of the JVM running the test.
     */
    protected static final Function<InternalRow, String> COMPATIBILITY_BATCH_ROW_TO_STRING =
            row ->
                    row.getInt(0)
                            + "|"
                            + row.getInt(1)
                            + "|"
                            + row.getLong(2)
                            + "|"
                            + new String(row.getBinary(3), StandardCharsets.UTF_8)
                            + "|"
                            + new String(row.getBinary(4), StandardCharsets.UTF_8);

    /** Same rendering as the batch variant, prefixed with the row kind's short string and a space. */
    protected static final Function<InternalRow, String> COMPATIBILITY_CHANGELOG_ROW_TO_STRING =
            row -> row.getRowKind().shortString() + " " + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(row);

    /**
     * Verifies that the write/commit pair obtained from a {@link BatchWriteBuilder} can be used for
     * exactly one commit, and that the committed rows are readable afterwards.
     */
    @Test
    public void testBatchWriteBuilder() throws Exception {
        FileStoreTable table = createFileStoreTable();
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        BatchTableWrite write = writeBuilder.newWrite();
        BatchTableCommit commit = writeBuilder.newCommit();
        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 11, 101L));
        commit.commit(write.prepareCommit());

        // A second prepareCommit()/commit() on the same batch objects must be rejected.
        Assertions.assertThatThrownBy(write::prepareCommit)
                .hasMessageContaining("BatchTableWrite only support one-time committing");
        Assertions.assertThatThrownBy(() -> commit.commit(Collections.emptyList()))
                .hasMessageContaining("BatchTableCommit only support one-time committing");
        write.close();
        commit.close();

        List<String> expectedRows =
                Arrays.asList(
                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
                        "1|11|101|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(
                        getResult(
                                table.newReadBuilder().newRead(),
                                toSplits(table.newSnapshotReader().read().dataSplits()),
                                BATCH_ROW_TO_STRING))
                .isEqualTo(expectedRows);
    }

    /**
     * Verifies merging when field {@code b} is configured as the sequence field: for rows that
     * share their first two values, the row with the larger {@code b} is the one read back, even
     * when a row with a smaller {@code b} was written later.
     */
    @Test
    public void testSequenceNumber() throws Exception {
        FileStoreTable table = createFileStoreTable(conf -> conf.set(CoreOptions.SEQUENCE_FIELD, "b"));
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowData(1, 10, 200L));
        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 11, 101L));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write(rowData(1, 11, 55L));
        commit.commit(1L, write.prepareCommit(true, 1L));
        write.close();
        // Release the commit as well, as testAuditLog does, instead of leaking it.
        commit.close();

        List<String> expectedRows =
                Arrays.asList(
                        "1|10|200|binary|varbinary|mapKey:mapVal|multiset",
                        "1|11|101|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(
                        getResult(
                                table.newRead(),
                                toSplits(table.newSnapshotReader().read().dataSplits()),
                                binaryRow(1),
                                0,
                                BATCH_ROW_TO_STRING))
                .isEqualTo(expectedRows);
    }

    /**
     * Writes two rows whose {@code sec} sequence field holds the same epoch-second value, with
     * second-to-microsecond auto padding enabled, and checks that each resulting sequence number
     * converts back from microseconds to that same second.
     */
    @Test
    public void testPaddingSequenceNumber() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()},
                        new String[]{"pt", "a", "b", "sec", "non_time"});
        GenericRow firstRow = GenericRow.of(1, 10, 100, 1685530987, BinaryString.fromString("a1"));
        GenericRow secondRow = GenericRow.of(1, 10, 101, 1685530987, BinaryString.fromString("a2"));
        FileStoreTable table =
                createFileStoreTable(
                        conf -> {
                            conf.set(CoreOptions.SEQUENCE_FIELD, "sec");
                            conf.set(CoreOptions.SEQUENCE_AUTO_PADDING, CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO.toString());
                        },
                        rowType);
        TableWriteImpl write = table.newWrite(this.commitUser);

        // Read each sequence number right after its write, before the next write happens.
        long firstSequence = ((KeyValue) write.writeAndReturnData(firstRow)).sequenceNumber();
        long secondSequence = ((KeyValue) write.writeAndReturnData(secondRow)).sequenceNumber();

        long firstSeconds = TimeUnit.MICROSECONDS.toSeconds(firstSequence);
        long secondSeconds = TimeUnit.MICROSECONDS.toSeconds(secondSequence);
        Assertions.assertThat(firstSeconds).isEqualTo(1685530987L);
        Assertions.assertThat(secondSeconds).isEqualTo(1685530987L);
        write.close();
    }

    /**
     * Reads back the rows produced by {@link #writeData()} through a plain snapshot read and checks
     * the rows returned for {@code binaryRow(1)} and {@code binaryRow(2)}.
     */
    @Test
    public void testBatchReadWrite() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        List<Split> batchSplits = toSplits(table.newSnapshotReader().read().dataSplits());
        InnerTableRead read = table.newRead();

        List<String> expectedFirst = Collections.singletonList("1|10|1000|binary|varbinary|mapKey:mapVal|multiset");
        List<String> expectedSecond =
                Arrays.asList(
                        "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
                        "2|22|202|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, batchSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEqualTo(expectedFirst);
        Assertions.assertThat(getResult(read, batchSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING)).isEqualTo(expectedSecond);
    }

    /**
     * Same data as {@link #testBatchReadWrite()}, but read through {@code PROJECTION} and rendered
     * with the projected row formatter.
     */
    @Test
    public void testBatchProjection() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        List<Split> batchSplits = toSplits(table.newSnapshotReader().read().dataSplits());
        InnerTableRead projectedRead = table.newRead().withProjection(PROJECTION);

        List<String> expectedFirst = Collections.singletonList("1000|10");
        List<String> expectedSecond = Arrays.asList("20001|21", "202|22");
        Assertions.assertThat(getResult(projectedRead, batchSplits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
                .isEqualTo(expectedFirst);
        Assertions.assertThat(getResult(projectedRead, batchSplits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
                .isEqualTo(expectedSecond);
    }

    /**
     * Plans a batch scan with the filter {@code field2 = 201 AND field1 = 21} and checks which rows
     * are still returned: nothing for {@code binaryRow(1)}, and both remaining rows for
     * {@code binaryRow(2)}.
     */
    @Test
    public void testBatchFilter() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
        Predicate filter = PredicateBuilder.and(builder.equal(2, 201L), builder.equal(1, 21));
        List<Split> filteredSplits = toSplits(table.newSnapshotReader().withFilter(filter).read().dataSplits());
        InnerTableRead read = table.newRead();

        Assertions.assertThat(getResult(read, filteredSplits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
        List<String> expectedSecond =
                Arrays.asList(
                        "2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
                        "2|22|202|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, filteredSplits, binaryRow(2), 0, BATCH_ROW_TO_STRING)).isEqualTo(expectedSecond);
    }

    /**
     * Reads the data from {@link #writeData()} with scan kind {@link ScanKind#DELTA} and checks
     * the rows rendered by the streaming formatter for both partitions.
     */
    @Test
    public void testStreamingReadWrite() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        SnapshotReader deltaReader = table.newSnapshotReader().withKind(ScanKind.DELTA);
        List<Split> deltaSplits = toSplits(deltaReader.read().dataSplits());
        InnerTableRead read = table.newRead();

        List<String> expectedFirst = Collections.singletonList("-1|11|1001|binary|varbinary|mapKey:mapVal|multiset");
        List<String> expectedSecond =
                Arrays.asList(
                        "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
                        "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
                        "+2|22|202|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, deltaSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEqualTo(expectedFirst);
        Assertions.assertThat(getResult(read, deltaSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEqualTo(expectedSecond);
    }

    /**
     * Same delta scan as {@link #testStreamingReadWrite()}, read through {@code PROJECTION} and
     * rendered with the projected streaming formatter.
     */
    @Test
    public void testStreamingProjection() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        SnapshotReader deltaReader = table.newSnapshotReader().withKind(ScanKind.DELTA);
        List<Split> deltaSplits = toSplits(deltaReader.read().dataSplits());
        InnerTableRead projectedRead = table.newRead().withProjection(PROJECTION);

        List<String> expectedFirst = Collections.singletonList("-1001|11");
        List<String> expectedSecond = Arrays.asList("-200|20", "+20001|21", "+202|22");
        Assertions.assertThat(getResult(projectedRead, deltaSplits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
                .isEqualTo(expectedFirst);
        Assertions.assertThat(getResult(projectedRead, deltaSplits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
                .isEqualTo(expectedSecond);
    }

    /**
     * Plans a delta scan with the filter {@code field2 = 201 AND field1 = 21} and checks the rows
     * still returned: nothing for {@code binaryRow(1)}, all three delta rows for
     * {@code binaryRow(2)}.
     */
    @Test
    public void testStreamingFilter() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
        SnapshotReader deltaReader = table.newSnapshotReader().withKind(ScanKind.DELTA);
        Predicate filter = PredicateBuilder.and(builder.equal(2, 201L), builder.equal(1, 21));
        List<Split> filteredSplits = toSplits(deltaReader.withFilter(filter).read().dataSplits());
        InnerTableRead read = table.newRead();

        Assertions.assertThat(getResult(read, filteredSplits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
        List<String> expectedSecond =
                Arrays.asList(
                        "-2|20|200|binary|varbinary|mapKey:mapVal|multiset",
                        "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset",
                        "+2|22|202|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, filteredSplits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEqualTo(expectedSecond);
    }

    /**
     * With the changelog producer set to {@code INPUT}, a {@link ScanKind#CHANGELOG} scan must
     * return every written record, with its row kind, exactly as it was written.
     */
    @Test
    public void testStreamingInputChangelog() throws Exception {
        FileStoreTable table =
                createFileStoreTable(conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT));
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 20, 200L));
        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
        write.write(rowData(1, 10, 101L));
        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 20, 200L));
        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 20, 201L));
        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.close();
        // Release the commit as well, as testAuditLog does, instead of leaking it.
        commit.close();

        List<Split> changelogSplits =
                toSplits(table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
        Assertions.assertThat(getResult(table.newRead(), changelogSplits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder(
                        "+I 1|10|100|binary|varbinary|mapKey:mapVal|multiset",
                        "+I 1|20|200|binary|varbinary|mapKey:mapVal|multiset",
                        "-D 1|10|100|binary|varbinary|mapKey:mapVal|multiset",
                        "+I 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
                        "-U 1|20|200|binary|varbinary|mapKey:mapVal|multiset",
                        "+U 1|20|201|binary|varbinary|mapKey:mapVal|multiset",
                        "-U 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
                        "+U 1|10|102|binary|varbinary|mapKey:mapVal|multiset");
    }

    /** Runs the full-compaction changelog scenario without any additional options. */
    @Test
    public void testStreamingFullChangelog() throws Exception {
        Consumer<Options> noExtraOptions = conf -> {
        };
        innerTestStreamingFullChangelog(noExtraOptions);
    }

    /** Runs the full-compaction changelog scenario with the sort spill threshold set to 2. */
    @Test
    public void testStreamingFullChangelogWithSpill() throws Exception {
        Consumer<Options> lowSpillThreshold = conf -> conf.set(CoreOptions.SORT_SPILL_THRESHOLD, 2);
        innerTestStreamingFullChangelog(lowSpillThreshold);
    }

    /**
     * Shared scenario for the {@code FULL_COMPACTION} changelog producer.
     *
     * <p>Rows are written and committed in five steps (commit identifiers 0 to 4); commits 0, 2
     * and 4 are preceded by an explicit compaction of both partitions. After each of those three
     * commits the changelog scan result is compared with the expected row-kind-tagged records.
     *
     * @param consumer applies extra options on top of the changelog-producer setting
     */
    private void innerTestStreamingFullChangelog(Consumer<Options> consumer) throws Exception {
        FileStoreTable table =
                createFileStoreTable(
                        conf -> {
                            conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.FULL_COMPACTION);
                            consumer.accept(conf);
                        });
        TableWriteImpl write =
                table.newWrite(this.commitUser).withIOManager(new IOManagerImpl(new String[]{this.tempDir.toString()}));
        TableCommitImpl commit = table.newCommit(this.commitUser);

        // Commit 0: initial rows, one deleted again before the compaction.
        write.write(rowData(1, 10, 110L));
        write.write(rowData(1, 20, 120L));
        write.write(rowData(2, 10, 210L));
        write.write(rowData(2, 20, 220L));
        write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 210L));
        write.compact(binaryRow(1), 0, true);
        write.compact(binaryRow(2), 0, true);
        commit.commit(0L, write.prepareCommit(true, 0L));

        List<Split> firstSplits = toSplits(table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
        InnerTableRead read = table.newRead();
        Assertions.assertThat(getResult(read, firstSplits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder(
                        "+I 1|10|110|binary|varbinary|mapKey:mapVal|multiset",
                        "+I 1|20|120|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, firstSplits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder("+I 2|20|220|binary|varbinary|mapKey:mapVal|multiset");

        // Commit 1 (no compaction) followed by commit 2 (with compaction).
        write.write(rowData(1, 30, 130L));
        write.write(rowData(1, 40, 140L));
        write.write(rowData(2, 30, 230L));
        write.write(rowData(2, 40, 240L));
        commit.commit(1L, write.prepareCommit(true, 1L));
        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 140L));
        write.write(rowData(2, 40, 241L));
        write.compact(binaryRow(1), 0, true);
        write.compact(binaryRow(2), 0, true);
        commit.commit(2L, write.prepareCommit(true, 2L));

        List<Split> secondSplits = toSplits(table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
        Assertions.assertThat(getResult(read, secondSplits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder("+I 1|30|130|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, secondSplits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder(
                        "+I 2|30|230|binary|varbinary|mapKey:mapVal|multiset",
                        "+I 2|40|241|binary|varbinary|mapKey:mapVal|multiset");

        // Commit 3 (no compaction) followed by commit 4 (with compaction).
        write.write(rowData(1, 20, 121L));
        write.write(rowData(1, 30, 131L));
        write.write(rowData(2, 30, 231L));
        commit.commit(3L, write.prepareCommit(true, 3L));
        write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 121L));
        write.write(rowData(1, 30, 132L));
        write.write(rowData(1, 40, 141L));
        write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 220L));
        write.write(rowData(2, 20, 221L));
        write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 221L));
        write.write(rowData(2, 40, 242L));
        write.compact(binaryRow(1), 0, true);
        write.compact(binaryRow(2), 0, true);
        commit.commit(4L, write.prepareCommit(true, 4L));
        // Nothing is written after this point; release both instead of leaking them.
        write.close();
        commit.close();

        List<Split> thirdSplits = toSplits(table.newSnapshotReader().withKind(ScanKind.CHANGELOG).read().dataSplits());
        Assertions.assertThat(getResult(read, thirdSplits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder(
                        "-D 1|20|120|binary|varbinary|mapKey:mapVal|multiset",
                        "-U 1|30|130|binary|varbinary|mapKey:mapVal|multiset",
                        "+U 1|30|132|binary|varbinary|mapKey:mapVal|multiset",
                        "+I 1|40|141|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(getResult(read, thirdSplits, binaryRow(2), 0, CHANGELOG_ROW_TO_STRING))
                .containsExactlyInAnyOrder(
                        "-D 2|20|220|binary|varbinary|mapKey:mapVal|multiset",
                        "-U 2|30|230|binary|varbinary|mapKey:mapVal|multiset",
                        "+U 2|30|231|binary|varbinary|mapKey:mapVal|multiset",
                        "-U 2|40|241|binary|varbinary|mapKey:mapVal|multiset",
                        "+U 2|40|242|binary|varbinary|mapKey:mapVal|multiset");
    }

    /**
     * Restores a stream scan on a table unpacked from the {@code table-changelog-0.2.zip} archive
     * and checks the changelog delivered by successive {@code plan()} calls.
     *
     * <p>The first two plans come from the archived table; the third one is produced by a commit
     * made in this test. After each batch of expected plans, the next plan must be empty.
     */
    @Test
    public void testStreamingChangelogCompatibility02() throws Exception {
        CompatibilityTestUtils.unzip("compatibility/table-changelog-0.2.zip", this.tablePath.getPath());
        FileStoreTable table =
                createFileStoreTable(
                        conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT),
                        COMPATIBILITY_ROW_TYPE);

        // Indexed as expected.get(planIndex).get(partitionIndex) -> rendered changelog rows.
        List<List<List<String>>> expected =
                Arrays.asList(
                        Arrays.asList(
                                Arrays.asList(
                                        "+I 1|10|100|binary|varbinary",
                                        "+I 1|20|200|binary|varbinary",
                                        "-D 1|10|100|binary|varbinary",
                                        "+I 1|10|101|binary|varbinary",
                                        "-U 1|10|101|binary|varbinary",
                                        "+U 1|10|102|binary|varbinary"),
                                Collections.singletonList("+I 2|10|300|binary|varbinary")),
                        Arrays.asList(
                                Collections.singletonList("-D 1|20|200|binary|varbinary"),
                                Arrays.asList(
                                        "-U 2|10|300|binary|varbinary",
                                        "+U 2|10|301|binary|varbinary",
                                        "+I 2|20|400|binary|varbinary")),
                        Arrays.asList(
                                Arrays.asList(
                                        "-U 1|10|102|binary|varbinary",
                                        "+U 1|10|103|binary|varbinary",
                                        "+I 1|20|201|binary|varbinary"),
                                Collections.singletonList("-D 2|10|301|binary|varbinary")));

        InnerStreamTableScan streamScan = table.newStreamScan();
        streamScan.restore(1L);

        // Pulls the next plan and compares both partitions with the expectation for planIndex.
        Function<Integer, Void> assertNextPlan =
                planIndex -> {
                    TableScan.Plan plan = streamScan.plan();
                    Assertions.assertThat(plan).isNotNull();
                    List<Split> planSplits = plan.splits();
                    InnerTableRead read = table.newRead();
                    for (int partition = 0; partition < 2; partition++) {
                        try {
                            Assertions.assertThat(
                                            getResult(read, planSplits, binaryRow(partition + 1), 0, COMPATIBILITY_CHANGELOG_ROW_TO_STRING))
                                    .isEqualTo(expected.get(planIndex).get(partition));
                        } catch (Exception e) {
                            // getResult declares a checked exception; rethrow with the cause kept.
                            throw new RuntimeException(e);
                        }
                    }
                    return null;
                };

        for (int planIndex = 0; planIndex < 2; planIndex++) {
            assertNextPlan.apply(planIndex);
        }
        Assertions.assertThat(streamScan.plan().splits()).isEmpty();

        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
        write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
        write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
        commit.commit(2L, write.prepareCommit(true, 2L));
        write.close();
        // Release the commit as well instead of leaking it.
        commit.close();

        assertNextPlan.apply(2);
        Assertions.assertThat(streamScan.plan().splits()).isEmpty();
    }

    /**
     * Populates a freshly created table with three commits (identifiers 0, 1 and 2) that the
     * batch and streaming read tests rely on. The last commit also deletes two rows.
     */
    private void writeData() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);

        write.write(rowData(1, 10, 100L));
        write.write(rowData(2, 20, 200L));
        write.write(rowData(1, 11, 101L));
        commit.commit(0L, write.prepareCommit(true, 0L));

        write.write(rowData(1, 10, 1000L));
        write.write(rowData(2, 21, 201L));
        write.write(rowData(2, 21, 2001L));
        commit.commit(1L, write.prepareCommit(true, 1L));

        write.write(rowData(1, 11, 1001L));
        write.write(rowData(2, 21, 20001L));
        write.write(rowData(2, 22, 202L));
        write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
        write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
        commit.commit(2L, write.prepareCommit(true, 2L));

        write.close();
        // This helper runs in many tests; release the commit too instead of leaking one per call.
        commit.close();
    }

    /**
     * Checks the rows returned when a filter is applied on the read side only (the scan is planned
     * without a filter).
     *
     * <p>The scenario is not exercised when the table's file format identifier is
     * {@code parquet}; the method returns early in that case.
     *
     * <p>NOTE(review): the expected values contain every row written in the same commit as a
     * matching row, and all rows after the fourth commit, which suggests filtering coarser than
     * row level. This is inferred from the assertions only; confirm against the read
     * implementation.
     */
    @Override
    @Test
    public void testReadFilter() throws Exception {
        FileStoreTable table = createFileStoreTable();
        if (table.coreOptions().fileFormat().getFormatIdentifier().equals("parquet")) {
            return;
        }
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 20, 200L));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write(rowData(1, 30, 300L));
        write.write(rowData(1, 40, 400L));
        commit.commit(1L, write.prepareCommit(true, 1L));
        write.write(rowData(1, 50, 500L));
        write.write(rowData(1, 60, 600L));
        commit.commit(2L, write.prepareCommit(true, 2L));

        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
        Assertions.assertThat(
                        getResult(table.newRead().withFilter(builder.equal(1, 30)), splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                .hasSameElementsAs(
                        Arrays.asList(
                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(
                        getResult(table.newRead().withFilter(builder.equal(2, 300L)), splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                .hasSameElementsAs(
                        Arrays.asList(
                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(
                        getResult(
                                table.newRead().withFilter(PredicateBuilder.or(builder.equal(1, 10), builder.equal(2, 300L))),
                                splits,
                                binaryRow(1),
                                0,
                                BATCH_ROW_TO_STRING))
                .hasSameElementsAs(
                        Arrays.asList(
                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
                                "1|20|200|binary|varbinary|mapKey:mapVal|multiset",
                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset"));

        // Fourth commit rewrites two existing keys before the final filtered read.
        write.write(rowData(1, 60, 500L));
        write.write(rowData(1, 10, 10L));
        commit.commit(3L, write.prepareCommit(true, 3L));
        write.close();
        // Release the commit as well instead of leaking it.
        commit.close();

        Assertions.assertThat(
                        getResult(
                                table.newRead().withFilter(builder.equal(2, 600L)),
                                toSplits(table.newSnapshotReader().read().dataSplits()),
                                binaryRow(1),
                                0,
                                BATCH_ROW_TO_STRING))
                .hasSameElementsAs(
                        Arrays.asList(
                                "1|10|10|binary|varbinary|mapKey:mapVal|multiset",
                                "1|20|200|binary|varbinary|mapKey:mapVal|multiset",
                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset",
                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset",
                                "1|50|500|binary|varbinary|mapKey:mapVal|multiset",
                                "1|60|500|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /**
     * With the partial-update merge engine and {@code PARTIAL_UPDATE_IGNORE_DELETE} enabled, the
     * {@code DELETE} and {@code UPDATE_BEFORE} records written here must leave the single inserted
     * row untouched and must not produce any other row.
     */
    @Test
    public void testPartialUpdateIgnoreDelete() throws Exception {
        FileStoreTable table =
                createFileStoreTable(
                        conf -> {
                            conf.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE);
                            conf.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
                        });
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowData(1, 10, 200L));
        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 100L));
        write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 400L));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.close();
        // Release the commit as well instead of leaking it.
        commit.close();

        List<String> expectedRows = Collections.singletonList("1|10|200|binary|varbinary|mapKey:mapVal|multiset");
        Assertions.assertThat(
                        getResult(
                                table.newRead(),
                                toSplits(table.newSnapshotReader().read().dataSplits()),
                                binaryRow(1),
                                0,
                                BATCH_ROW_TO_STRING))
                .isEqualTo(expectedRows);
    }

    /**
     * Prepares three sets of commit messages before committing any of them, then commits them in
     * order and checks that the final table content reflects all three.
     */
    @Test
    public void testSlowCommit() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);

        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 20, 200L));
        List<CommitMessage> firstMessages = write.prepareCommit(false, 0L);

        write.write(rowData(2, 10, 300L));
        List<CommitMessage> secondMessages = write.prepareCommit(false, 1L);

        write.write(rowData(1, 20, 201L));
        List<CommitMessage> thirdMessages = write.prepareCommit(true, 2L);

        // All three commits happen only after every prepareCommit call has returned.
        commit.commit(0L, firstMessages);
        commit.commit(1L, secondMessages);
        commit.commit(2L, thirdMessages);
        write.close();
        // Release the commit as well instead of leaking it.
        commit.close();

        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
        InnerTableRead read = table.newRead();
        Assertions.assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
                                "1|20|201|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                .isEqualTo(Collections.singletonList("2|10|300|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /**
     * Checks the delta scan result before and after an overwrite commit of partition
     * {@code pt=1}: both scans yield one split with one data file, and the file name after the
     * overwrite differs from the one before.
     */
    @Test
    public void testIncrementalSplitOverwrite() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowData(1, 10, 100L));
        write.write(rowData(1, 20, 200L));
        commit.commit(0L, write.prepareCommit(true, 0L));

        SnapshotReader deltaReader = table.newSnapshotReader().withKind(ScanKind.DELTA);
        List<DataSplit> splitsBefore = deltaReader.read().dataSplits();
        Assertions.assertThat(splitsBefore).hasSize(1);
        Assertions.assertThat(splitsBefore.get(0).dataFiles()).hasSize(1);

        write.write(rowData(1, 10, 1000L));
        write.write(rowData(1, 20, 2000L));
        Map<String, String> overwritePartition = new HashMap<>();
        overwritePartition.put("pt", "1");
        commit.withOverwrite(overwritePartition);
        commit.commit(1L, write.prepareCommit(true, 1L));
        // Nothing is written after this point; release both instead of leaking them.
        write.close();
        commit.close();

        List<DataSplit> splitsAfter = deltaReader.read().dataSplits();
        Assertions.assertThat(splitsAfter).hasSize(1);
        Assertions.assertThat(splitsAfter.get(0).dataFiles()).hasSize(1);
        Assertions.assertThat(splitsAfter.get(0).dataFiles().get(0).fileName())
                .isNotEqualTo(splitsBefore.get(0).dataFiles().get(0).fileName());
    }

    /**
     * Reads the audit log of a table whose field {@code b} has the default value {@code "0"}
     * configured. The scan is planned with the filter {@code b = 300 AND pt = 2}; the test expects
     * both written rows back, with the {@code null} value of {@code b} rendered as {@code 0}.
     */
    @Test
    public void testAuditLogWithDefaultValueFields() throws Exception {
        String defaultValueKey = String.format("%s.%s.%s", "fields", "b", "default-value");
        FileStoreTable table =
                createFileStoreTable(
                        conf -> {
                            conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
                            conf.set(defaultValueKey, "0");
                        });
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
        write.write(rowDataWithKind(RowKind.INSERT, 2, 21, null));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.close();
        commit.close();

        AuditLogTable auditLog = new AuditLogTable(table);
        Function<InternalRow, String> rowToString =
                row ->
                        DataFormatTestUtil.internalRowToString(
                                row,
                                DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}));
        PredicateBuilder builder = new PredicateBuilder(auditLog.rowType());

        InnerTableRead read = auditLog.newRead();
        Predicate filter =
                PredicateBuilder.and(
                        builder.equal(builder.indexOf("b"), 300),
                        builder.equal(builder.indexOf("pt"), 2));
        List<Split> filteredSplits = toSplits(auditLog.newSnapshotReader().withFilter(filter).read().dataSplits());
        Assertions.assertThat(getResult(read, filteredSplits, rowToString))
                .containsExactlyInAnyOrder("+I[+I, 2, 20, 200]", "+I[+I, 2, 21, 0]");
    }

    /**
     * Exercises {@link AuditLogTable} on top of an input-changelog table: an unfiltered read, a
     * filter on the row-kind column, a filter on a data column, and two projections.
     */
    @Test
    public void testAuditLog() throws Exception {
        FileStoreTable table =
                createFileStoreTable(conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT));
        TableWriteImpl write = table.newWrite(this.commitUser);
        TableCommitImpl commit = table.newCommit(this.commitUser);
        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 30, 300L));
        commit.commit(1L, write.prepareCommit(true, 1L));
        write.close();
        commit.close();

        AuditLogTable auditLog = new AuditLogTable(table);
        Function<InternalRow, String> fullRowToString =
                row ->
                        DataFormatTestUtil.internalRowToString(
                                row,
                                DataTypes.ROW(new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}));
        PredicateBuilder builder = new PredicateBuilder(auditLog.rowType());

        // Unfiltered read: all three records, each tagged with its original row kind.
        Assertions.assertThat(
                        getResult(auditLog.newRead(), toSplits(auditLog.newSnapshotReader().read().dataSplits()), fullRowToString))
                .containsExactlyInAnyOrder("+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]");

        // Filter on column 0 (the row-kind column): the test expects all three records back.
        Predicate rowKindIsInsert = builder.equal(0, BinaryString.fromString("+I"));
        Assertions.assertThat(
                        getResult(
                                auditLog.newRead().withFilter(rowKindIsInsert),
                                toSplits(auditLog.newSnapshotReader().withFilter(rowKindIsInsert).read().dataSplits()),
                                fullRowToString))
                .containsExactlyInAnyOrder("+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]");

        // Filter on column 2 of the audit-log row type.
        Assertions.assertThat(
                        getResult(
                                auditLog.newRead().withFilter(builder.equal(2, 10)),
                                toSplits(auditLog.newSnapshotReader().withFilter(builder.equal(2, 10)).read().dataSplits()),
                                fullRowToString))
                .containsExactlyInAnyOrder("+I[+I, 1, 10, 100]");

        // Projection that reorders columns and keeps the row-kind column.
        Function<InternalRow, String> threeColumnToString =
                row ->
                        DataFormatTestUtil.internalRowToString(
                                row, DataTypes.ROW(new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()}));
        Assertions.assertThat(
                        getResult(
                                auditLog.newRead().withProjection(new int[]{2, 0, 1}),
                                toSplits(auditLog.newSnapshotReader().read().dataSplits()),
                                threeColumnToString))
                .containsExactlyInAnyOrder("+I[20, +I, 2]", "+I[30, +U, 1]", "+I[10, +I, 1]");

        // Projection that drops the row-kind column.
        Function<InternalRow, String> twoColumnToString =
                row -> DataFormatTestUtil.internalRowToString(row, DataTypes.ROW(new DataType[]{DataTypes.INT(), DataTypes.INT()}));
        Assertions.assertThat(
                        getResult(
                                auditLog.newRead().withProjection(new int[]{2, 1}),
                                toSplits(auditLog.newSnapshotReader().read().dataSplits()),
                                twoColumnToString))
                .containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
    }

    /**
     * Checks the aggregation merge engine with {@code sum} on field {@code b} and {@code max}
     * (with retraction ignored) on field {@code c}: first for plain inserts, then after
     * UPDATE_BEFORE / DELETE records for the same key.
     */
    @Test
    public void testAggMergeFunc() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
                        },
                        new String[] {"pt", "a", "b", "c"});
        FileStoreTable table =
                createFileStoreTable(
                        options -> {
                            options.set("merge-engine", "aggregation");
                            options.set("fields.b.aggregate-function", "sum");
                            options.set("fields.c.aggregate-function", "max");
                            options.set("fields.c.ignore-retract", "true");
                        },
                        rowType);
        Function<InternalRow, String> rowToString =
                row -> DataFormatTestUtil.internalRowToString(row, rowType);
        SnapshotReader snapshotReader = table.newSnapshotReader();
        InnerTableRead read = table.newRead();

        // The writer and committer were previously never closed. try-with-resources releases
        // both even when an assertion fails; resources close in reverse declaration order, so
        // the writer is closed before the committer, as testAuditLog does explicitly.
        try (TableCommitImpl commit = table.newCommit("");
                TableWriteImpl write = table.newWrite("")) {
            // Three inserts for the same key: expected b = 3 + 1 + 2 = 6 and c = max = 3.
            write.write(GenericRow.of(1, 1, 3, 3));
            write.write(GenericRow.of(1, 1, 1, 1));
            write.write(GenericRow.of(1, 1, 2, 2));
            commit.commit(0L, write.prepareCommit(true, 0L));
            Assertions.assertThat(
                            getResult(
                                    read,
                                    toSplits(snapshotReader.read().dataSplits()),
                                    rowToString))
                    .containsExactlyInAnyOrder("+I[1, 1, 6, 3]");

            // Retractions: expected b drops by 1 + 2 + 1 to 2, while c stays at 3 because
            // retraction is ignored for that field.
            write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 1, 1));
            write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
            write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 1, 1));
            commit.commit(1L, write.prepareCommit(true, 1L));
            Assertions.assertThat(
                            getResult(
                                    read,
                                    toSplits(snapshotReader.read().dataSplits()),
                                    rowToString))
                    .containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
        }
    }

    /**
     * Checks that, without {@code ignore-retract} configured, writing an UPDATE_BEFORE record to
     * a {@code max}-aggregated field makes {@code prepareCommit} fail with the expected message.
     */
    @Test
    public void testAggMergeFuncNotAllowRetract() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
                        },
                        new String[] {"pt", "a", "b", "c"});
        FileStoreTable table =
                createFileStoreTable(
                        options -> {
                            options.set("merge-engine", "aggregation");
                            options.set("fields.b.aggregate-function", "sum");
                            options.set("fields.c.aggregate-function", "max");
                        },
                        rowType);

        // NOTE(review): this writer is never closed; confirm that close() is safe after a
        // failed prepareCommit before adding it.
        TableWriteImpl write = table.newWrite("");
        write.write(GenericRow.of(1, 1, 3, 3));
        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 3, 3));

        Assertions.assertThatThrownBy(() -> write.prepareCommit(true, 0L))
                .hasMessageContaining(
                        "Aggregate function 'max' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'");
    }

    /**
     * Checks batch reads in {@code compacted-full} scan mode with {@code
     * FULL_COMPACTION_DELTA_COMMITS} set to 2 and a single bucket, before and after explicit
     * full compactions.
     */
    @Test
    public void testFullCompactedRead() throws Exception {
        // Parameterized instead of the raw HashMap used before.
        HashMap<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "2");
        dynamicOptions.put(CoreOptions.SCAN_MODE.key(), "compacted-full");
        dynamicOptions.put(CoreOptions.BUCKET.key(), "1");
        FileStoreTable table = createFileStoreTable().copy(dynamicOptions);

        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
        ReadBuilder readBuilder = table.newReadBuilder();

        // Previously the committer was never closed and the writer was closed only on the
        // success path. Resources close in reverse declaration order, so the writer is closed
        // first, and both are closed before the final read below (as the writer was before).
        try (StreamTableCommit commit = writeBuilder.newCommit();
                StreamTableWrite write = writeBuilder.newWrite()) {
            write.write(rowData(1, 10, 100L));
            commit.commit(0L, write.prepareCommit(true, 0L));
            Assertions.assertThat(
                            getResult(
                                    readBuilder.newRead(),
                                    readBuilder.newScan().plan().splits(),
                                    BATCH_ROW_TO_STRING))
                    .containsExactly("1|10|100|binary|varbinary|mapKey:mapVal|multiset");

            write.write(rowData(1, 10, 200L));
            commit.commit(1L, write.prepareCommit(true, 1L));

            // Explicit full compaction, committed on its own under identifier 2.
            write.compact(binaryRow(1), 0, true);
            commit.commit(2L, write.prepareCommit(true, 2L));

            // One more write plus full compaction, committed under identifier 3.
            write.write(rowData(1, 10, 300L));
            write.compact(binaryRow(1), 0, true);
            commit.commit(3L, write.prepareCommit(true, 3L));
        }

        // The read is expected to still return 200 rather than 300.
        // NOTE(review): presumably only commit identifiers that are multiples of the
        // delta-commits setting count as full-compacted snapshots - confirm against the scanner.
        Assertions.assertThat(
                        getResult(
                                readBuilder.newRead(),
                                readBuilder.newScan().plan().splits(),
                                BATCH_ROW_TO_STRING))
                .containsExactly("1|10|200|binary|varbinary|mapKey:mapVal|multiset");
    }

    /**
     * Creates a table with the default {@code ROW_TYPE} by delegating to the two-argument
     * overload.
     *
     * @param consumer passed through unchanged to the two-argument overload
     */
    @Override // org.apache.paimon.table.FileStoreTableTestBase
    protected FileStoreTable createFileStoreTable(Consumer<Options> consumer) throws Exception {
        FileStoreTable table = createFileStoreTable(consumer, ROW_TYPE);
        return table;
    }

    /**
     * Builds the table used by the overwrite tests: fields come from {@code
     * OVERWRITE_TEST_ROW_TYPE}, partitioned by {@code pt0, pt1}, with primary key {@code pk, pt0,
     * pt1}, in CHANGE_LOG write mode at {@code tablePath}.
     */
    @Override // org.apache.paimon.table.FileStoreTableTestBase
    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
        Options conf = new Options();
        conf.set(CoreOptions.PATH, this.tablePath.toString());
        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);

        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), this.tablePath);
        List<String> partitionKeys = Arrays.asList("pt0", "pt1");
        List<String> primaryKeys = Arrays.asList("pk", "pt0", "pt1");
        Schema schema =
                new Schema(
                        OVERWRITE_TEST_ROW_TYPE.getFields(),
                        partitionKeys,
                        primaryKeys,
                        conf.toMap(),
                        "");

        // forceCommit's result is handed to the table as its schema.
        return new ChangelogWithKeyFileStoreTable(
                FileIOFinder.find(this.tablePath),
                this.tablePath,
                SchemaUtils.forceCommit(schemaManager, schema));
    }

    /**
     * Builds a CHANGE_LOG table at {@code tablePath} with the given row type, partitioned by
     * {@code pt} and keyed by {@code pt, a}.
     *
     * @param consumer invoked on the options after PATH and WRITE_MODE are set, so it can add
     *     or override entries before the schema is committed
     * @param rowType supplies the fields of the table schema
     */
    private FileStoreTable createFileStoreTable(Consumer<Options> consumer, RowType rowType) throws Exception {
        Options conf = new Options();
        conf.set(CoreOptions.PATH, this.tablePath.toString());
        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
        consumer.accept(conf);

        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), this.tablePath);
        List<String> partitionKeys = Collections.singletonList("pt");
        List<String> primaryKeys = Arrays.asList("pt", "a");
        Schema schema =
                new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), "");

        return new ChangelogWithKeyFileStoreTable(
                FileIOFinder.find(this.tablePath),
                this.tablePath,
                SchemaUtils.forceCommit(schemaManager, schema));
    }
}
