package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.class */
public class StreamingReadWriteTableWithKafkaLogITCase extends KafkaTableTestBase {
    /**
     * Runs before every test and initializes {@link ReadWriteTableTestUtil} with the string form of
     * the value returned by {@code createAndRegisterTempFile("")}.
     */
    @BeforeEach
    public void setUp() {
        // NOTE(review): presumably this string is the root path under which the test tables are
        // stored — confirm against ReadWriteTableTestUtil.init.
        ReadWriteTableTestUtil.init(createAndRegisterTempFile("").toString());
    }

    /**
     * Exercises write, streaming read and batch read on a table partitioned by {@code dt} with
     * primary key {@code (currency, dt)}, created through
     * {@code ReadWriteTableTestUtil.createTableWithKafkaLog}.
     *
     * <p>Flow: seed the table from a changelog source table, start a long-lived streaming read,
     * write two more partitions and overwrite an existing one while that read is open, then verify
     * the final state with a batch read and several filtered / projected streaming reads.
     *
     * <p>The long-lived streaming iterator is managed by try-with-resources so that it is closed
     * even when one of the intermediate inserts or assertions fails.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testReadWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L, "2022-01-02"));
        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        initialRecords,
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,D");

        // NOTE(review): the meaning of the trailing boolean (false here) is defined by
        // ReadWriteTableTestUtil.createTableWithKafkaLog and is not visible in this file.
        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        false);

        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // Streaming read that stays open while more data is written.
        String streamingQuery =
                ReadWriteTableTestUtil.buildQuery(
                        table,
                        "*",
                        "WHERE dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency = 'HK Dollar'");
        try (BlockingIterator<Row, Row> streamingItr =
                ReadWriteTableTestUtil.testStreamingRead(
                        streamingQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 119L, "2022-01-02")))) {

            ReadWriteTableTestUtil.insertIntoPartition(
                    table, "PARTITION (dt = '2022-01-03')", "('HK Dollar', 100)", "('Yen', 20)");
            ReadWriteTableTestUtil.insertIntoPartition(
                    table, "PARTITION (dt = '2022-01-04')", "('Yen', 20)");

            // Only the 2022-01-03 rows are expected; the 2022-01-04 row does not match the filter.
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamingItr,
                    Arrays.asList(
                            TestValuesTableFactory.changelogRow(
                                    "+I", "HK Dollar", 100L, "2022-01-03"),
                            TestValuesTableFactory.changelogRow("+I", "Yen", 20L, "2022-01-03")));

            // The overwrite of an existing partition must not produce further streaming records.
            ReadWriteTableTestUtil.insertOverwritePartition(
                    table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)");
            ReadWriteTableTestUtil.assertNoMoreRecords(streamingItr);
        }

        // Batch read observes the overwritten partition plus both newly written partitions.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 100L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 20L, "2022-01-03"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 20L, "2022-01-04")));

        // Filter on the partition column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt = '2022-01-01'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01")))
                .close();

        // Filter on a primary-key column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE currency = 'US Dollar'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01")))
                .close();

        // Filter on partition and value column that matches nothing.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE dt = '2022-01-01' AND rate = 1"),
                        Collections.emptyList())
                .close();

        // Projection with reordered columns combined with a filter.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table,
                                "rate, dt, currency",
                                "WHERE dt = '2022-01-02' AND currency = 'Euro'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", 100L, "2022-01-02", "Euro")))
                .close();
    }

    /**
     * Writes a changelog into a table partitioned by {@code dt} that has no primary key and is
     * created through {@code ReadWriteTableTestUtil.createTableWithKafkaLog}, then checks streaming
     * reads with no filter, with partition / field filters and with a projection.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testReadWriteWithPartitionedRecordsWithoutPk() throws Exception {
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-U", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+U", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 115L, "2022-01-02")),
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,UB,D");

        String sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        false);

        ReadWriteTableTestUtil.insertIntoFromTable(sourceTable, sinkTable);
        ReadWriteTableTestUtil.checkFileStorePath(
                sinkTable, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // No filter: one row per partition is expected.
        String unfiltered = ReadWriteTableTestUtil.buildSimpleQuery(sinkTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        unfiltered,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 115L, "2022-01-02")))
                .close();

        // Partition column IS NOT NULL matches every row.
        String dtNotNull = ReadWriteTableTestUtil.buildQuery(sinkTable, "*", "WHERE dt IS NOT NULL");
        ReadWriteTableTestUtil.testStreamingRead(
                        dtNotNull,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 115L, "2022-01-02")))
                .close();

        // Partition column IS NULL matches nothing.
        String dtNull = ReadWriteTableTestUtil.buildQuery(sinkTable, "*", "WHERE dt IS NULL");
        ReadWriteTableTestUtil.testStreamingRead(dtNull, Collections.emptyList()).close();

        // Disjunction over two non-partition fields.
        String currencyOrRate =
                ReadWriteTableTestUtil.buildQuery(
                        sinkTable, "*", "WHERE currency = 'US Dollar' OR rate = 115");
        ReadWriteTableTestUtil.testStreamingRead(
                        currencyOrRate,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 115L, "2022-01-02")))
                .close();

        // Mixed partition / field conditions that match nothing.
        String mismatched =
                ReadWriteTableTestUtil.buildQuery(
                        sinkTable,
                        "*",
                        "WHERE (dt = '2022-01-02' AND currency = 'US Dollar') OR (dt = '2022-01-01' AND rate = 115)");
        ReadWriteTableTestUtil.testStreamingRead(mismatched, Collections.emptyList()).close();

        // Projection of a single column.
        String rateOnly = ReadWriteTableTestUtil.buildQuery(sinkTable, "rate", "");
        ReadWriteTableTestUtil.testStreamingRead(
                        rateOnly,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 102L),
                                TestValuesTableFactory.changelogRow("+I", 115L)))
                .close();

        // Projection combined with a partition filter.
        String rateOutsideFirstDay =
                ReadWriteTableTestUtil.buildQuery(sinkTable, "rate", "WHERE dt <> '2022-01-01'");
        ReadWriteTableTestUtil.testStreamingRead(
                        rateOutsideFirstDay,
                        Collections.singletonList(TestValuesTableFactory.changelogRow("+I", 115L)))
                .close();
    }

    /**
     * Writes a changelog into a non-partitioned table with primary key {@code currency}, created
     * through {@code ReadWriteTableTestUtil.createTableWithKafkaLog}, then checks streaming reads
     * with no filter, with a field filter and with projections.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testSReadWriteWithNonPartitionedRecordsWithPk() throws Exception {
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L)),
                        null,
                        false,
                        "I, UA, D");

        String sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        false);

        ReadWriteTableTestUtil.insertIntoFromTable(sourceTable, sinkTable);
        // No partition keys, so no partition directories are expected.
        ReadWriteTableTestUtil.checkFileStorePath(sinkTable, Collections.emptyList());

        // No filter: the deleted key (Yen) is absent, Euro carries its last value.
        String unfiltered = ReadWriteTableTestUtil.buildSimpleQuery(sinkTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        unfiltered,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L)))
                .close();

        // Filtering for the deleted key yields nothing.
        String yenOnly = ReadWriteTableTestUtil.buildQuery(sinkTable, "*", "WHERE currency = 'Yen'");
        ReadWriteTableTestUtil.testStreamingRead(yenOnly, Collections.emptyList()).close();

        // Projection of the key column.
        String currencyColumn = ReadWriteTableTestUtil.buildQuery(sinkTable, "currency", "");
        ReadWriteTableTestUtil.testStreamingRead(
                        currencyColumn,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Euro")))
                .close();

        // Projection combined with a filter on the non-projected column.
        String currencyForRate =
                ReadWriteTableTestUtil.buildQuery(sinkTable, "currency", "WHERE rate = 102");
        ReadWriteTableTestUtil.testStreamingRead(
                        currencyForRate,
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar")))
                .close();
    }

    /**
     * Writes a changelog, including rows with {@code null} fields, into a non-partitioned table
     * without primary key created through {@code ReadWriteTableTestUtil.createTableWithKafkaLog},
     * then checks streaming reads with no filter, with {@code IS NOT NULL} filters and with a
     * projection.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testReadWriteWithNonPartitionedRecordsWithoutPk() throws Exception {
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("+I", null, 100L),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar", null)),
                        null,
                        false,
                        "I,UA,UB,D");

        String sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        false);

        ReadWriteTableTestUtil.insertIntoFromTable(sourceTable, sinkTable);
        // No partition keys, so no partition directories are expected.
        ReadWriteTableTestUtil.checkFileStorePath(sinkTable, Collections.emptyList());

        // No filter: both rows carrying a null field are part of the result.
        String unfiltered = ReadWriteTableTestUtil.buildSimpleQuery(sinkTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        unfiltered,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+I", null, 100L),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar", null)))
                .close();

        // Drops the row whose currency is null.
        String currencyPresent =
                ReadWriteTableTestUtil.buildQuery(sinkTable, "*", "WHERE currency IS NOT NULL");
        ReadWriteTableTestUtil.testStreamingRead(
                        currencyPresent,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar", null)))
                .close();

        // Drops the row whose rate is null.
        String ratePresent =
                ReadWriteTableTestUtil.buildQuery(sinkTable, "*", "WHERE rate IS NOT NULL");
        ReadWriteTableTestUtil.testStreamingRead(
                        ratePresent,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+I", null, 100L)))
                .close();

        // Projection with both null checks: only fully populated rows remain.
        String rateOfCompleteRows =
                ReadWriteTableTestUtil.buildQuery(
                        sinkTable, "rate", "WHERE currency IS NOT NULL AND rate IS NOT NULL");
        ReadWriteTableTestUtil.testStreamingRead(
                        rateOfCompleteRows,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 102L),
                                TestValuesTableFactory.changelogRow("+I", 119L)))
                .close();
    }

    /**
     * Checks the changelog observed by streaming reads that use
     * {@code ReadWriteTableTestUtil.SCAN_LATEST} table options on a table partitioned by
     * {@code dt} with primary key {@code (currency, dt)}.
     *
     * <p>Each case creates a fresh table through
     * {@code ReadWriteTableTestUtil.createTableWithKafkaLog} and runs
     * {@code testStreamingReadWithReadFirst} against it with a different filter or projection. The
     * first case additionally inserts two rows while the read is still open and validates the
     * resulting update records.
     *
     * <p>The iterator of the first case is managed by try-with-resources so that it is closed even
     * when the insert or one of the assertions fails.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception {
        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+U", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+U", "Euro", 119L, "2022-01-02")),
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,D");

        // NOTE(review): the meaning of the trailing boolean (true here) is defined by
        // ReadWriteTableTestUtil.createTableWithKafkaLog and is not visible in this file.
        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);

        // Case 1: no filter; the read stays open while two more rows are inserted.
        try (BlockingIterator<Row, Row> streamingItr =
                ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-U", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+U", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 119L, "2022-01-02")))) {

            ReadWriteTableTestUtil.insertInto(
                    table, "('US Dollar', 104, '2022-01-01')", "('Euro', 100, '2022-01-02')");

            // Both inserted rows hit existing keys, so update-before/after pairs are expected.
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamingItr,
                    Arrays.asList(
                            TestValuesTableFactory.changelogRow(
                                    "-U", "US Dollar", 102L, "2022-01-01"),
                            TestValuesTableFactory.changelogRow(
                                    "+U", "US Dollar", 104L, "2022-01-01"),
                            TestValuesTableFactory.changelogRow("-U", "Euro", 119L, "2022-01-02"),
                            TestValuesTableFactory.changelogRow("+U", "Euro", 100L, "2022-01-02")));
            ReadWriteTableTestUtil.assertNoMoreRecords(streamingItr);
        }

        // Case 2: partition filter.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE dt = '2022-01-01'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-U", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+U", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01")))
                .close();

        // Case 3: filter on a primary-key column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE currency = 'Yen'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01")))
                .close();

        // Case 4: filter on the value column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table, "*", "WHERE rate = 114", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-U", "Euro", 114L, "2022-01-01")))
                .close();

        // Case 5: value and partition filter that match nothing.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE rate = 114 AND dt = '2022-01-02'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.emptyList())
                .close();

        // Case 6: projection of the value column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table, "rate", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 102L),
                                TestValuesTableFactory.changelogRow("+I", 114L),
                                TestValuesTableFactory.changelogRow("+I", 1L),
                                TestValuesTableFactory.changelogRow("-U", 114L),
                                TestValuesTableFactory.changelogRow("+U", 116L),
                                TestValuesTableFactory.changelogRow("-D", 1L),
                                TestValuesTableFactory.changelogRow("-D", 116L),
                                TestValuesTableFactory.changelogRow("+I", 119L)))
                .close();

        // Case 7: projection combined with a partition filter.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "rate",
                                "WHERE dt = '2022-01-02'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(TestValuesTableFactory.changelogRow("+I", 119L)))
                .close();
    }

    /**
     * Checks the changelog observed by streaming reads that use
     * {@code ReadWriteTableTestUtil.SCAN_LATEST} table options on a table partitioned by
     * {@code dt} without primary key.
     *
     * <p>The expected records contain only {@code +I} and {@code -D} kinds even though the source
     * changelog contains {@code -U}/{@code +U} records.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testReadLatestChangelogOfPartitionedRecordsWithoutPk() throws Exception {
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-U", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+U", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 115L, "2022-01-02")),
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,UB,D");

        // Case 1: no filter.
        String sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        true);
        String query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 115L, "2022-01-02")))
                .close();

        // Case 2: partition filter.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "*",
                        "WHERE dt = '2022-01-01'",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01")))
                .close();

        // Case 3: field filter.
        // NOTE(review): unlike every other case, this one runs against the table of case 2 rather
        // than a freshly created one — confirm that the reuse is intentional.
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "*",
                        "WHERE currency = 'US Dollar' OR rate = 1",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01")))
                .close();

        // Case 4: partition and field filter.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "*",
                        "WHERE dt = '2022-01-02' AND rate = 114",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow(
                                        "-D", "Euro", 114L, "2022-01-02")))
                .close();

        // Case 5: projection.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable, "rate", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 102L),
                                TestValuesTableFactory.changelogRow("+I", 116L),
                                TestValuesTableFactory.changelogRow("-D", 116L),
                                TestValuesTableFactory.changelogRow("+I", 1L),
                                TestValuesTableFactory.changelogRow("-D", 1L),
                                TestValuesTableFactory.changelogRow("+I", 114L),
                                TestValuesTableFactory.changelogRow("-D", 114L),
                                TestValuesTableFactory.changelogRow("+I", 119L),
                                TestValuesTableFactory.changelogRow("-D", 119L),
                                TestValuesTableFactory.changelogRow("+I", 115L)))
                .close();

        // Case 6: projection combined with a partition filter.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "rate",
                        "WHERE dt <> '2022-01-01'",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 114L),
                                TestValuesTableFactory.changelogRow("-D", 114L),
                                TestValuesTableFactory.changelogRow("+I", 119L),
                                TestValuesTableFactory.changelogRow("-D", 119L),
                                TestValuesTableFactory.changelogRow("+I", 115L)))
                .close();
    }

    /**
     * Checks the changelog observed by streaming reads that use
     * {@code ReadWriteTableTestUtil.SCAN_LATEST} table options on a non-partitioned table with
     * primary key {@code currency}.
     *
     * <p>Each case creates a fresh table through
     * {@code ReadWriteTableTestUtil.createTableWithKafkaLog} and runs
     * {@code testStreamingReadWithReadFirst} with a different filter or projection.
     *
     * @throws Exception if any helper call, assertion or iterator close fails
     */
    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithPk() throws Exception {
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L)),
                        null,
                        false,
                        "I,UA,D");

        // Case 1: no filter.
        String sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        true);
        String query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L)))
                .close();

        // Case 2: filter on the primary-key column.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "*",
                        "WHERE currency = 'Euro'",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L)))
                .close();

        // Case 3: projection of the primary-key column.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable, "currency", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Yen"),
                                TestValuesTableFactory.changelogRow("-D", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("-D", "Yen")))
                .close();

        // Case 4: projection of the value column combined with a key filter.
        sinkTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        true);
        query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        sinkTable,
                        "rate",
                        "WHERE currency = 'Euro'",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        sinkTable,
                        query,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 114L),
                                TestValuesTableFactory.changelogRow("-U", 114L),
                                TestValuesTableFactory.changelogRow("+U", 116L),
                                TestValuesTableFactory.changelogRow("-D", 116L),
                                TestValuesTableFactory.changelogRow("+I", 119L)))
                .close();
    }

    /**
     * Streams a changelog containing {@code +I}, {@code -U}, {@code +U} and {@code -D} rows from a
     * temporary source table into tables created with empty key lists, and checks five different
     * {@code SCAN_LATEST} queries against it.
     *
     * <p>In every expected result below, the positions where the source holds a {@code -U}/{@code +U}
     * pair are listed as {@code -D}/{@code +I}.
     *
     * <p>NOTE(review): the meaning of the boolean flags passed to the {@code ReadWriteTableTestUtil}
     * helpers is not visible in this file - confirm against the helper's signature.
     */
    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithoutPk() throws Exception {
        List<String> fields = Arrays.asList("currency STRING", "rate BIGINT");
        List<String> none = Collections.emptyList();

        List<Row> sourceRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("-U", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                        TestValuesTableFactory.changelogRow("-U", "Euro", 119L),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", null, 100L),
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", null));
        String source =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields, none, none, sourceRows, null, false, "I,UA,UB,D");

        // 1) SELECT * without a filter.
        String allTable = ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, true);
        String allQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        allTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        source,
                        allTable,
                        allQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("+I", null, 100L),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar", null)))
                .close();

        // 2) SELECT * restricted to the Euro rows.
        String euroTable = ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, true);
        String euroQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        euroTable, "*", "WHERE currency = 'Euro'", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        source,
                        euroTable,
                        euroQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L)))
                .close();

        // 3) Explicit projection of both columns, no filter.
        String bothColumnsTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, true);
        String bothColumnsQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        bothColumnsTable, "currency, rate", "", ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        source,
                        bothColumnsTable,
                        bothColumnsQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("+I", null, 100L),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar", null)))
                .close();

        // 4) Project currency only and drop rows whose currency is NULL.
        String nonNullTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, true);
        String nonNullQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        nonNullTable,
                        "currency",
                        "WHERE currency IS NOT NULL",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        source,
                        nonNullTable,
                        nonNullQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Yen"),
                                TestValuesTableFactory.changelogRow("-D", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("-D", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("-D", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("-D", "Yen"),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar")))
                .close();

        // 5) Project currency only, filtered on a column that is not projected.
        String rateFilterTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, true);
        String rateFilterQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        rateFilterTable,
                        "currency",
                        "WHERE rate = 119",
                        ReadWriteTableTestUtil.SCAN_LATEST);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        source,
                        rateFilterTable,
                        rateFilterQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("-D", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Euro")))
                .close();
    }

    /**
     * Feeds an insert-only ({@code "I"}) source into tables with a Kafka log and checks the
     * changelog returned by {@code SCAN_LATEST} queries.
     *
     * <p>Two table shapes are covered: one created with empty key lists, which is expected to
     * return the source rows unchanged, and one created with {@code currency} as its second
     * argument (presumably the primary key - confirm against the helper), for which the second
     * Euro insert is expected as a {@code -U}/{@code +U} pair.
     */
    @Test
    public void testReadLatestChangelogOfInsertOnlyRecords() throws Exception {
        List<String> fields = Arrays.asList("currency STRING", "rate BIGINT");
        List<String> none = Collections.emptyList();
        List<String> currencyKey = Collections.singletonList("currency");

        // Typed as List<Row> (was a raw List) so the helper calls are checked by the compiler.
        List<Row> insertOnlyRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L));

        // --- table created with empty key lists: output equals input ---
        String keylessSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields, none, none, insertOnlyRows, null, true, "I");
        String keylessTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        keylessSource,
                        keylessTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                keylessTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        insertOnlyRows)
                .close();

        // --- tables created with "currency" as the key list ---
        String keyedSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields, currencyKey, none, insertOnlyRows, null, true, "I");

        // SELECT * without a filter.
        String allTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, currencyKey, none, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        keyedSource,
                        allTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                allTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 119L)))
                .close();

        // SELECT * filtered on rate: only the 114 rows are expected, so no +U row appears.
        String rateFilterTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, currencyKey, none, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        keyedSource,
                        rateFilterTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                rateFilterTable,
                                "*",
                                "WHERE rate = 114",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L)))
                .close();

        // Projection of the rate column only.
        String rateOnlyTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, currencyKey, none, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        keyedSource,
                        rateOnlyTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                rateOnlyTable, "rate", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 102L),
                                TestValuesTableFactory.changelogRow("+I", 114L),
                                TestValuesTableFactory.changelogRow("+I", 1L),
                                TestValuesTableFactory.changelogRow("-U", 114L),
                                TestValuesTableFactory.changelogRow("+U", 119L)))
                .close();

        // Projection of currency, filtered on the non-projected rate column.
        String currencyOnlyTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, currencyKey, none, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        keyedSource,
                        currencyOnlyTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                currencyOnlyTable,
                                "currency",
                                "WHERE rate = 114",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Euro"),
                                TestValuesTableFactory.changelogRow("-U", "Euro")))
                .close();
    }

    /**
     * Writes insert-only ({@code "I"}) sources into tables with a Kafka log and reads them back
     * with the options built by {@link #scanFromTimeStampMillis(Long)} for timestamp {@code 0}.
     *
     * <p>Four table shapes are exercised, combining a {@code dt} third-argument list with empty
     * and non-empty second-argument lists (presumably partition keys and primary keys
     * respectively - confirm against {@code ReadWriteTableTestUtil}). For the two shapes created
     * with an empty second list the expected output is the source list itself; for the other two
     * a repeated key is expected as a {@code -U}/{@code +U} pair.
     */
    @Test
    public void testReadInsertOnlyChangelogFromTimestamp() throws Exception {
        List<String> none = Collections.emptyList();
        List<String> dtOnly = Collections.singletonList("dt");
        String partitionSpec = "dt:2022-01-01;dt:2022-01-02";

        // ---------- three-column schema with a dt column ----------
        List<String> datedFields = Arrays.asList("currency STRING", "rate BIGINT", "dt STRING");
        List<String> currencyAndDt = Arrays.asList("currency", "dt");

        // Typed as List<Row> (was a raw List) so the helper calls are checked by the compiler.
        List<Row> datedRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"));

        // Empty second list: the rows are expected back unchanged.
        String datedKeylessSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        datedFields, none, dtOnly, datedRows, partitionSpec, true, "I");
        String datedKeylessTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(datedFields, none, dtOnly, false);
        ReadWriteTableTestUtil.insertIntoFromTable(datedKeylessSource, datedKeylessTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                datedKeylessTable, "*", "", scanFromTimeStampMillis(0L)),
                        datedRows)
                .close();

        // Second list (currency, dt): the repeated US Dollar key is expected as -U/+U.
        String datedKeyedSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        datedFields, currencyAndDt, dtOnly, datedRows, partitionSpec, true, "I");
        String datedKeyedTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        datedFields, currencyAndDt, dtOnly, false);
        ReadWriteTableTestUtil.insertIntoFromTable(datedKeyedSource, datedKeyedTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                datedKeyedTable, "*", "", scanFromTimeStampMillis(0L)),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-U", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+U", "US Dollar", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02")))
                .close();

        // ---------- two-column schema without dt ----------
        List<String> plainFields = Arrays.asList("currency STRING", "rate BIGINT");
        List<String> currencyOnly = Collections.singletonList("currency");

        List<Row> plainRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L));

        // Second list (currency): the repeated Euro key is expected as -U/+U.
        String plainKeyedSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        plainFields, currencyOnly, none, plainRows, null, true, "I");
        String plainKeyedTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        plainFields, currencyOnly, none, false);
        ReadWriteTableTestUtil.insertIntoFromTable(plainKeyedSource, plainKeyedTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                plainKeyedTable, "*", "", scanFromTimeStampMillis(0L)),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 119L)))
                .close();

        // Empty second list: the rows are expected back unchanged.
        String plainKeylessSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        plainFields, none, none, plainRows, null, true, "I");
        String plainKeylessTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(plainFields, none, none, false);
        ReadWriteTableTestUtil.insertIntoFromTable(plainKeylessSource, plainKeylessTable);
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                plainKeylessTable, "*", "", scanFromTimeStampMillis(0L)),
                        plainRows)
                .close();
    }

    /**
     * Reads from a scan timestamp of {@code Long.MAX_VALUE - 1} after writing five insert-only
     * rows, and expects the streaming read to yield an empty result.
     */
    @Test
    public void testReadInsertOnlyChangelogFromEnormousTimestamp() throws Exception {
        List<String> fields = Arrays.asList("currency STRING", "rate BIGINT");
        List<String> none = Collections.emptyList();

        List<Row> insertOnlyRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L));

        String source =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields, none, none, insertOnlyRows, null, true, "I");
        String table = ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, none, false);
        ReadWriteTableTestUtil.insertIntoFromTable(source, table);

        // One below Long.MAX_VALUE, i.e. 9223372036854775806.
        String query =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        table, "*", "", scanFromTimeStampMillis(Long.MAX_VALUE - 1));
        List<Row> nothingExpected = Collections.emptyList();
        ReadWriteTableTestUtil.testStreamingRead(query, nothingExpected).close();
    }

    /**
     * Writes changelogs that contain updates and deletes into tables with a Kafka log and reads
     * them back with the options built by {@link #scanFromTimeStampMillis(Long)} for timestamp
     * {@code 0}.
     *
     * <p>First scenario: source mode {@code "I,UA,UB,D"}, table created with an empty second list;
     * the expected output lists the source's {@code -U}/{@code +U} pair as {@code -D}/{@code +I}.
     * Second scenario: source mode {@code "I,UA,D"}, table created with {@code (currency, dt)} as
     * its second list; the expected output contains a {@code -U} row before the {@code +U} row that
     * the source does not have, and omits the final {@code +U} of the source.
     */
    @Test
    public void testReadRetractChangelogFromTimestamp() throws Exception {
        List<String> fields = Arrays.asList("currency STRING", "rate BIGINT", "dt STRING");
        List<String> none = Collections.emptyList();
        List<String> dtOnly = Collections.singletonList("dt");
        List<String> currencyAndDt = Arrays.asList("currency", "dt");
        String partitionSpec = "dt:2022-01-01;dt:2022-01-02";

        // ---------- scenario 1: empty second list ----------
        List<Row> retractRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("-U", "Euro", 114L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 115L, "2022-01-02"));
        String keylessSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields, none, dtOnly, retractRows, partitionSpec, false, "I,UA,UB,D");
        String keylessTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, none, dtOnly, false);
        ReadWriteTableTestUtil.insertIntoFromTable(keylessSource, keylessTable);
        String keylessQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        keylessTable, "*", "", scanFromTimeStampMillis(0L));
        ReadWriteTableTestUtil.testStreamingRead(
                        keylessQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 114L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 119L, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 115L, "2022-01-02")))
                .close();

        // ---------- scenario 2: second list (currency, dt) ----------
        List<Row> upsertRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L, "2022-01-02"));
        String keyedSource =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields, currencyAndDt, dtOnly, upsertRows, partitionSpec, false, "I,UA,D");
        String keyedTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(fields, currencyAndDt, dtOnly, false);
        ReadWriteTableTestUtil.insertIntoFromTable(keyedSource, keyedTable);
        String keyedQuery =
                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                        keyedTable, "*", "", scanFromTimeStampMillis(0L));
        ReadWriteTableTestUtil.testStreamingRead(
                        keyedQuery,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-U", "Euro", 114L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+U", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02")))
                .close();
    }

    /**
     * Builds the table options for a scan that starts from a given timestamp.
     *
     * <p>The result maps {@code CoreOptions.SCAN_MODE} to {@code StartupMode.FROM_TIMESTAMP} and
     * {@code CoreOptions.SCAN_TIMESTAMP_MILLIS} to the string form of {@code l}.
     *
     * @param l the start timestamp; rendered with {@link String#valueOf(Object)}, so a
     *     {@code null} argument yields the text {@code "null"}
     * @return a new mutable map holding exactly those two entries
     */
    private Map<String, String> scanFromTimeStampMillis(final Long l) {
        // A plain HashMap replaces the former double-brace initialization, which created an
        // anonymous HashMap subclass holding a hidden reference to this test instance.
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_TIMESTAMP.toString());
        options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(l));
        return options;
    }
}
