package org.apache.flink.table.store.connector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.class */
public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTestBase {
    /**
     * Runs the inherited batch read/write helper against a table declared with partition keys
     * {@code (dt, hh)} and primary key {@code (from_currency, to_currency, dt, hh)}.
     *
     * <p>Sequence of calls: an unfiltered write/read of twelve rows, a file store path check, a
     * collect through a separately created stream environment, an overwrite targeting
     * {@code dt = '2022-01-02', hh = '23'}, a filtered re-read of that partition, and finally a
     * series of helper invocations with different filters and projections.
     *
     * <p>NOTE(review): the helper implementations live in the base class and are not visible here;
     * the inline comments describe intent as suggested by the helper names only.
     *
     * @throws Exception if any of the invoked helpers throws
     */
    @Test
    public void testBatchWriteWithMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");

        List<Row> initialRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.12d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.00812d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.918d, "2022-01-02", "23"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67d, "2022-01-02", "23"}));

        // Unfiltered, unprojected round trip; the helper returns the name of the table it used.
        String table =
                collectAndCheckBatchReadWrite(
                        partitionKeys, primaryKeys, null, Collections.emptyList(), initialRows);
        checkFileStorePath(tEnv, table, (String) ReadWriteTableTestUtil.hourlyExchangeRates().f1);

        // Collect the same rows through a separately created stream table environment.
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(streamTableEnv, table);
        BlockingIterator<Row, Row> streamIter =
                collectAndCheck(streamTableEnv, table, Collections.emptyMap(), null, initialRows);

        // Parameterized (was a raw LinkedHashMap); insertion order dt -> hh is kept.
        LinkedHashMap<String, String> overwritePartition = new LinkedHashMap<>();
        overwritePartition.put("dt", "'2022-01-02'");
        overwritePartition.put("hh", "'23'");
        prepareEnvAndOverwrite(
                table,
                overwritePartition,
                Collections.singletonList(new String[] {"'US Dollar'", "'Thai Baht'", "33.51"}));

        // The previously opened streaming iterator is expected to yield nothing further.
        assertNoMoreRecords(streamIter);

        // Re-read the overwritten partition and release the iterator right away.
        collectAndCheck(
                        tEnv,
                        table,
                        Collections.emptyMap(),
                        "dt = '2022-01-02' AND hh = '23'",
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Thai Baht", 33.51d, "2022-01-02", "23"})))
                .close();

        // Filter on the first partition field only.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-01'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.12d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.00812d, "2022-01-01", "12"})));

        // Filter on both partition fields.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-01' AND hh = '12'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.12d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.00812d, "2022-01-01", "12"})));

        // Filters on the non-partition primary key fields.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "to_currency = 'Euro'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.918d, "2022-01-02", "23"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67d, "2022-01-02", "23"})));
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "from_currency = 'HK Dollar'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.129d, "2022-01-01", "12"})));

        // Filter on the value column.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency > 0.5",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.12d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.741d, "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.918d, "2022-01-02", "23"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67d, "2022-01-02", "23"})));

        // Value column combined with a partition field.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency > 0.9 AND hh = '23'",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.918d, "2022-01-02", "23"})));

        // Projection only.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                null,
                Arrays.asList("from_currency", "dt", "hh"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "2022-01-01", "11"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "2022-01-01", "12"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "2022-01-02", "23"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "2022-01-02", "23"})));

        // Filter and projection together.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-01' AND hh >= '12' OR rate_by_to_currency > 2",
                Arrays.asList("from_currency", "to_currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar"})));
    }

    /**
     * Runs the inherited batch read/write helper against a table declared with the single
     * partition key {@code dt} and primary key {@code (from_currency, to_currency, dt)}.
     *
     * <p>Sequence of calls: an unfiltered write/read of twelve rows, a file store path check, a
     * collect through a separately created stream environment, an overwrite issued with an empty
     * partition map, a filtered re-read of {@code dt = '2022-01-01'}, and then several helper
     * invocations with different filters and projections.
     *
     * <p>NOTE(review): helper semantics are defined in the base class, which is not visible here.
     *
     * @throws Exception if any of the invoked helpers throws
     */
    @Test
    public void testBatchWriteWithSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> partitionKeys = Collections.singletonList("dt");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");

        List<Row> initialRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0082d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.9d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "Yen", 1.0d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Chinese Yuan", "Yen", 19.25d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen", 122.46d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-02"}));

        // Unfiltered, unprojected round trip; the helper returns the name of the table it used.
        String table =
                collectAndCheckBatchReadWrite(
                        partitionKeys, primaryKeys, null, Collections.emptyList(), initialRows);
        checkFileStorePath(tEnv, table, (String) ReadWriteTableTestUtil.dailyExchangeRates().f1);

        // Collect the same rows through a separately created stream table environment.
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(streamTableEnv, table);
        BlockingIterator<Row, Row> streamIter =
                collectAndCheck(streamTableEnv, table, Collections.emptyMap(), null, initialRows);

        // Overwrite with an empty partition map; the dt value travels inside the row itself.
        prepareEnvAndOverwrite(
                table,
                Collections.emptyMap(),
                Collections.singletonList(
                        new String[] {"'US Dollar'", "'Thai Baht'", "33.51", "'2022-01-01'"}));

        // The previously opened streaming iterator is expected to yield nothing further.
        assertNoMoreRecords(streamIter);

        // Re-read the overwritten partition and release the iterator right away.
        collectAndCheck(
                        tEnv,
                        table,
                        Collections.emptyMap(),
                        "dt = '2022-01-01'",
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Thai Baht", 33.51d, "2022-01-01"})))
                .close();

        // Filter on the partition field.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-02'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-02"})));

        // Filter on the value column.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency < 0.1",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0082d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-02"})));

        // Disjunction of a value predicate and a partition predicate.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency > 0.9 OR dt = '2022-01-02'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "Yen", 1.0d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Chinese Yuan", "Yen", 19.25d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen", 122.46d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-02"})));

        // Projection only.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                null,
                Arrays.asList("rate_by_to_currency", "dt"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.0d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.11d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.13d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.0082d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.74d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.9d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.67d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.0d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {19.25d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {122.46d, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.0081d, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.0d, "2022-01-02"})));

        // Filter and projection together.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-02' OR rate_by_to_currency > 100",
                Arrays.asList("from_currency", "to_currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen"})));
    }

    /**
     * Runs the inherited batch read/write helper against a table declared with no partition keys
     * and primary key {@code (from_currency, to_currency)}.
     *
     * <p>Sequence of calls: an unfiltered write/read of ten rows, a file store path check with a
     * {@code null} expectation, a collect through a separately created stream environment, an
     * overwrite issued with an empty partition map, an unfiltered re-read, and then helper
     * invocations with a filter, a projection, and both combined.
     *
     * <p>NOTE(review): helper semantics are defined in the base class, which is not visible here.
     *
     * @throws Exception if any of the invoked helpers throws
     */
    @Test
    public void testBatchWriteWithNonPartitionedRecordsWithMultiPk() throws Exception {
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency");

        List<Row> initialRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.9d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "Yen", 1.0d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Chinese Yuan", "Yen", 19.25d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen", 122.46d}));

        // Unfiltered, unprojected round trip; the helper returns the name of the table it used.
        String table =
                collectAndCheckBatchReadWrite(
                        Collections.emptyList(),
                        primaryKeys,
                        null,
                        Collections.emptyList(),
                        initialRows);
        checkFileStorePath(tEnv, table, null);

        // Collect the same rows through a separately created stream table environment.
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(streamTableEnv, table);
        BlockingIterator<Row, Row> streamIter =
                collectAndCheck(streamTableEnv, table, Collections.emptyMap(), null, initialRows);

        // Overwrite with a single replacement row and an empty partition map.
        prepareEnvAndOverwrite(
                table,
                Collections.emptyMap(),
                Collections.singletonList(new String[] {"'US Dollar'", "'Thai Baht'", "33.51"}));

        // The previously opened streaming iterator is expected to yield nothing further.
        assertNoMoreRecords(streamIter);

        // Unfiltered re-read after the overwrite; release the iterator right away.
        collectAndCheck(
                        tEnv,
                        table,
                        Collections.emptyMap(),
                        null,
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Thai Baht", 33.51d})))
                .close();

        // Filter on the value column.
        collectAndCheckBatchReadWrite(
                Collections.emptyList(),
                primaryKeys,
                "rate_by_to_currency < 0.1",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d})));

        // Projection only.
        collectAndCheckBatchReadWrite(
                Collections.emptyList(),
                primaryKeys,
                null,
                Collections.singletonList("rate_by_to_currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.11d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.13d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.74d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.0081d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.0d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.9d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {0.67d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {1.0d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {19.25d}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {122.46d})));

        // Filter and projection together.
        collectAndCheckBatchReadWrite(
                Collections.emptyList(),
                primaryKeys,
                "rate_by_to_currency > 100",
                Arrays.asList("from_currency", "to_currency"),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen"})));
    }

    /**
     * Runs the inherited batch read/write helper against a table declared with partition keys
     * {@code (dt, hh)} and primary key {@code (currency, dt, hh)}.
     *
     * <p>Sequence of calls: an unfiltered write/read of seven rows, a file store path check, a
     * collect through a separately created stream environment, two overwrites (one with an empty
     * partition map, one with {@code dt} fixed to {@code '2022-01-02'}), a filtered re-read, and
     * then helper invocations with filters and projections.
     *
     * <p>The iterator returned by the filtered re-read is now closed, matching the sibling tests
     * in this class that call {@code close()} on the result of the same helper.
     *
     * <p>NOTE(review): helper semantics are defined in the base class, which is not visible here.
     *
     * @throws Exception if any of the invoked helpers throws
     */
    @Test
    public void testBatchWriteMultiPartitionedRecordsWithOnePk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");
        List<String> primaryKeys = Arrays.asList("currency", "dt", "hh");

        List<Row> initialRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "00"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-01", "00"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 114L, "2022-01-01", "00"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "12"}));

        // Unfiltered, unprojected round trip; the helper returns the name of the table it used.
        String table =
                collectAndCheckBatchReadWrite(
                        partitionKeys, primaryKeys, null, Collections.emptyList(), initialRows);
        checkFileStorePath(tEnv, table, (String) ReadWriteTableTestUtil.hourlyRates().f1);

        // Collect the same rows through a separately created stream table environment.
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(streamTableEnv, table);
        BlockingIterator<Row, Row> streamIter =
                collectAndCheck(streamTableEnv, table, Collections.emptyMap(), null, initialRows);

        // First overwrite: empty partition map, both partition values are part of the row.
        prepareEnvAndOverwrite(
                table,
                Collections.emptyMap(),
                Collections.singletonList(
                        new String[] {"'HK Dollar'", "80", "'2022-01-01'", "'00'"}));

        // Second overwrite: dt supplied through the partition map, hh inside the row.
        prepareEnvAndOverwrite(
                table,
                Collections.singletonMap("dt", "'2022-01-02'"),
                Collections.singletonList(new String[] {"'Euro'", "120", "'12'"}));

        // Re-read the two affected hours; close the iterator so it is not leaked.
        collectAndCheck(
                        tEnv,
                        table,
                        Collections.emptyMap(),
                        "hh = '00' OR hh = '12'",
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", 80L, "2022-01-01", "00"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 120L, "2022-01-02", "12"})))
                .close();

        // The previously opened streaming iterator is expected to yield nothing further.
        assertNoMoreRecords(streamIter);

        // Filter on a partition field.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "hh >= '10'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "12"})));

        // Filter on the value column.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "rate >= 119",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "12"})));

        // Projection only.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                null,
                Collections.singletonList("hh"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"00"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"00"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"00"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"12"})));

        // Filter and projection together.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                primaryKeys,
                "rate > 100 AND hh >= '20'",
                Collections.singletonList("rate"),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {114L})));
    }

    /**
     * Runs the inherited batch read/write helper against a table declared with partition keys
     * {@code (dt, hh)} and an empty primary key list, using the rows supplied by
     * {@code ReadWriteTableTestUtil.hourlyRates().f0}.
     *
     * <p>Sequence of calls: an unfiltered write/read, a file store path check, a collect through
     * a separately created stream environment, an {@code INSERT OVERWRITE} statement that selects
     * {@code NULL} for both partition columns, a path check against {@code "dt:null,hh:null"}, an
     * unfiltered re-read, and then helper invocations with filters and projections.
     *
     * <p>The iterator returned by the unfiltered re-read is now closed, matching the sibling tests
     * in this class that call {@code close()} on the result of the same helper. The raw
     * {@code List} casts are replaced by {@code List<Row>}.
     *
     * <p>NOTE(review): helper semantics and the declared type of {@code hourlyRates()} are not
     * visible here.
     *
     * @throws Exception if any of the invoked helpers throws
     */
    @Test
    @SuppressWarnings("unchecked") // the declared type of hourlyRates().f0 is not visible here
    public void testBatchWriteMultiPartitionedRecordsWithoutPk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        // Unfiltered, unprojected round trip; the helper returns the name of the table it used.
        String table =
                collectAndCheckBatchReadWrite(
                        partitionKeys,
                        Collections.emptyList(),
                        null,
                        Collections.emptyList(),
                        (List<Row>) ReadWriteTableTestUtil.hourlyRates().f0);
        checkFileStorePath(tEnv, table, (String) ReadWriteTableTestUtil.hourlyRates().f1);

        // Collect the same rows through a separately created stream table environment.
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(streamTableEnv, table);
        BlockingIterator<Row, Row> streamIter =
                collectAndCheck(
                        streamTableEnv,
                        table,
                        Collections.emptyMap(),
                        null,
                        (List<Row>) ReadWriteTableTestUtil.hourlyRates().f0);

        // Overwrite via raw SQL; both partition columns are written as NULL.
        String overwriteSql =
                String.format(
                        "INSERT OVERWRITE `%s` SELECT 'Yen', 1, CAST(null AS STRING), CAST(null AS STRING) FROM `%s`",
                        table, table);
        prepareEnvAndOverwrite(table, overwriteSql);
        checkFileStorePath(tEnv, table, "dt:null,hh:null");

        // Unfiltered re-read after the overwrite; close the iterator so it is not leaked.
        collectAndCheck(
                        tEnv,
                        table,
                        Collections.emptyMap(),
                        null,
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, null, null})))
                .close();

        // The previously opened streaming iterator is expected to yield nothing further.
        assertNoMoreRecords(streamIter);

        // Filter on a partition field.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                Collections.emptyList(),
                "hh >= '10'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 102L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 114L, "2022-01-01", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "12"})));

        // Filter on the value column.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                Collections.emptyList(),
                "rate >= 119",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "12"})));

        // Projection only.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                Collections.emptyList(),
                null,
                Collections.singletonList("currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro"})));

        // Filter and projection together.
        collectAndCheckBatchReadWrite(
                partitionKeys,
                Collections.emptyList(),
                "rate > 100 AND hh >= '20'",
                Collections.singletonList("rate"),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {114L})));
    }

    /**
     * Runs the inherited streaming read/write helpers against a table declared with partition key
     * {@code dt} and primary key {@code (from_currency, to_currency, dt)}.
     *
     * <p>Sequence of calls: a streaming write/read of nine rows that leaves the iterator open, a
     * file store path check, an {@code INSERT INTO ... PARTITION (dt = '2022-01-03')} statement
     * whose single row is then collected from the open iterator (limit 1, 5 second timeout), an
     * {@code INSERT OVERWRITE} statement, a path check against {@code "dt:2022-04-02"}, a read
     * through a separately created batch-mode environment, and finally two closing streaming
     * helper invocations with filters (the second also projects and expects no rows).
     *
     * <p>The streaming iterator is now held with its declared type parameters instead of a raw
     * type, and the iterator returned by the batch-mode read is closed, matching the sibling
     * tests in this class that call {@code close()} on the result of the same helper.
     *
     * <p>NOTE(review): helper semantics are defined in the base class, which is not visible here.
     *
     * @throws Exception if any of the invoked helpers or SQL statements throws
     */
    @Test
    public void testEnableLogAndStreamingReadWriteSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> partitionKeys = Collections.singletonList("dt");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");

        Tuple2<String, BlockingIterator<Row, Row>> tableAndIterator =
                collectAndCheckStreamingReadWriteWithoutClose(
                        partitionKeys,
                        primaryKeys,
                        Collections.emptyMap(),
                        null,
                        Collections.emptyList(),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13d, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74d, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0081d, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11d, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.9d, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "Yen", 1.0d, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Chinese Yuan", "Yen", 19.25d, "2022-01-02"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen", 122.46d, "2022-01-02"})));

        // Tuple2 fields are already typed by the declaration above; no casts are needed.
        String table = tableAndIterator.f0;
        checkFileStorePath(
                tEnv, table, (String) ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB().f1);
        BlockingIterator<Row, Row> streamIter = tableAndIterator.f1;

        // Insert one more row and expect exactly that row from the still-open iterator.
        tEnv.executeSql(
                        String.format(
                                "INSERT INTO `%s` PARTITION (dt = '2022-01-03')\nVALUES('Chinese Yuan', 'HK Dollar', 1.231)\n",
                                table))
                .await();
        Assertions.assertThat(streamIter.collect(1, 5L, TimeUnit.SECONDS))
                .containsExactlyInAnyOrderElementsOf(
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Chinese Yuan", "HK Dollar", 1.231d, "2022-01-03"})));

        // Overwrite via raw SQL, then verify the resulting partition path.
        String overwriteSql =
                String.format(
                        "INSERT OVERWRITE `%s` SELECT 'US Dollar', 'US Dollar', 1, '2022-04-02' FROM `%s`",
                        table, table);
        prepareEnvAndOverwrite(table, overwriteSql);
        checkFileStorePath(tEnv, table, "dt:2022-04-02");

        // Read through a batch-mode environment; close the iterator so it is not leaked.
        StreamTableEnvironment batchTableEnv =
                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        registerTable(batchTableEnv, table);
        collectAndCheck(
                        batchTableEnv,
                        table,
                        Collections.emptyMap(),
                        null,
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0d, "2022-04-02"})))
                .close();

        // The streaming iterator is expected to yield nothing further.
        assertNoMoreRecords(streamIter);

        // Filter on the partition field and a primary key field.
        collectAndCheckStreamingReadWriteWithClose(
                true,
                partitionKeys,
                primaryKeys,
                Collections.emptyMap(),
                "dt = '2022-01-02' AND from_currency = 'US Dollar'",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.9d, "2022-01-02"})));

        // Filter plus projection with an empty expected result.
        collectAndCheckStreamingReadWriteWithClose(
                true,
                partitionKeys,
                primaryKeys,
                Collections.emptyMap(),
                "dt = '2022-01-01' AND rate_by_to_currency IS NULL",
                Arrays.asList("from_currency", "to_currency"),
                Collections.emptyList());
    }

    /**
     * Streaming read/write round trip on a table keyed by
     * (from_currency, to_currency, dt, hh) with key lists (dt, hh) passed as the first argument.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>run {@code collectAndCheckStreamingReadWriteWithoutClose} and keep the returned
     *       table name ({@code f0}) and result iterator ({@code f1});</li>
     *   <li>insert one more row and expect exactly that row from the iterator;</li>
     *   <li>run an {@code INSERT OVERWRITE}, check the file store path, and read the table
     *       back through an environment created with {@code EnvironmentSettings.inBatchMode()};</li>
     *   <li>assert the iterator yields nothing further;</li>
     *   <li>run two more {@code collectAndCheckStreamingReadWriteWithClose} checks with a
     *       filter, the second one also projecting two columns.</li>
     * </ol>
     *
     * <p>NOTE(review): the roles of the positional arguments (partition keys, primary keys,
     * filter, projection, expected rows) are inferred from the test name and the row shapes;
     * confirm against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any helper or SQL statement fails
     */
    @Test
    public void testEnableLogAndStreamingReadWriteMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");
        String usDollarOnJan2 = "dt = '2022-01-02' AND from_currency = 'US Dollar'";

        Tuple2<String, BlockingIterator<Row, Row>> tableAndIterator =
                collectAndCheckStreamingReadWriteWithoutClose(
                        partitionKeys,
                        primaryKeys,
                        Collections.emptyMap(),
                        null,
                        Collections.emptyList(),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0081, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.76, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", null, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 25.6, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.1, "2022-01-02", "20"})));

        // The tuple is fully parameterized, so its fields need neither casts nor raw types.
        String tableName = tableAndIterator.f0;
        BlockingIterator<Row, Row> streamingResults = tableAndIterator.f1;

        checkFileStorePath(
                this.tEnv,
                tableName,
                (String) ReadWriteTableTestUtil.hourlyExchangeRatesChangelogWithoutUB().f1);

        // A further insert must surface as exactly one new record on the open iterator.
        this.tEnv
                .executeSql(
                        String.format(
                                "INSERT INTO `%s` \nVALUES('Chinese Yuan', 'HK Dollar', 1.231, '2022-01-03', '15')\n",
                                tableName))
                .await();
        Assertions.assertThat(streamingResults.collect(1, 5L, TimeUnit.SECONDS))
                .containsExactlyInAnyOrderElementsOf(
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "HK Dollar", 1.231, "2022-01-03", "15"})));

        // Overwrite the table, then verify its content through a batch-mode environment.
        prepareEnvAndOverwrite(
                tableName,
                String.format(
                        "INSERT OVERWRITE `%s` SELECT 'US Dollar', 'US Dollar', 1, '2022-04-02', '10' FROM `%s`",
                        tableName,
                        tableName));
        checkFileStorePath(this.tEnv, tableName, "dt:2022-04-02,hh:10");

        StreamTableEnvironment batchTableEnv =
                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        registerTable(batchTableEnv, tableName);
        collectAndCheck(
                batchTableEnv,
                tableName,
                Collections.emptyMap(),
                null,
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-04-02", "10"})));

        // The overwrite is expected to produce no additional record on the iterator.
        assertNoMoreRecords(streamingResults);

        // Filter only.
        collectAndCheckStreamingReadWriteWithClose(
                true,
                partitionKeys,
                primaryKeys,
                Collections.emptyMap(),
                usDollarOnJan2,
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})));

        // Same filter, with the two currency columns selected.
        collectAndCheckStreamingReadWriteWithClose(
                true,
                partitionKeys,
                primaryKeys,
                Collections.emptyMap(),
                usDollarOnJan2,
                Arrays.asList("from_currency", "to_currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen"})));
    }

    /**
     * Streaming read/write round trip on a table without key columns in the second list
     * (an empty list is passed) and with {@code dt} as the only entry of the first list.
     *
     * <p>The flow is: initial streaming write and check, one extra partition insert observed
     * through the still-open iterator, an {@code INSERT OVERWRITE} verified via a batch-mode
     * environment, a no-more-records check, and finally two filtered reads that are both
     * expected to return no rows.
     *
     * <p>NOTE(review): despite "MultiPartitioned" in the name, only a single key ({@code dt})
     * is supplied here; the positional argument roles are inferred and should be confirmed
     * against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any helper or SQL statement fails
     */
    @Test
    public void testEnableLogAndStreamingReadWriteMultiPartitionedRecordsWithoutPk() throws Exception {
        List<String> partitionKeys = Collections.singletonList("dt");

        Tuple2<String, BlockingIterator<Row, Row>> tableAndIterator =
                collectAndCheckStreamingReadWriteWithoutClose(
                        partitionKeys,
                        Collections.emptyList(),
                        Collections.emptyMap(),
                        null,
                        Collections.emptyList(),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02"})));

        // The tuple is fully parameterized, so its fields need neither casts nor raw types.
        String tableName = tableAndIterator.f0;
        BlockingIterator<Row, Row> streamingResults = tableAndIterator.f1;

        checkFileStorePath(
                this.tEnv,
                tableName,
                (String) ReadWriteTableTestUtil.dailyRatesChangelogWithUB().f1);

        // Insert into a new partition and expect exactly that record on the open iterator.
        this.tEnv
                .executeSql(
                        String.format(
                                "INSERT INTO `%s` PARTITION (dt = '2022-04-02')\nVALUES('Euro', 116)\n",
                                tableName))
                .await();
        Assertions.assertThat(streamingResults.collect(1, 5L, TimeUnit.SECONDS))
                .containsExactlyInAnyOrderElementsOf(
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L, "2022-04-02"})));

        // Overwrite the table, then verify its content through a batch-mode environment.
        prepareEnvAndOverwrite(
                tableName,
                String.format(
                        "INSERT OVERWRITE `%s` SELECT 'US Dollar', 103, '2022-04-02' FROM `%s`",
                        tableName,
                        tableName));
        checkFileStorePath(this.tEnv, tableName, "dt:2022-04-02");

        StreamTableEnvironment batchTableEnv =
                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        registerTable(batchTableEnv, tableName);
        collectAndCheck(
                batchTableEnv,
                tableName,
                Collections.emptyMap(),
                null,
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 103L, "2022-04-02"})));

        // The overwrite is expected to produce no additional record on the iterator.
        assertNoMoreRecords(streamingResults);

        // Both filtered reads below expect an empty result.
        collectAndCheckStreamingReadWriteWithClose(
                true,
                partitionKeys,
                Collections.emptyList(),
                Collections.emptyMap(),
                "dt = '2022-01-01' AND currency = 'Yen'",
                Collections.emptyList(),
                Collections.emptyList());
        collectAndCheckStreamingReadWriteWithClose(
                true,
                partitionKeys,
                Collections.emptyList(),
                Collections.emptyMap(),
                "dt = '2022-01-01' AND rate = 103",
                Collections.singletonList("currency"),
                Collections.emptyList());
    }

    /**
     * Runs {@code collectLatestLogAndCheck} six times against the (dt, hh) /
     * (from_currency, to_currency, dt, hh) key configuration: unfiltered, with three different
     * filters, with a two-column projection, and with a filter plus that projection.
     *
     * <p>The unfiltered expectation is the hourly exchange-rate changelog from
     * {@code ReadWriteTableTestUtil} extended by five {@code -U} rows.
     *
     * <p>NOTE(review): positional argument roles are inferred from the test name and row
     * shapes; confirm against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any check fails or the underlying job errors out
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");
        List<String> currencyPair = Arrays.asList("from_currency", "to_currency");

        // Baseline changelog plus the additional -U records, appended in the original order.
        ArrayList expectedChangelog =
                new ArrayList((Collection) ReadWriteTableTestUtil.hourlyExchangeRatesChangelogWithoutUB().f0);
        expectedChangelog.addAll(
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Yen", "US Dollar", 0.0082, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "US Dollar", 0.74, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02", "20"})));

        // No filter, no projection.
        collectLatestLogAndCheck(
                false, partitionKeys, primaryKeys, null, Collections.emptyList(), expectedChangelog);

        // Filter on both dt and hh.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-02' AND hh = '21'",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})));

        // Filter on the rate column and from_currency.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency IS NOT NULL AND from_currency = 'US Dollar'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"US Dollar", "US Dollar", 1.0, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})));

        // Filter on hh and from_currency.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "hh = '21' AND from_currency = 'US Dollar'",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen", 122.46, "2022-01-02", "21"})));

        // Projection only.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                null,
                currencyPair,
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"US Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen"})));

        // Filter and projection together.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency > 100",
                currencyPair,
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Yen"})));
    }

    /**
     * Runs {@code collectLatestLogAndCheck} five times against the (dt) /
     * (from_currency, to_currency, dt) key configuration: unfiltered, with three filters,
     * and with a filter combined with a two-column projection.
     *
     * <p>The unfiltered expectation is the daily exchange-rate changelog from
     * {@code ReadWriteTableTestUtil} extended by three {@code -U} rows, two of which carry a
     * {@code null} rate.
     *
     * <p>NOTE(review): positional argument roles are inferred from the test name and row
     * shapes; confirm against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any check fails or the underlying job errors out
     */
    @Test
    public void testReadLatestChangelogOfSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> partitionKeys = Collections.singletonList("dt");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");

        // Baseline changelog plus the additional -U records, appended in the original order.
        ArrayList expectedChangelog =
                new ArrayList((Collection) ReadWriteTableTestUtil.dailyExchangeRatesChangelogWithoutUB().f0);
        expectedChangelog.addAll(
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"})));

        // No filter, no projection.
        collectLatestLogAndCheck(
                false, partitionKeys, primaryKeys, null, Collections.emptyList(), expectedChangelog);

        // Filter on dt only.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-02'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Singapore Dollar", "Euro", 0.67, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Chinese Yuan", "Yen", 19.25, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 122.46, "2022-01-02"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32, "2022-01-02"})));

        // Filter selecting rows whose rate is NULL.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency IS NULL",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", "US Dollar", null, "2022-01-01"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"US Dollar", "US Dollar", null, "2022-01-01"})));

        // Filter on dt and from_currency.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "dt = '2022-01-02' AND from_currency = 'Yen'",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "Yen", 1.0, "2022-01-02"})));

        // Filter on the rate column combined with a two-column projection.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "rate_by_to_currency > 100",
                Arrays.asList("from_currency", "to_currency"),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen"})));
    }

    /**
     * Runs {@code collectLatestLogAndCheck} three times with an empty first key list and
     * (from_currency, to_currency) as the second: unfiltered, with a rate-range filter, and
     * with the same filter plus a two-column projection.
     *
     * <p>The unfiltered expectation is the exchange-rate changelog from
     * {@code ReadWriteTableTestUtil} extended by five {@code -U} rows. The two filtered
     * expectations list the same thirteen changes, once with the rate and once without it.
     *
     * <p>NOTE(review): positional argument roles are inferred from the test name and row
     * shapes; confirm against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any check fails or the underlying job errors out
     */
    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithMultiPk() throws Exception {
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency");
        String rateOutsideRange = "rate_by_to_currency < 1 OR rate_by_to_currency > 100";

        // Baseline changelog plus the additional -U records, appended in the original order.
        ArrayList expectedChangelog =
                new ArrayList(ReadWriteTableTestUtil.exchangeRatesChangelogWithoutUB());
        expectedChangelog.addAll(
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Yen", "US Dollar", 0.0082}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", "US Dollar", 1.11}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Euro", 0.67}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 90.32}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 122.46})));

        // No filter, no projection.
        collectLatestLogAndCheck(
                false,
                Collections.emptyList(),
                primaryKeys,
                null,
                Collections.emptyList(),
                expectedChangelog);

        // Filter only: full rows.
        collectLatestLogAndCheck(
                false,
                Collections.emptyList(),
                primaryKeys,
                rateOutsideRange,
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar", 0.13}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar", 0.0082}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Yen", "US Dollar", 0.0082}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar", 0.74}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Yen", "US Dollar", 0.0081}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", "US Dollar", 0.0081}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro", 0.9}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro", 0.67}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Euro", 0.67}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Euro", 0.69}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 122.46}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen", 122.46}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen", 122.0})));

        // Same filter with the two currency columns selected. A separate list instance is
        // used for the projection, exactly as the original passed two distinct lists.
        collectLatestLogAndCheck(
                false,
                Collections.emptyList(),
                primaryKeys,
                rateOutsideRange,
                Arrays.asList("from_currency", "to_currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Yen", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Yen", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", "US Dollar"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Singapore Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Euro"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen"}),
                        TestValuesTableFactory.changelogRow("-U", new Object[]{"Singapore Dollar", "Yen"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[]{"Singapore Dollar", "Yen"})));
    }

    /**
     * Runs {@code collectLatestLogAndCheck} four times against the (dt, hh) /
     * (currency, dt, hh) key configuration: unfiltered, with a {@code dt} range filter, with
     * a {@code rate} filter, and with that {@code rate} filter plus a one-column projection.
     *
     * <p>The unfiltered expectation starts from the hourly rates changelog in
     * {@code ReadWriteTableTestUtil}, removes one {@code +U} row and appends one {@code -U} row.
     *
     * <p>NOTE(review): positional argument roles are inferred from the test name and row
     * shapes; confirm against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any check fails or the underlying job errors out
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithOnePk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");
        List<String> primaryKeys = Arrays.asList("currency", "dt", "hh");
        String rateIsOne = "rate = 1";

        // Adjust the baseline: drop one +U record first, then append one -U record.
        ArrayList expectedChangelog =
                new ArrayList((Collection) ReadWriteTableTestUtil.hourlyRatesChangelogWithoutUB().f0);
        expectedChangelog.remove(
                TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 119L, "2022-01-02", "23"}));
        expectedChangelog.add(
                TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L, "2022-01-01", "15"}));

        // No filter, no projection.
        collectLatestLogAndCheck(
                false, partitionKeys, primaryKeys, null, Collections.emptyList(), expectedChangelog);

        // Range filter on dt.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                "dt >= '2022-01-02'",
                Collections.emptyList(),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02", "23"})));

        // Filter on rate: full rows.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                rateIsOne,
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01", "15"})));

        // Filter on rate with only the currency column selected.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                primaryKeys,
                rateIsOne,
                Collections.singletonList("currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen"})));
    }

    /**
     * Runs {@code collectLatestLogAndCheck} four times with (dt, hh) as the first key list
     * and an empty second key list: unfiltered, with an {@code hh} filter, with a {@code rate}
     * filter, and with that {@code rate} filter plus a one-column projection.
     *
     * <p>NOTE(review): positional argument roles are inferred from the test name and row
     * shapes; confirm against {@code ReadWriteTableTestBase}.
     *
     * @throws Exception if any check fails or the underlying job errors out
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithoutPk() throws Exception {
        List<String> partitionKeys = Arrays.asList("dt", "hh");
        String rateIsOne = "rate = 1";

        // No filter, no projection: the complete expected changelog.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                Collections.emptyList(),
                null,
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 114L, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 119L, "2022-01-02", "20"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02", "20"})));

        // Filter excluding hh = '20'.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                Collections.emptyList(),
                "hh <> '20'",
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01", "15"})));

        // Filter on rate: full rows.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                Collections.emptyList(),
                rateIsOne,
                Collections.emptyList(),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01", "15"})));

        // Filter on rate with only the currency column selected.
        collectLatestLogAndCheck(
                false,
                partitionKeys,
                Collections.emptyList(),
                rateIsOne,
                Collections.singletonList("currency"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen"})));
    }
}
