package org.apache.flink.table.store.connector;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/ReadWriteTableITCase.class */
public class ReadWriteTableITCase extends ReadWriteTableTestBase {
    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testBatchWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> asList = Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"}));
        String collectAndCheckBatchReadWrite = collectAndCheckBatchReadWrite(true, true, (String) null, Collections.emptyList(), asList);
        checkFileStorePath(this.tEnv, collectAndCheckBatchReadWrite, (String) ReadWriteTableTestUtil.dailyRates().f1);
        StreamTableEnvironment create = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(create, collectAndCheckBatchReadWrite);
        BlockingIterator<Row, Row> collectAndCheck = collectAndCheck(create, collectAndCheckBatchReadWrite, Collections.emptyMap(), null, asList);
        prepareEnvAndOverwrite(collectAndCheckBatchReadWrite, Collections.singletonMap("dt", "'2022-01-02'"), Arrays.asList(new String[]{"'Euro'", "100"}, new String[]{"'Yen'", "1"}));
        assertNoMoreRecords(collectAndCheck);
        collectAndCheck(this.tEnv, collectAndCheckBatchReadWrite, Collections.emptyMap(), "dt IN ('2022-01-02')", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 100L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-02"}))).close();
        List<Row> asList2 = Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 114L, "2022-01-01"}));
        collectAndCheckBatchReadWrite(true, true, "dt <> '2022-01-02'", Collections.emptyList(), asList2);
        collectAndCheckBatchReadWrite(true, true, "dt IN ('2022-01-01')", Collections.emptyList(), asList2);
        collectAndCheckBatchReadWrite(true, true, "rate >= 100", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
        collectAndCheckBatchReadWrite(true, true, "rate >= 100 AND dt = '2022-01-02'", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
        collectAndCheckBatchReadWrite(true, true, (String) null, Collections.singletonList("dt"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-02"})));
        collectAndCheckBatchReadWrite(true, true, (String) null, Collections.singletonList("dt, currency, rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-01", "US Dollar", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-01", "Yen", 1L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-01", "Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-02", "Euro", 119L})));
        collectAndCheckBatchReadWrite(true, true, "rate = 114", Collections.singletonList("rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{114L})));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testBatchWriteWithPartitionedRecordsWithoutPk() throws Exception {
        String collectAndCheckBatchReadWrite = collectAndCheckBatchReadWrite(true, false, (String) null, Collections.emptyList(), (List<Row>) ReadWriteTableTestUtil.dailyRates().f0);
        checkFileStorePath(this.tEnv, collectAndCheckBatchReadWrite, (String) ReadWriteTableTestUtil.dailyRates().f1);
        prepareEnvAndOverwrite(collectAndCheckBatchReadWrite, Collections.emptyMap(), Arrays.asList(new String[]{"'Euro'", "90", "'2022-01-01'"}, new String[]{"'Yen'", "2", "'2022-01-02'"}));
        StreamTableEnvironment create = StreamTableEnvironment.create(buildStreamEnv());
        registerTable(create, collectAndCheckBatchReadWrite);
        collectAndCheck(create, collectAndCheckBatchReadWrite, Collections.emptyMap(), "dt IN ('2022-01-01', '2022-01-02')", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 90L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 2L, "2022-01-02"}))).close();
        collectAndCheckBatchReadWrite(true, false, "dt >= '2022-01-01'", Collections.emptyList(), (List<Row>) ReadWriteTableTestUtil.dailyRates().f0);
        collectAndCheckBatchReadWrite(true, false, "currency = 'US Dollar'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 114L, "2022-01-01"})));
        collectAndCheckBatchReadWrite(true, false, "dt = '2022-01-01' OR rate > 115", Collections.emptyList(), (List<Row>) ReadWriteTableTestUtil.dailyRates().f0);
        collectAndCheckBatchReadWrite(true, false, (String) null, Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"})));
        collectAndCheckBatchReadWrite(true, false, "rate = 119", Arrays.asList("currency", "dt"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", "2022-01-02"})));
    }

    @Test
    public void testBatchWriteWithNonPartitionedRecordsWithPk() throws Exception {
        String collectAndCheckBatchReadWrite = collectAndCheckBatchReadWrite(false, true, (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L})));
        checkFileStorePath(this.tEnv, collectAndCheckBatchReadWrite, null);
        prepareEnvAndOverwrite(collectAndCheckBatchReadWrite, Collections.emptyMap(), Collections.singletonList(new String[]{"'Euro'", "100"}));
        collectAndCheck(this.tEnv, collectAndCheckBatchReadWrite, Collections.emptyMap(), null, Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 100L})));
        collectAndCheckBatchReadWrite(false, true, "currency = 'Euro'", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L})));
        collectAndCheckBatchReadWrite(false, true, "119 >= rate AND 102 < rate", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L})));
        collectAndCheckBatchReadWrite(false, true, (String) null, Arrays.asList("rate", "currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{102L, "US Dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1L, "Yen"}), TestValuesTableFactory.changelogRow("+I", new Object[]{119L, "Euro"})));
        collectAndCheckBatchReadWrite(false, true, "currency IN ('Yen')", Collections.singletonList("rate"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{1L})));
    }

    @Test
    public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception {
        checkFileStorePath(this.tEnv, collectAndCheckBatchReadWrite(false, false, (String) null, Collections.emptyList(), ReadWriteTableTestUtil.rates()), null);
        collectAndCheckBatchReadWrite(false, false, "currency = 'Euro'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L})));
        collectAndCheckBatchReadWrite(false, false, "rate >= 1", Collections.emptyList(), ReadWriteTableTestUtil.rates());
        collectAndCheckBatchReadWrite(false, false, (String) null, Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"})));
        collectAndCheckBatchReadWrite(false, false, "rate > 100 OR currency = 'Yen'", Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"})));
    }

    @Test
    public void testPurgeTableUsingBatchOverWrite() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        create.executeSql(String.format("CREATE CATALOG test_catalog WITH ('type'='table-store', 'warehouse'='%s')", TEMPORARY_FOLDER.newFolder().toURI()));
        create.useCatalog("test_catalog");
        create.executeSql("CREATE TABLE test (k1 INT, k2 STRING, v STRING)");
        validateOverwriteResult(create, "test", "", "*", Collections.emptyList());
    }

    @Test
    public void testPurgePartitionUsingBatchOverWrite() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        create.executeSql(String.format("CREATE CATALOG test_catalog WITH ('type'='table-store', 'warehouse'='%s')", TEMPORARY_FOLDER.newFolder().toURI()));
        create.useCatalog("test_catalog");
        create.executeSql("CREATE TABLE test_single (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1)");
        validateOverwriteResult(create, "test_single", "PARTITION (k1 = 0)", "k2, v", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "store"})));
        create.executeSql("CREATE TABLE test_multiple0 (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1, k2)");
        validateOverwriteResult(create, "test_multiple0", "PARTITION (k1 = 0)", "k2, v", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "store"})));
        create.executeSql("CREATE TABLE test_multiple1 (k1 INT, k2 STRING, v STRING) PARTITIONED BY (k1, k2)");
        validateOverwriteResult(create, "test_multiple1", "PARTITION (k1 = 0, k2 = '2023-01-01')", "v", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{0, "2023-01-02", "world"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "store"})));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
        Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckStreamingReadWriteWithoutClose = collectAndCheckStreamingReadWriteWithoutClose(Collections.emptyMap(), "dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency = 'HK Dollar'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
        String str = (String) collectAndCheckStreamingReadWriteWithoutClose.f0;
        checkFileStorePath(this.tEnv, str, (String) ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB().f1);
        BlockingIterator blockingIterator = (BlockingIterator) collectAndCheckStreamingReadWriteWithoutClose.f1;
        this.tEnv.executeSql(String.format("INSERT INTO `%s` PARTITION (dt = '2022-01-03')\nVALUES('HK Dollar', 100), ('Yen', 20)\n", str)).await();
        this.tEnv.executeSql(String.format("INSERT INTO `%s` PARTITION (dt = '2022-01-04')\nVALUES('Yen', 20)\n", str)).await();
        Assertions.assertThat(blockingIterator.collect(2, 10L, TimeUnit.SECONDS)).containsExactlyInAnyOrderElementsOf(Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", 100L, "2022-01-03"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 20L, "2022-01-03"})));
        prepareEnvAndOverwrite(str, Collections.singletonMap("dt", "'2022-01-02'"), Arrays.asList(new String[]{"'Euro'", "100"}, new String[]{"'Yen'", "1"}));
        StreamTableEnvironment create = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        registerTable(create, str);
        collectAndCheck(create, str, Collections.emptyMap(), null, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 100L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", 100L, "2022-01-03"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 20L, "2022-01-03"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 20L, "2022-01-04"})));
        assertNoMoreRecords(blockingIterator);
        collectAndCheckStreamingReadWriteWithClose(true, true, true, Collections.emptyMap(), "dt = '2022-01-01'", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"})));
        collectAndCheckStreamingReadWriteWithClose(true, true, true, Collections.emptyMap(), "currency = 'US Dollar'", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"})));
        collectAndCheckStreamingReadWriteWithClose(true, true, true, Collections.emptyMap(), "dt = '2022-01-01' AND rate = 1", Collections.emptyList(), Collections.emptyList());
        collectAndCheckStreamingReadWriteWithClose(true, true, true, Collections.emptyMap(), "dt = '2022-01-02' AND currency = 'Euro'", Arrays.asList("rate", "dt", "currency"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{119L, "2022-01-02", "Euro"})));
    }

    @Test
    public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
        checkFileStorePath(this.tEnv, collectAndCheckStreamingReadWriteWithClose(false, true, true, Collections.emptyMap(), (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"}))), (String) ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB().f1);
        collectAndCheckStreamingReadWriteWithClose(false, true, true, Collections.emptyMap(), "dt < '2022-01-02'", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"})));
        collectAndCheckStreamingReadWriteWithClose(false, true, true, Collections.emptyMap(), "rate = 102", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"})));
        collectAndCheckStreamingReadWriteWithClose(false, true, true, Collections.emptyMap(), "rate = 102 or dt < '2022-01-02'", Collections.emptyList(), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"})));
        collectAndCheckStreamingReadWriteWithClose(false, true, true, Collections.emptyMap(), "rate = 102 or dt < '2022-01-02'", Collections.singletonList("currency"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"})));
    }

    @Test
    public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception {
        checkFileStorePath(this.tEnv, collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02"}))), (String) ReadWriteTableTestUtil.dailyRatesChangelogWithUB().f1);
        collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), "dt IS NOT NULL", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02"})));
        collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), "dt IS NULL", Collections.emptyList(), Collections.emptyList());
        collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), "currency = 'US Dollar' OR rate = 115", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02"})));
        collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), "(dt = '2022-01-02' AND currency = 'US Dollar') OR (dt = '2022-01-01' AND rate = 115)", Collections.emptyList(), Collections.emptyList());
        collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), (String) null, Collections.singletonList("rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{115L})));
        collectAndCheckStreamingReadWriteWithClose(true, true, false, Collections.emptyMap(), "dt <> '2022-01-01'", Collections.singletonList("rate"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{115L})));
    }

    @Test
    public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception {
        checkFileStorePath(this.tEnv, collectAndCheckStreamingReadWriteWithClose(true, false, true, Collections.emptyMap(), (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}))), null);
        collectAndCheckStreamingReadWriteWithClose(true, false, true, Collections.emptyMap(), "currency = 'Yen'", Collections.emptyList(), Collections.emptyList());
        collectAndCheckStreamingReadWriteWithClose(true, false, true, Collections.emptyMap(), (String) null, Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"})));
        collectAndCheckStreamingReadWriteWithClose(true, false, true, Collections.emptyMap(), "rate = 102", Collections.singletonList("currency"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"})));
    }

    @Test
    public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception {
        checkFileStorePath(this.tEnv, collectAndCheckStreamingReadWriteWithClose(true, false, false, Collections.emptyMap(), (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("+I", new Object[]{null, 100L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", null}))), null);
        collectAndCheckStreamingReadWriteWithClose(true, false, false, Collections.emptyMap(), "currency IS NOT NULL", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", null})));
        collectAndCheckStreamingReadWriteWithClose(true, false, false, Collections.emptyMap(), "rate IS NOT NULL", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("+I", new Object[]{null, 100L})));
        collectAndCheckStreamingReadWriteWithClose(true, false, false, Collections.emptyMap(), "currency IS NOT NULL AND rate is NOT NULL", Collections.singletonList("rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{119L})));
    }

    @Test
    public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception {
        collectLatestLogAndCheck(false, true, true, (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
        collectLatestLogAndCheck(false, true, true, "dt = '2022-01-01'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01"})));
        collectLatestLogAndCheck(false, true, true, "currency = 'Yen'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01"})));
        collectLatestLogAndCheck(false, true, true, "rate = 114", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L, "2022-01-01"})));
        collectLatestLogAndCheck(false, true, true, "rate = 114 AND dt = '2022-01-02'", Collections.emptyList(), Collections.emptyList());
        collectLatestLogAndCheck(false, true, true, (String) null, Collections.singletonList("rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{1L}), TestValuesTableFactory.changelogRow("-U", new Object[]{114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{1L}), TestValuesTableFactory.changelogRow("-D", new Object[]{116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{119L})));
        collectLatestLogAndCheck(false, true, true, "dt = '2022-01-02'", Collections.singletonList("rate"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{119L})));
    }

    @Test
    public void testReadLatestChangelogOfPartitionedRecordsWithoutPk() throws Exception {
        collectLatestLogAndCheck(false, true, false, (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-02"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 114L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02"})));
    }

    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithPk() throws Exception {
        collectLatestLogAndCheck(false, false, true, (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L})));
        collectLatestLogAndCheck(false, false, true, "currency = 'Euro'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L})));
        collectLatestLogAndCheck(false, false, true, (String) null, Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen"})));
        collectLatestLogAndCheck(false, false, true, "currency = 'Euro'", Collections.singletonList("rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{114L}), TestValuesTableFactory.changelogRow("-U", new Object[]{114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{119L})));
    }

    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithoutPk() throws Exception {
        collectLatestLogAndCheck(false, false, false, (String) null, Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("+I", new Object[]{null, 100L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", null})));
        collectLatestLogAndCheck(false, false, false, "currency = 'Euro'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L})));
        collectLatestLogAndCheck(false, false, false, (String) null, Arrays.asList("currency", "rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("+I", new Object[]{null, 100L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar", null})));
        collectLatestLogAndCheck(false, false, false, "currency IS NOT NULL", Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"HK Dollar"})));
        collectLatestLogAndCheck(false, false, false, "rate = 119", Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"})));
    }

    @Test
    public void testReadLatestChangelogOfInsertOnlyRecords() throws Exception {
        List<Row> asList = Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 119L}));
        collectLatestLogAndCheck(true, false, true, (String) null, Collections.emptyList(), asList);
        collectLatestLogAndCheck(true, false, true, (String) null, Collections.emptyList(), asList);
        collectLatestLogAndCheck(true, false, true, "rate = 114", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L})));
        collectLatestLogAndCheck(true, false, true, (String) null, Collections.singletonList("rate"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{1L}), TestValuesTableFactory.changelogRow("-U", new Object[]{114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{119L})));
        collectLatestLogAndCheck(true, false, true, "rate = 114", Collections.singletonList("currency"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro"}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro"})));
    }

    @Test
    public void testReadInsertOnlyChangelogFromTimestamp() throws Exception {
        collectChangelogFromTimestampAndCheck(true, true, true, 0L, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-U", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+U", new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
        collectChangelogFromTimestampAndCheck(true, true, false, 0L, (List<Row>) ReadWriteTableTestUtil.dailyRates().f0);
        collectChangelogFromTimestampAndCheck(true, false, true, 0L, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 119L})));
        collectChangelogFromTimestampAndCheck(true, false, false, 0L, ReadWriteTableTestUtil.rates());
        collectChangelogFromTimestampAndCheck(true, false, false, 9223372036854775806L, Collections.emptyList());
    }

    @Test
    public void testReadRetractChangelogFromTimestamp() throws Exception {
        collectChangelogFromTimestampAndCheck(false, true, false, 0L, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-02"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 114L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 115L, "2022-01-02"})));
        collectChangelogFromTimestampAndCheck(false, true, true, 0L, Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-U", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+U", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow("-D", new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
    }

    @Test
    public void testSourceParallelism() throws Exception {
        String str = (String) createSourceAndManagedTable(false, false, false, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null).f1;
        Assertions.assertThat(sourceParallelism(ShowCreateUtil.buildSimpleSelectQuery(str, Collections.emptyMap()))).isEqualTo(this.env.getParallelism());
        Assertions.assertThat(sourceParallelism(ShowCreateUtil.buildSimpleSelectQuery(str, Collections.singletonMap(FlinkConnectorOptions.SCAN_PARALLELISM.key(), "66")))).isEqualTo(66);
    }

    @Test
    public void testSinkParallelism() throws IOException {
        testSinkParallelism(null, this.env.getParallelism());
        testSinkParallelism(23, 23);
    }

    @Test
    public void testQueryContainsDefaultFieldName() throws Exception {
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        this.tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        this.tEnv.executeSql(String.format("create table dummy_source (f0 int, f1 string) with ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')", TestValuesTableFactory.registerData(Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "abc"})))));
        this.tEnv.executeSql(String.format("create table managed_table with ('root-path' = '%s') like dummy_source (excluding options)", this.rootPath));
        this.tEnv.executeSql("insert into managed_table select * from dummy_source").await();
        Assertions.assertThat(BlockingIterator.of(this.tEnv.executeSql("select * from managed_table").collect()).collect(1, 5L, TimeUnit.SECONDS)).containsOnly(new Row[]{TestValuesTableFactory.changelogRow("+I", new Object[]{1, "abc"})});
    }

    @Test
    public void testLike() throws Exception {
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        this.tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        String registerData = TestValuesTableFactory.registerData(Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_%"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test%2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{3, "university"}), TestValuesTableFactory.changelogRow("+I", new Object[]{4, "very"}), TestValuesTableFactory.changelogRow("+I", new Object[]{5, "yield"})));
        UUID randomUUID = UUID.randomUUID();
        this.tEnv.executeSql(String.format("create table `helper_source_%s` (f0 int, f1 string) with ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')", randomUUID, registerData));
        UUID randomUUID2 = UUID.randomUUID();
        this.tEnv.executeSql(String.format("create table `managed_table_%s` with ('%s' = '%s') like `helper_source_%s` (excluding options)", randomUUID2, FlinkConnectorOptions.ROOT_PATH.key(), this.rootPath, randomUUID));
        this.tEnv.executeSql(String.format("insert into `managed_table_%s` select * from `helper_source_%s`", randomUUID2, randomUUID)).await();
        this.tEnv.executeSql(String.format("insert into `managed_table_%s` values (7, 'villa'), (8, 'tests'), (20, 'test_123')", randomUUID2)).await();
        this.tEnv.executeSql("insert into `managed_table_" + randomUUID2 + "` values (9, 'valley'), (10, 'tested'), (100, 'test%fff')").await();
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f1 like 'test%'", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_%"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test%2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{8, "tests"}), TestValuesTableFactory.changelogRow("+I", new Object[]{10, "tested"}), TestValuesTableFactory.changelogRow("+I", new Object[]{20, "test_123"})));
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f1 like 'v%'", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{4, "very"}), TestValuesTableFactory.changelogRow("+I", new Object[]{7, "villa"}), TestValuesTableFactory.changelogRow("+I", new Object[]{9, "valley"})));
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f1 like 'test=_%' escape '='", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_%"}), TestValuesTableFactory.changelogRow("+I", new Object[]{20, "test_123"})));
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f1 like 'test=__' escape '='", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "test_%"})));
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f1 like 'test$%%' escape '$'", Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{2, "test%2"}), TestValuesTableFactory.changelogRow("+I", new Object[]{100, "test%fff"})));
    }

    @Test
    public void testIn() throws Exception {
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        this.tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        List asList = Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{1, "aaa"}), TestValuesTableFactory.changelogRow("+I", new Object[]{2, "bbb"}), TestValuesTableFactory.changelogRow("+I", new Object[]{3, "ccc"}), TestValuesTableFactory.changelogRow("+I", new Object[]{4, "ddd"}), TestValuesTableFactory.changelogRow("+I", new Object[]{5, "eee"}), TestValuesTableFactory.changelogRow("+I", new Object[]{6, "aaa"}), TestValuesTableFactory.changelogRow("+I", new Object[]{7, "bbb"}), TestValuesTableFactory.changelogRow("+I", new Object[]{8, "ccc"}), TestValuesTableFactory.changelogRow("+I", new Object[]{9, "ddd"}), TestValuesTableFactory.changelogRow("+I", new Object[]{10, "eee"}), TestValuesTableFactory.changelogRow("+I", new Object[]{11, "aaa"}), TestValuesTableFactory.changelogRow("+I", new Object[]{12, "bbb"}), TestValuesTableFactory.changelogRow("+I", new Object[]{13, "ccc"}), TestValuesTableFactory.changelogRow("+I", new Object[]{14, "ddd"}), TestValuesTableFactory.changelogRow("+I", new Object[]{15, "eee"}), TestValuesTableFactory.changelogRow("+I", new Object[]{16, "aaa"}), TestValuesTableFactory.changelogRow("+I", new Object[]{17, "bbb"}), TestValuesTableFactory.changelogRow("+I", new Object[]{18, "ccc"}), TestValuesTableFactory.changelogRow("+I", new Object[]{19, "ddd"}), TestValuesTableFactory.changelogRow("+I", new Object[]{20, "eee"}), TestValuesTableFactory.changelogRow("+I", new Object[]{21, "fff"}));
        String registerData = TestValuesTableFactory.registerData(asList);
        UUID randomUUID = UUID.randomUUID();
        this.tEnv.executeSql(String.format("create table `helper_source_%s` (f0 int, f1 string) with ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')", randomUUID, registerData));
        UUID randomUUID2 = UUID.randomUUID();
        this.tEnv.executeSql(String.format("create table `managed_table_%s` with ('%s' = '%s') like `helper_source_%s` (excluding options)", randomUUID2, FlinkConnectorOptions.ROOT_PATH.key(), this.rootPath, randomUUID));
        this.tEnv.executeSql(String.format("insert into `managed_table_%s` select * from `helper_source_%s`", randomUUID2, randomUUID)).await();
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f0 in (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)", asList);
        collectAndCheck(this.tEnv, "managed_table_" + randomUUID2, Collections.emptyMap(), "f1 in ('aaa', 'bbb', 'ccc', 'ddd', 'eee', 'fff')", asList);
    }

    @Test
    public void testUnsupportedPredicate() throws Exception {
        collectAndCheckBatchReadWrite(true, true, "currency similar to 'Euro'", Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02"})));
    }

    @Test
    public void testChangeBucketNumber() throws Exception {
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        this.tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        this.tEnv.executeSql(String.format("CREATE TABLE IF NOT EXISTS rates (\ncurrency STRING,\n rate BIGINT,\n dt STRING\n) PARTITIONED BY (dt)\nWITH (\n 'bucket' = '2',\n 'root-path' = '%s'\n)", this.rootPath));
        this.tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102, '2022-06-20')").await();
        assertChangeBucketWithoutRescale(3);
        assertChangeBucketWithoutRescale(1);
    }

    private void assertChangeBucketWithoutRescale(int i) throws Exception {
        this.tEnv.executeSql(String.format("ALTER TABLE rates SET ('bucket' = '%d')", Integer.valueOf(i)));
        Assertions.assertThat(BlockingIterator.of(this.tEnv.executeSql("SELECT * FROM rates").collect()).collect()).containsExactlyInAnyOrder(new Row[]{TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-06-20"})});
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102, '2022-06-20')").await();
        }).getRootCause().isInstanceOf(TableException.class).hasMessage(String.format("Try to write partition {dt=2022-06-20} with a new bucket num %d, but the previous bucket num is 2. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", Integer.valueOf(i)));
    }

    @Test
    public void testSuccessiveWriteAndRead() throws Exception {
        String collectAndCheckBatchReadWrite = collectAndCheckBatchReadWrite(false, false, (String) null, Collections.emptyList(), ReadWriteTableTestUtil.rates());
        this.tEnv.executeSql(String.format("INSERT INTO `%s` VALUES ('US Dollar', 102),\n('Euro', 114),\n('Yen', 1),\n('Euro', 114),\n('Euro', 119)", collectAndCheckBatchReadWrite)).await();
        collectAndCheck(this.tEnv, collectAndCheckBatchReadWrite, Collections.emptyMap(), "currency = 'Yen'", Collections.nCopies(2, TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L})));
    }

    @Test
    public void testStreamingInsertOverwrite() throws Exception {
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        this.tEnv = StreamTableEnvironment.create(buildStreamEnv(), EnvironmentSettings.inStreamingMode());
        this.tEnv.executeSql(String.format("CREATE TABLE IF NOT EXISTS rates (\ncurrency STRING,\n rate BIGINT,\n dt STRING\n) PARTITIONED BY (dt)\nWITH (\n 'bucket' = '2',\n 'root-path' = '%s'\n)", this.rootPath));
        Assertions.assertThatThrownBy(() -> {
            this.tEnv.executeSql("INSERT OVERWRITE rates VALUES('US Dollar', 102, '2022-06-20')");
        }).isInstanceOf(UnsupportedOperationException.class).hasMessage("Table store doesn't support streaming INSERT OVERWRITE.");
    }

    private void validateOverwriteResult(TableEnvironment tableEnvironment, String str, String str2, String str3, List<Row> list) throws Exception {
        tableEnvironment.executeSql(String.format("INSERT INTO %s VALUES ", str) + "(0, '2023-01-01', 'hi'), (0, '2023-01-01', 'hello'), (0, '2023-01-02', 'world'), (1, '2023-01-01', 'flink'), (1, '2023-01-02', 'table'), (1, '2023-01-02', 'store')").await();
        Assertions.assertThat(CollectionUtil.iteratorToList(tableEnvironment.executeSql("SELECT * FROM " + str).collect())).containsExactlyInAnyOrderElementsOf(Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{0, "2023-01-01", "hi"}), TestValuesTableFactory.changelogRow("+I", new Object[]{0, "2023-01-01", "hello"}), TestValuesTableFactory.changelogRow("+I", new Object[]{0, "2023-01-02", "world"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow("+I", new Object[]{1, "2023-01-02", "store"})));
        tableEnvironment.executeSql(String.format("INSERT OVERWRITE %s %s SELECT %s FROM %s WHERE false", str, str2, str3, str)).await();
        Assertions.assertThat(CollectionUtil.iteratorToList(tableEnvironment.executeSql("SELECT * FROM " + str).collect())).containsExactlyInAnyOrderElementsOf(list);
    }

    private String collectAndCheckBatchReadWrite(boolean z, boolean z2, @Nullable String str, List<String> list, List<Row> list2) throws Exception {
        return collectAndCheckBatchReadWrite(z ? Collections.singletonList("dt") : Collections.emptyList(), z2 ? z ? Arrays.asList("currency", "dt") : Collections.singletonList("currency") : Collections.emptyList(), str, list, list2);
    }

    private String collectAndCheckStreamingReadWriteWithClose(boolean z, boolean z2, boolean z3, Map<String, String> map, @Nullable String str, List<String> list, List<Row> list2) throws Exception {
        return collectAndCheckStreamingReadWriteWithClose(z, z2 ? Collections.singletonList("dt") : Collections.emptyList(), z3 ? z2 ? Arrays.asList("currency", "dt") : Collections.singletonList("currency") : Collections.emptyList(), map, str, list, list2);
    }

    private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckStreamingReadWriteWithoutClose(Map<String, String> map, @Nullable String str, List<String> list, List<Row> list2) throws Exception {
        return collectAndCheckStreamingReadWriteWithoutClose(Collections.singletonList("dt"), Arrays.asList("currency", "dt"), map, str, list, list2);
    }

    private void collectLatestLogAndCheck(boolean z, boolean z2, boolean z3, @Nullable String str, List<String> list, List<Row> list2) throws Exception {
        collectLatestLogAndCheck(z, z2 ? Collections.singletonList("dt") : Collections.emptyList(), z3 ? z2 ? Arrays.asList("currency", "dt") : Collections.singletonList("currency") : Collections.emptyList(), str, list, list2);
    }

    private void collectChangelogFromTimestampAndCheck(boolean z, boolean z2, boolean z3, long j, List<Row> list) throws Exception {
        collectChangelogFromTimestampAndCheck(z, z2 ? Collections.singletonList("dt") : Collections.emptyList(), z3 ? z2 ? Arrays.asList("currency", "dt") : Collections.singletonList("currency") : Collections.emptyList(), j, list);
    }

    private int sourceParallelism(String str) {
        return this.tEnv.toChangelogStream(this.tEnv.sqlQuery(str)).getParallelism();
    }

    private void testSinkParallelism(Integer num, int i) throws IOException {
        HashMap hashMap = new HashMap();
        if (num != null) {
            hashMap.put(FlinkConnectorOptions.SINK_PARALLELISM.key(), num.toString());
        }
        hashMap.put("path", new File(TEMPORARY_FOLDER.newFolder(), UUID.randomUUID().toString()).toURI().toString());
        FactoryUtil.DefaultDynamicTableContext defaultDynamicTableContext = new FactoryUtil.DefaultDynamicTableContext(ObjectIdentifier.of("default", "default", "t1"), TableStoreTestBase.createResolvedTable(hashMap, RowType.of(new LogicalType[]{new VarCharType(Integer.MAX_VALUE)}, new String[]{"a"}), (List<String>) Collections.emptyList(), (List<String>) Collections.emptyList()), Collections.emptyMap(), new Configuration(), Thread.currentThread().getContextClassLoader(), false);
        new TableStoreManagedFactory().onCreateTable(defaultDynamicTableContext, false);
        DynamicTableSink createDynamicTableSink = new TableStoreManagedFactory().createDynamicTableSink(defaultDynamicTableContext);
        Assertions.assertThat(createDynamicTableSink).isInstanceOf(TableStoreSink.class);
        DataStreamSinkProvider sinkRuntimeProvider = createDynamicTableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
        Assertions.assertThat(sinkRuntimeProvider).isInstanceOf(DataStreamSinkProvider.class);
        Transformation transformation = sinkRuntimeProvider.consumeDataStream((ProviderContext) null, this.env.fromCollection(Collections.singletonList(GenericRowData.of(new Object[0])))).getTransformation();
        while (true) {
            Transformation transformation2 = transformation;
            if (transformation2 instanceof PartitionTransformation) {
                return;
            }
            Assertions.assertThat(transformation2.getParallelism()).isIn(new Object[]{1, Integer.valueOf(i)});
            transformation = (Transformation) transformation2.getInputs().get(0);
        }
    }
}
