package org.apache.flink.table.store.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.class */
/**
 * Tests for {@link ChangelogValueCountFileStoreTable}.
 *
 * <p>Every table under test is built by {@link #createFileStoreTable(Consumer)}, which uses
 * {@link WriteMode#CHANGE_LOG} and registers a schema with {@code "pt"} as the only entry of the
 * first key list and an empty second key list. Most tests share the fixture produced by
 * {@link #writeData()}.
 */
public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBase {

    /** Plans a full scan after {@link #writeData()} and checks the rows read for partitions 1 and 2. */
    @Test
    public void testBatchReadWrite() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();

        List<Split> splits = table.newScan().plan().splits();
        TableRead read = table.newRead();

        // (1, 11, 101) was written twice, so it is expected twice; (1, 10, 100) was written once
        // and deleted once, so it is expected to be absent.
        Assertions.assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
                                "1|11|101|binary|varbinary|mapKey:mapVal|multiset",
                                "1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
        // (2, 21, 201) was written three times and deleted twice, so one copy is expected.
        Assertions.assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /** Same scan as {@link #testBatchReadWrite()}, but read through the inherited {@code PROJECTION}. */
    @Test
    public void testBatchProjection() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();

        List<Split> splits = table.newScan().plan().splits();
        TableRead read = table.newRead().withProjection(PROJECTION);

        Assertions.assertThat(
                        getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
                .isEqualTo(Arrays.asList("101|11", "101|11", "102|12"));
        Assertions.assertThat(
                        getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
                .isEqualTo(Arrays.asList("200|20", "201|21", "202|22"));
    }

    /** Plans a full scan with an {@code equal(2, 201L)} filter and checks which rows are still read. */
    @Test
    public void testBatchFilter() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        PredicateBuilder predicates = new PredicateBuilder(table.schema().logicalRowType());

        List<Split> splits = table.newScan().withFilter(predicates.equal(2, 201L)).plan().splits();
        TableRead read = table.newRead();

        Assertions.assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
                .isEmpty();
        // NOTE(review): the 202 row does not match the predicate yet is expected here; presumably
        // the scan filter prunes at file level rather than per row — confirm against the scan
        // implementation.
        Assertions.assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                "2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /** Plans a {@link ScanKind#DELTA} scan after {@link #writeData()} and checks the changelog rows. */
    @Test
    public void testStreamingReadWrite() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();

        List<Split> splits = table.newScan().withKind(ScanKind.DELTA).plan().splits();
        TableRead read = table.newRead();

        // The expected rows correspond to the records written for the last commit of writeData().
        Assertions.assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "-1|10|100|binary|varbinary|mapKey:mapVal|multiset",
                                "+1|11|101|binary|varbinary|mapKey:mapVal|multiset"));
        Assertions.assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /** Same scan as {@link #testStreamingReadWrite()}, but read through the inherited {@code PROJECTION}. */
    @Test
    public void testStreamingProjection() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();

        List<Split> splits = table.newScan().withKind(ScanKind.DELTA).plan().splits();
        TableRead read = table.newRead().withProjection(PROJECTION);

        Assertions.assertThat(
                        getResult(
                                read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
                .isEqualTo(Arrays.asList("-100|10", "+101|11"));
        Assertions.assertThat(
                        getResult(
                                read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
                .isEqualTo(Arrays.asList("-201|21", "-201|21", "+202|22"));
    }

    /** Plans a {@link ScanKind#DELTA} scan with an {@code equal(2, 201L)} filter and checks the rows. */
    @Test
    public void testStreamingFilter() throws Exception {
        writeData();
        FileStoreTable table = createFileStoreTable();
        PredicateBuilder predicates = new PredicateBuilder(table.schema().logicalRowType());

        List<Split> splits =
                table.newScan()
                        .withKind(ScanKind.DELTA)
                        .withFilter(predicates.equal(2, 201L))
                        .plan()
                        .splits();
        TableRead read = table.newRead();

        Assertions.assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                .isEmpty();
        // NOTE(review): as in testBatchFilter, the non-matching 202 row is still expected;
        // presumably filtering is file-level — confirm.
        Assertions.assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
                .isEqualTo(
                        Arrays.asList(
                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                "-2|21|201|binary|varbinary|mapKey:mapVal|multiset",
                                "+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
    }

    /**
     * Writes the shared fixture in three commits (identifiers 0, 1 and 2).
     *
     * <p>The writer is closed in a {@code finally} block so that it is released even when a write
     * or commit fails part-way through.
     *
     * @throws Exception if writing, committing or closing the writer fails
     */
    private void writeData() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWrite write = table.newWrite(this.commitUser);
        TableCommit commit = table.newCommit(this.commitUser);
        try {
            write.write(rowData(1, 10, 100L));
            write.write(rowData(2, 20, 200L));
            write.write(rowData(1, 11, 101L));
            commit.commit(0L, write.prepareCommit(true, 0L));

            write.write(rowData(2, 21, 201L));
            write.write(rowData(1, 12, 102L));
            write.write(rowData(2, 21, 201L));
            write.write(rowData(2, 21, 201L));
            commit.commit(1L, write.prepareCommit(true, 1L));

            write.write(rowData(1, 11, 101L));
            write.write(rowData(2, 22, 202L));
            write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
            write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
            write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
            commit.commit(2L, write.prepareCommit(true, 2L));
        } finally {
            write.close();
        }
    }

    /**
     * Writes one row and deletes it again within the same commit, then checks that the delta scan
     * has no splits and that the bucket directory for partition {@code "1"}, bucket 0 has no files.
     */
    @Test
    public void testChangelogWithoutDataFile() throws Exception {
        FileStoreTable table = createFileStoreTable();
        TableWrite write = table.newWrite(this.commitUser);
        TableCommit commit = table.newCommit(this.commitUser);
        try {
            // The insert and the delete of the same row land in the same commit.
            write.write(rowData(1, 10, 100L));
            write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
            commit.commit(0L, write.prepareCommit(true, 0L));
        } finally {
            // Close before inspecting the file system, and also when the write/commit fails.
            write.close();
        }

        Assertions.assertThat(table.newScan().withKind(ScanKind.DELTA).plan().splits()).isEmpty();

        Path bucketPath = DataFilePathFactory.bucketPath(table.location(), "1", 0);
        Assertions.assertThat(bucketPath.getFileSystem().listStatus(bucketPath)).isNullOrEmpty();
    }

    /**
     * Creates the table under test at {@code tablePath}.
     *
     * <p>The configuration is seeded with {@link CoreOptions#PATH} and
     * {@link WriteMode#CHANGE_LOG}; {@code consumer} runs afterwards and may therefore override
     * either setting. A new schema version carrying the resulting options is committed through a
     * {@link SchemaManager} and handed to the table.
     *
     * @param consumer callback that customises the configuration before the schema is committed
     * @return a {@link ChangelogValueCountFileStoreTable} backed by the newly committed schema
     * @throws Exception if committing the schema fails
     */
    @Override // org.apache.flink.table.store.table.FileStoreTableTestBase
    protected FileStoreTable createFileStoreTable(Consumer<Configuration> consumer) throws Exception {
        Configuration conf = new Configuration();
        conf.set(CoreOptions.PATH, this.tablePath.toString());
        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
        consumer.accept(conf);

        // NOTE(review): the two list arguments are presumably partition keys ("pt") and primary
        // keys (none) — confirm against the UpdateSchema constructor.
        UpdateSchema updateSchema =
                new UpdateSchema(
                        ROW_TYPE,
                        Collections.singletonList("pt"),
                        Collections.emptyList(),
                        conf.toMap(),
                        "");
        SchemaManager schemaManager = new SchemaManager(this.tablePath);
        return new ChangelogValueCountFileStoreTable(
                this.tablePath, schemaManager.commitNewVersion(updateSchema));
    }
}
