package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/paimon/flink/ReadWriteTableITCase.class */
public class ReadWriteTableITCase extends AbstractTestBase {
    // Single-entry option map setting CoreOptions.STREAMING_READ_OVERWRITE to "true";
    // passed as the options argument of createTable by the streaming-overwrite tests.
    private final Map<String, String> streamingReadOverwrite = Collections.singletonMap(CoreOptions.STREAMING_READ_OVERWRITE.key(), "true");
    // Single-entry option map setting CoreOptions.DYNAMIC_PARTITION_OVERWRITE to "false";
    // passed as the options argument of createTable by the purge tests.
    private final Map<String, String> staticPartitionOverwrite = Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");

    /** Initialises the shared test utility with the value of {@code getTempDirPath()} before each test. */
    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init(this.getTempDirPath());
    }

    /**
     * Batch write/read round trip on a table keyed by (currency, dt) and partitioned by dt,
     * including a static partition overwrite followed by filter and projection queries.
     */
    @Test
    public void testBatchReadWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"));

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt String"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"));

        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 114, '2022-01-01')",
                "('Yen', 1, '2022-01-01')",
                "('Euro', 114, '2022-01-01')",
                "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);

        // Replace the contents of partition dt = '2022-01-02'.
        ReadWriteTableTestUtil.insertOverwritePartition(
                table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)");
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt IN ('2022-01-02')"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "Euro", 100L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-02")));

        // The other partition must be unaffected by the overwrite.
        List<Row> firstDayRecords =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 114L, "2022-01-01"));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt <> '2022-01-02'"),
                firstDayRecords);
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt IN ('2022-01-01')"),
                firstDayRecords);

        // Filters on a non-partition column, alone and combined with the partition column.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate >= 100"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 100L, "2022-01-02")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE dt = '2022-01-02' AND rate >= 100"),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", "Euro", 100L, "2022-01-02")));

        // Projections.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "dt", ""),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-02")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "dt, currency, rate", ""),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "2022-01-01", "US Dollar", 114L),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-01", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-01", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-02", "Euro", 100L),
                        TestValuesTableFactory.changelogRow("+I", "2022-01-02", "Yen", 1L)));

        // Projection combined with a filter.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "currency, dt", "WHERE rate = 114"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", "2022-01-01")));
    }

    /**
     * Writes the quotient a/b of two DOUBLE columns into table T and checks that NaN and
     * positive/negative infinity results are read back unchanged.
     *
     * @throws Exception if any SQL statement or result collection fails
     */
    @Test
    public void testNaNType() throws Exception {
        ReadWriteTableTestUtil.bEnv.executeSql(
                "CREATE TEMPORARY TABLE S ( a DOUBLE,b DOUBLE,c STRING) WITH ( 'connector' = 'filesystem', 'format'='json' , 'path' ='"
                        + ReadWriteTableTestUtil.warehouse
                        + "/S' )");
        ReadWriteTableTestUtil.bEnv
                .executeSql(
                        "INSERT INTO S VALUES (1.0,2.0,'a'),\n"
                                + "(0.0,0.0,'b'),\n"
                                + "(1.0,1.0,'c'),\n"
                                + "(0.0,0.0,'d'),\n"
                                + "(1.0,0.0,'e'),\n"
                                + "(0.0,0.0,'f'),\n"
                                + "(-1.0,0.0,'g'),\n"
                                + "(1.0,-1.0,'h'),\n"
                                + "(1.0,-2.0,'i')")
                .await();
        ReadWriteTableTestUtil.bEnv.executeSql("CREATE TABLE T (d STRING, e DOUBLE)");
        ReadWriteTableTestUtil.bEnv.executeSql("INSERT INTO T SELECT c,a/b FROM S").await();

        BlockingIterator<Row, Row> results =
                BlockingIterator.of(
                        ReadWriteTableTestUtil.bEnv.executeSql("SELECT * FROM  T").collect());
        try {
            Assertions.assertThat(results.collect(9))
                    .containsExactlyInAnyOrder(
                            Row.of("a", 0.5d),
                            Row.of("b", Double.NaN),
                            Row.of("c", 1.0d),
                            Row.of("d", Double.NaN),
                            Row.of("e", Double.POSITIVE_INFINITY),
                            Row.of("f", Double.NaN),
                            Row.of("g", Double.NEGATIVE_INFINITY),
                            Row.of("h", -1.0d),
                            Row.of("i", -0.5d));
        } finally {
            // Close the result iterator even when the assertion fails, so it is not leaked.
            results.close();
        }
    }

    /**
     * Batch write/read round trip on a table partitioned by dt with no primary key, so
     * duplicate rows are expected to be kept.
     */
    @Test
    public void testBatchReadWriteWithPartitionedRecordsWithoutPk() throws Exception {
        List<Row> initialRecords =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"));

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt String"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"));

        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 102, '2022-01-01')",
                "('Euro', 114, '2022-01-01')",
                "('Yen', 1, '2022-01-01')",
                "('Euro', 114, '2022-01-01')",
                "('US Dollar', 114, '2022-01-01')",
                "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // Unfiltered read, then a partition filter that matches every row.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt >= '2022-01-01'"),
                initialRecords);

        // Filter on a non-partition column.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'US Dollar'"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 114L, "2022-01-01")));

        // Disjunction across a partition and a non-partition column; matches every row.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE dt = '2022-01-01' OR rate > 115"),
                initialRecords);

        // Projections, without and with a filter.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "currency", ""),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                        TestValuesTableFactory.changelogRow("+I", "Yen"),
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Euro")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "currency, dt", "WHERE rate = 119"),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", "Euro", "2022-01-02")));
    }

    /**
     * Batch write/read round trip on an unpartitioned table keyed by currency, including two
     * whole-table overwrites.
     */
    @Test
    public void testBatchReadWriteWithNonPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L));

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList());

        ReadWriteTableTestUtil.insertInto(
                table, "('US Dollar', 102)", "('Yen', 1)", "('Euro', 119)");
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", ""), initialRecords);

        // Overwrite the whole table with a single row.
        ReadWriteTableTestUtil.insertOverwrite(table, "('Euro', 100)");
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table),
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", "Euro", 100L)));

        // Overwrite again with the initial data, then exercise filters.
        ReadWriteTableTestUtil.insertOverwrite(
                table, "('US Dollar', 102)", "('Yen', 1)", "('Euro', 119)");
        List<Row> euroOnly =
                Collections.singletonList(
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Euro'"),
                euroOnly);
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE rate > 102 AND rate <= 119"),
                euroOnly);

        // Projections, without and with a filter.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "currency", ""),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                        TestValuesTableFactory.changelogRow("+I", "Yen"),
                        TestValuesTableFactory.changelogRow("+I", "Euro")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "rate", "WHERE currency IN ('Yen')"),
                Collections.singletonList(TestValuesTableFactory.changelogRow("+I", 1L)));
    }

    /**
     * Batch write/read round trip on a table with neither primary key nor partition keys;
     * duplicate rows are expected to be kept.
     */
    @Test
    public void testBatchReadWriteWithNonPartitionedRecordsWithoutPk() throws Exception {
        List<Row> initialRecords =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L));

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.emptyList(),
                        Collections.emptyList());

        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 102)",
                "('Euro', 114)",
                "('Yen', 1)",
                "('Euro', 114)",
                "('Euro', 119)");
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());

        // Unfiltered read, then a filter that matches every row.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate >= 1"),
                initialRecords);

        // Equality filter.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Euro'"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L)));

        // Projections, without and with a filter.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "currency", ""),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Yen"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "currency", "WHERE rate > 100 OR currency = 'Yen'"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Euro"),
                        TestValuesTableFactory.changelogRow("+I", "Yen"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar")));
    }

    /**
     * Copies a changelog (inserts, updates, deletes) from a temporary source table into a
     * table keyed by (currency, dt) and partitioned by dt, then checks streaming reads with
     * several filters and a projection.
     */
    @Test
    public void testStreamingReadWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> sourceChangelog =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L, "2022-01-02"));

        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        sourceChangelog,
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,D");
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"));

        ReadWriteTableTestUtil.insertIntoFromTable(sourceTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // Only the rows that survive the updates and deletes are expected.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 119L, "2022-01-02")))
                .close();

        // Filter on the partition column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt < '2022-01-02'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01")))
                .close();

        // Filter on a non-partition column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate = 102"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01")))
                .close();

        // Disjunction over both columns, without and with a projection.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE rate = 102 OR dt < '2022-01-02'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 102L, "2022-01-01")))
                .close();
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "currency", "WHERE rate = 102 OR dt < '2022-01-02'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar")))
                .close();
    }

    /**
     * Copies a changelog (inserts, updates, deletes) from a temporary source table into an
     * unpartitioned table keyed by currency, then checks streaming reads with filters and
     * projections.
     */
    @Test
    void testStreamingReadWriteWithNonPartitionedRecordsWithPk() throws Exception {
        List<Row> sourceChangelog =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 116L),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 119L),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L));

        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        sourceChangelog,
                        null,
                        false,
                        "I,UA,D");
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT"),
                        Collections.singletonList("currency"),
                        Collections.emptyList());

        ReadWriteTableTestUtil.insertIntoFromTable(sourceTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());

        // Only the rows that survive the updates and deletes are expected.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L)))
                .close();

        // 'Yen' was deleted, so this filter is expected to yield nothing.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Yen'"),
                        Collections.emptyList())
                .close();

        // Projections, without and with a filter.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "currency", ""),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Euro")))
                .close();
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "currency", "WHERE rate = 102"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar")))
                .close();
    }

    /**
     * Runs an {@code INSERT OVERWRITE} without a partition spec on a table partitioned by
     * (part0, part1) and checks both the changelog seen by an open streaming read and the
     * final batch contents: partitions (1, 'B') and (2, 'A') are replaced, while (1, 'A')
     * and (2, 'B') keep their rows.
     *
     * @throws Exception if a SQL statement, a read or an assertion fails
     */
    @Test
    public void testDynamicOverwrite() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("pk INT", "part0 INT", "part1 STRING", "v STRING"),
                        Arrays.asList("pk", "part0", "part1"),
                        Arrays.asList("part0", "part1"),
                        this.streamingReadOverwrite);

        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 1, 'A', 'Hi')",
                "(2, 1, 'A', 'Hello')",
                "(3, 1, 'A', 'World')",
                "(4, 1, 'B', 'To')",
                "(5, 1, 'B', 'Apache')",
                "(6, 1, 'B', 'Paimon')",
                "(7, 2, 'A', 'Test')",
                "(8, 2, 'B', 'Case')");

        BlockingIterator<Row, Row> streamingChanges =
                ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", 1, 1, "A", "Hi"),
                                TestValuesTableFactory.changelogRow("+I", 2, 1, "A", "Hello"),
                                TestValuesTableFactory.changelogRow("+I", 3, 1, "A", "World"),
                                TestValuesTableFactory.changelogRow("+I", 4, 1, "B", "To"),
                                TestValuesTableFactory.changelogRow("+I", 5, 1, "B", "Apache"),
                                TestValuesTableFactory.changelogRow("+I", 6, 1, "B", "Paimon"),
                                TestValuesTableFactory.changelogRow("+I", 7, 2, "A", "Test"),
                                TestValuesTableFactory.changelogRow("+I", 8, 2, "B", "Case")));
        try {
            ReadWriteTableTestUtil.bEnv
                    .executeSql(
                            String.format(
                                    "INSERT OVERWRITE `%s` VALUES (4, 1, 'B', 'Where'), (5, 1, 'B', 'When'), "
                                            + "(10, 2, 'A', 'Static'), (11, 2, 'A', 'Dynamic')",
                                    table))
                    .await();

            // The overwrite appears as deletes of the old rows of the touched partitions
            // followed by inserts of the new rows.
            Assertions.assertThat(streamingChanges.collect(8))
                    .containsExactlyInAnyOrder(
                            TestValuesTableFactory.changelogRow("-D", 4, 1, "B", "To"),
                            TestValuesTableFactory.changelogRow("-D", 5, 1, "B", "Apache"),
                            TestValuesTableFactory.changelogRow("-D", 6, 1, "B", "Paimon"),
                            TestValuesTableFactory.changelogRow("-D", 7, 2, "A", "Test"),
                            TestValuesTableFactory.changelogRow("+I", 4, 1, "B", "Where"),
                            TestValuesTableFactory.changelogRow("+I", 5, 1, "B", "When"),
                            TestValuesTableFactory.changelogRow("+I", 10, 2, "A", "Static"),
                            TestValuesTableFactory.changelogRow("+I", 11, 2, "A", "Dynamic"));
            ReadWriteTableTestUtil.assertNoMoreRecords(streamingChanges);
        } finally {
            // Close the streaming iterator even when the overwrite or an assertion fails,
            // so it is not leaked into later tests.
            streamingChanges.close();
        }

        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 1, 1, "A", "Hi"),
                        TestValuesTableFactory.changelogRow("+I", 2, 1, "A", "Hello"),
                        TestValuesTableFactory.changelogRow("+I", 3, 1, "A", "World"),
                        TestValuesTableFactory.changelogRow("+I", 4, 1, "B", "Where"),
                        TestValuesTableFactory.changelogRow("+I", 5, 1, "B", "When"),
                        TestValuesTableFactory.changelogRow("+I", 10, 2, "A", "Static"),
                        TestValuesTableFactory.changelogRow("+I", 11, 2, "A", "Dynamic"),
                        TestValuesTableFactory.changelogRow("+I", 8, 2, "B", "Case")));
    }

    /**
     * Runs the purge validation against an unpartitioned table created with
     * {@code staticPartitionOverwrite}, using an empty partition spec and expecting no rows.
     */
    @Test
    public void testPurgeTableUsingBatchOverWrite() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("k0 INT", "k1 STRING", "v STRING"),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        this.staticPartitionOverwrite);
        validatePurgingResult(table, "", "*", Collections.emptyList());
    }

    /**
     * Runs the purge validation for three layouts created with {@code staticPartitionOverwrite}:
     * a single partition key, two partition keys with a partial spec, and two partition keys
     * with a full spec.
     */
    @Test
    public void testPurgePartitionUsingBatchOverWrite() throws Exception {
        List<String> columns = Arrays.asList("k0 INT", "k1 STRING", "v STRING");

        // Single partition key k0, purging k0 = 0.
        String singleKeyTable =
                ReadWriteTableTestUtil.createTable(
                        columns,
                        Collections.emptyList(),
                        Collections.singletonList("k0"),
                        this.staticPartitionOverwrite);
        validatePurgingResult(
                singleKeyTable,
                "PARTITION (k0 = 0)",
                "k1, v",
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-01", "flink"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "table"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "store")));

        // Partition keys (k0, k1), spec names only k0.
        String partialSpecTable =
                ReadWriteTableTestUtil.createTable(
                        columns,
                        Collections.emptyList(),
                        Arrays.asList("k0", "k1"),
                        this.staticPartitionOverwrite);
        validatePurgingResult(
                partialSpecTable,
                "PARTITION (k0 = 0)",
                "k1, v",
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-01", "flink"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "table"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "store")));

        // Partition keys (k0, k1), spec names both; the (0, '2023-01-02') row must survive.
        String fullSpecTable =
                ReadWriteTableTestUtil.createTable(
                        columns,
                        Collections.emptyList(),
                        Arrays.asList("k0", "k1"),
                        this.staticPartitionOverwrite);
        validatePurgingResult(
                fullSpecTable,
                "PARTITION (k0 = 0, k1 = '2023-01-01')",
                "v",
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 0, "2023-01-02", "world"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-01", "flink"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "table"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "store")));
    }

    /**
     * Checks that a streaming read on a partitioned primary-key table created with
     * {@code streamingReadOverwrite} observes a later partition overwrite as a delete of the
     * old row followed by inserts of the new rows.
     *
     * @throws Exception if a SQL statement, a read or a validation fails
     */
    @Test
    public void testStreamingReadOverwriteWithPartitionedRecords() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt String"),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        this.streamingReadOverwrite);

        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 114, '2022-01-01')",
                "('Yen', 1, '2022-01-01')",
                "('Euro', 114, '2022-01-01')",
                "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // Overwrite the first partition before the streaming read starts.
        ReadWriteTableTestUtil.insertOverwritePartition(
                table, "PARTITION (dt = '2022-01-01')", "('US Dollar', 120)");

        BlockingIterator<Row, Row> streamingChanges =
                ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", 120L, "2022-01-01"),
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Euro", 119L, "2022-01-02")));
        try {
            // Overwrite the second partition while the streaming read is open.
            ReadWriteTableTestUtil.insertOverwritePartition(
                    table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)");
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamingChanges,
                    Arrays.asList(
                            TestValuesTableFactory.changelogRow("-D", "Euro", 119L, "2022-01-02"),
                            TestValuesTableFactory.changelogRow("+I", "Euro", 100L, "2022-01-02"),
                            TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-02")));
        } finally {
            // Close the streaming iterator even when the overwrite or validation fails.
            streamingChanges.close();
        }
    }

    /**
     * Checks that a filtered, projected streaming read on an unpartitioned primary-key table
     * created with {@code streamingReadOverwrite} observes a whole-table overwrite as deletes
     * of the previously matching rows plus an insert of the newly matching row.
     *
     * @throws Exception if a SQL statement, a read or a validation fails
     */
    @Test
    public void testStreamingReadOverwriteWithoutPartitionedRecords() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        this.streamingReadOverwrite);

        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 102, '2022-01-01')",
                "('Yen', 1, '2022-01-02')",
                "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());

        BlockingIterator<Row, Row> streamingChanges =
                ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "currency, rate", "WHERE dt = '2022-01-02'"),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                                TestValuesTableFactory.changelogRow("+I", "Euro", 119L)));
        try {
            // Only the new 'US Dollar' row matches the dt filter after the overwrite.
            ReadWriteTableTestUtil.insertOverwrite(
                    table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')");
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamingChanges,
                    Arrays.asList(
                            TestValuesTableFactory.changelogRow("-D", "Yen", 1L),
                            TestValuesTableFactory.changelogRow("-D", "Euro", 119L),
                            TestValuesTableFactory.changelogRow("+I", "US Dollar", 100L)));
        } finally {
            // Close the streaming iterator even when the overwrite or validation fails.
            streamingChanges.close();
        }
    }

    /**
     * Creating a table without primary keys but with {@code streamingReadOverwrite} must fail
     * with a {@link RuntimeException} carrying the expected message somewhere in the cause chain.
     */
    @Test
    public void testUnsupportStreamingReadOverwriteWithoutPk() {
        Assertions.assertThatThrownBy(
                        () ->
                                ReadWriteTableTestUtil.createTable(
                                        Arrays.asList(
                                                "currency STRING", "rate BIGINT", "dt String"),
                                        Collections.emptyList(),
                                        Collections.singletonList("dt"),
                                        this.streamingReadOverwrite))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                RuntimeException.class,
                                "Doesn't support streaming read the changes from overwrite when the primary keys are not defined."));
    }

    /**
     * Checks {@code LIKE} filters in batch reads: plain prefix patterns and patterns that
     * escape the {@code _} and {@code %} wildcards via {@code ESCAPE}.
     */
    @Test
    public void testLike() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("f0 INT", "f1 STRING"),
                        Collections.emptyList(),
                        Collections.emptyList());

        // The data is written by two separate inserts.
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'test_1')",
                "(2, 'test_2')",
                "(1, 'test_%')",
                "(2, 'test%2')",
                "(3, 'university')",
                "(4, 'very')",
                "(5, 'yield')");
        ReadWriteTableTestUtil.insertInto(
                table,
                "(7, 'villa')",
                "(8, 'tests')",
                "(20, 'test_123')",
                "(9, 'valley')",
                "(10, 'tested')",
                "(100, 'test%fff')");

        // Plain prefix patterns.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'test%'"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 1, "test_1"),
                        TestValuesTableFactory.changelogRow("+I", 2, "test_2"),
                        TestValuesTableFactory.changelogRow("+I", 1, "test_%"),
                        TestValuesTableFactory.changelogRow("+I", 2, "test%2"),
                        TestValuesTableFactory.changelogRow("+I", 8, "tests"),
                        TestValuesTableFactory.changelogRow("+I", 10, "tested"),
                        TestValuesTableFactory.changelogRow("+I", 20, "test_123"),
                        TestValuesTableFactory.changelogRow("+I", 100, "test%fff")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'v%'"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 4, "very"),
                        TestValuesTableFactory.changelogRow("+I", 7, "villa"),
                        TestValuesTableFactory.changelogRow("+I", 9, "valley")));

        // Escaped underscore followed by '%' (any suffix) or '_' (exactly one character).
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE f1 LIKE 'test=_%' ESCAPE '='"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 1, "test_1"),
                        TestValuesTableFactory.changelogRow("+I", 2, "test_2"),
                        TestValuesTableFactory.changelogRow("+I", 1, "test_%"),
                        TestValuesTableFactory.changelogRow("+I", 20, "test_123")));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE f1 LIKE 'test=__' ESCAPE '='"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 1, "test_1"),
                        TestValuesTableFactory.changelogRow("+I", 2, "test_2"),
                        TestValuesTableFactory.changelogRow("+I", 1, "test_%")));

        // Escaped percent sign followed by any suffix.
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE f1 LIKE 'test$%%' ESCAPE '$'"),
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 2, "test%2"),
                        TestValuesTableFactory.changelogRow("+I", 100, "test%fff")));
    }

    /**
     * Checks {@code IN} filters in batch reads, once on the INT column with 21 literals and
     * once on the STRING column with five literals that exclude only the row holding 'fff'.
     */
    @Test
    public void testIn() throws Exception {
        // f1 cycles through these five values for f0 = 1..20; f0 = 21 is the only 'fff' row.
        String[] cycle = {"aaa", "bbb", "ccc", "ddd", "eee"};
        List<Row> allRows = new ArrayList<>();
        String[] insertValues = new String[21];
        for (int f0 = 1; f0 <= 20; f0++) {
            String f1 = cycle[(f0 - 1) % cycle.length];
            allRows.add(TestValuesTableFactory.changelogRow("+I", f0, f1));
            insertValues[f0 - 1] = "(" + f0 + ", '" + f1 + "')";
        }
        allRows.add(TestValuesTableFactory.changelogRow("+I", 21, "fff"));
        insertValues[20] = "(21, 'fff')";

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList("f0 INT", "f1 STRING"),
                        Collections.emptyList(),
                        Collections.emptyList());
        ReadWriteTableTestUtil.insertInto(table, insertValues);

        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table,
                        "*",
                        "WHERE f0 IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)"),
                allRows);

        // Drop the last row (index 20), i.e. the 'fff' row that the string IN list omits.
        List<Row> withoutFff = new ArrayList<>(allRows);
        withoutFff.remove(20);
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildQuery(
                        table, "*", "WHERE f1 IN ('aaa', 'bbb', 'ccc', 'ddd', 'eee')"),
                withoutFff);
    }

    /**
     * Checks that a batch query filtered with {@code SIMILAR TO} still returns the expected rows.
     *
     * <p>The table is created with key lists {@code (currency, dt)} and {@code (dt)}; the row
     * {@code ('Euro', 114, '2022-01-01')} is inserted twice but expected only once in the result.
     * NOTE(review): the test name suggests {@code SIMILAR TO} is a predicate the connector does
     * not handle itself - confirm against the source implementation.
     *
     * @throws Exception if inserting into or reading from the table fails
     */
    @Test
    public void testUnsupportedPredicate() throws Exception {
        List<String> columns = Arrays.asList("currency STRING", "rate BIGINT", "dt STRING");
        List<String> firstKeys = Arrays.asList("currency", "dt");
        List<String> secondKeys = Collections.singletonList("dt");
        String table = ReadWriteTableTestUtil.createTable(columns, firstKeys, secondKeys);

        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 102, '2022-01-01')",
                "('Euro', 114, '2022-01-01')",
                "('Yen', 1, '2022-01-01')",
                "('Euro', 114, '2022-01-01')",
                "('US Dollar', 114, '2022-01-01')",
                "('Euro', 119, '2022-01-02')");

        String query =
                ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency SIMILAR TO 'Euro'");
        List<Row> expected = Arrays.asList(
                TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-01"),
                TestValuesTableFactory.changelogRow("+I", "Euro", 119L, "2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(query, expected);
    }

    /**
     * Verifies the source parallelism chosen for batch queries when parallelism inference is
     * switched off.
     *
     * <p>The table itself is created with {@code INFER_SCAN_PARALLELISM = false}. A plain query
     * must then use the parallelism of {@code bExeEnv}, while a query that additionally sets
     * {@code SCAN_PARALLELISM = 66} through table options must use exactly 66.
     *
     * @throws Exception if inserting into or reading from the table fails
     */
    @Test
    public void testSourceParallelism() throws Exception {
        List<Row> expected = Arrays.asList(
                TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L),
                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                TestValuesTableFactory.changelogRow("+I", "Yen", 1L),
                TestValuesTableFactory.changelogRow("+I", "Euro", 114L),
                TestValuesTableFactory.changelogRow("+I", "Euro", 119L));

        String table = ReadWriteTableTestUtil.createTable(
                Arrays.asList("currency STRING", "rate BIGINT"),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "false"));
        ReadWriteTableTestUtil.insertInto(
                table,
                "('US Dollar', 102)",
                "('Euro', 114)",
                "('Yen', 1)",
                "('Euro', 114)",
                "('Euro', 119)");

        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), expected);

        // Without an explicit scan parallelism the source follows the environment's parallelism.
        Assertions.assertThat(sourceParallelism(ReadWriteTableTestUtil.buildSimpleQuery(table)))
                .isEqualTo(ReadWriteTableTestUtil.bExeEnv.getParallelism());

        // Plain HashMap instead of double-brace initialization: no anonymous subclass that
        // captures the enclosing test instance.
        Map<String, String> explicitParallelism = new HashMap<>();
        explicitParallelism.put(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "false");
        explicitParallelism.put(FlinkConnectorOptions.SCAN_PARALLELISM.key(), "66");
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", explicitParallelism)))
                .isEqualTo(66);
    }

    /**
     * Creates a Paimon catalog in a batch-mode table environment, defines table {@code tmp} with a
     * single, very deeply nested {@code ROW} column named {@code execution}, and asserts that
     * submitting an {@code INSERT} of a {@code NULL} cast to that row type does not throw.
     *
     * <p>Only the {@code executeSql} call itself is checked; the result it returns is not awaited
     * or inspected. NOTE(review): judging by the method name this guards the conversion of a large
     * row type into a serializer - confirm against the change that introduced the test.
     */
    @Test
    void testConvertRowType2Serializer() {
        // Batch-mode table environment on top of the default stream execution environment.
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), EnvironmentSettings.newInstance().inBatchMode().build());
        // Register a Paimon catalog whose warehouse is the path returned by getTempDirPath().
        create.executeSql("CREATE CATALOG my_catalog WITH (\n    'type' = 'paimon',\n    'warehouse' = '" + getTempDirPath() + "'\n)");
        create.executeSql("USE CATALOG my_catalog");
        // One column, "execution", typed as a ROW that nests further rows and arrays many levels deep.
        create.executeSql("CREATE TABLE tmp (\nexecution\nROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> 
NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` 
ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` 
ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY<ROW<`force_step` ROW<`reason` STRING>, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY<FLOAT>, `experiments` ARRAY<ROW<`experiment_ref` INT, `score` FLOAT> NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY<ROW<`method` STRING, `start_millis` BIGINT, `duration_millis` INT> NOT NULL>, `execution_stats` ROW<`stages` ARRAY<ROW<`key` INT, `value` ROW<`stats` ARRAY<ROW<`key` INT, `value` BIGINT> NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` 
INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `model_ref` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, 
`score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY<ROW<`key` BIGINT, `value` ROW<`bucket` INT, `value` FLOAT>> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY<STRING>, `ids` ARRAY<STRING>, `variable_logs` ARRAY<ROW<`name` STRING, `values` ARRAY<FLOAT>> NOT NULL>, `allocation_logs` ARRAY<ROW<`indexes` ARRAY<INT>, `name` STRING, `positions_considered` ARRAY<INT>, `positions_filled` ARRAY<INT>> NOT NULL>>, `experiments` ARRAY<ROW<`name` STRING, `cohort_arm` INT> NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>);");
        // Insert one NULL value, cast to a ROW type spelled out in full again; the statement must not throw.
        Assertions.assertThatCode(() -> {
            create.executeSql("INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` 
BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` 
ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` 
ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY<ROW<`force_step` ROW<`reason` STRING>, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY<FLOAT>, `experiments` ARRAY<ROW<`experiment_ref` INT, `score` FLOAT> NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY<ROW<`method` STRING, `start_millis` BIGINT, `duration_millis` INT> NOT NULL>, `execution_stats` ROW<`stages` ARRAY<ROW<`key` INT, `value` ROW<`stats` ARRAY<ROW<`key` INT, `value` BIGINT> NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` 
INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `model_ref` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, 
`score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY<ROW<`key` BIGINT, `value` ROW<`bucket` INT, `value` FLOAT>> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY<STRING>, `ids` ARRAY<STRING>, `variable_logs` ARRAY<ROW<`name` STRING, `values` ARRAY<FLOAT>> NOT NULL>, `allocation_logs` ARRAY<ROW<`indexes` ARRAY<INT>, `name` STRING, `positions_considered` ARRAY<INT>, `positions_filled` ARRAY<INT>> NOT NULL>>, `experiments` ARRAY<ROW<`name` STRING, `cohort_arm` INT> NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))");
        }).doesNotThrowAnyException();
    }

    /**
     * Verifies the source parallelism chosen under different combinations of
     * {@code INFER_SCAN_PARALLELISM}, {@code SCAN_PARALLELISM}, {@code INFER_SCAN_MAX_PARALLELISM},
     * filters and limits.
     *
     * <p>The table is created with {@code BUCKET = 2} and 1KB split sizing. Checks run first on the
     * empty table and then again after two separate single-row inserts.
     *
     * @throws Exception if creating, inserting into or planning against the table fails
     */
    @Test
    public void testInferParallelism() throws Exception {
        String inferKey = FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key();
        String parallelismKey = FlinkConnectorOptions.SCAN_PARALLELISM.key();

        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST.key(), "1KB");
        tableOptions.put(CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(), "1KB");
        tableOptions.put(CoreOptions.BUCKET.key(), "2");
        String table = ReadWriteTableTestUtil.createTable(
                Arrays.asList("currency STRING", "rate BIGINT"),
                Collections.emptyList(),
                Collections.emptyList(),
                tableOptions);

        // Empty table with inference on: expected parallelism is 1.
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", Collections.singletonMap(inferKey, "true"))))
                .isEqualTo(1);

        // An explicit scan parallelism is used even though inference is on.
        Map<String, String> inferWithExplicit = new HashMap<>();
        inferWithExplicit.put(inferKey, "true");
        inferWithExplicit.put(parallelismKey, "3");
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", inferWithExplicit)))
                .isEqualTo(3);

        // A negative explicit parallelism must be rejected.
        Assertions.assertThatThrownBy(() -> {
            Map<String, String> invalidParallelism = new HashMap<>();
            invalidParallelism.put(inferKey, "true");
            invalidParallelism.put(parallelismKey, "-2");
            sourceParallelism(
                    ReadWriteTableTestUtil.buildQueryWithTableOptions(
                            table, "*", "", invalidParallelism));
        }).hasMessageContaining("The parallelism of an operator must be at least 1");

        // Two separate inserts; the unfiltered inferred parallelism below is expected to become 2
        // (presumably one split per insert - confirm against the split planning).
        ReadWriteTableTestUtil.insertInto(table, "('Euro', 119)");
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102)");

        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", Collections.singletonMap(inferKey, "true"))))
                .isEqualTo(2);

        // A filter that matches only one of the two rows brings the expectation down to 1.
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table,
                                        "*",
                                        "WHERE currency='Euro'",
                                        Collections.singletonMap(inferKey, "true"))))
                .isEqualTo(1);

        // Limit 1 caps the inferred parallelism at 1.
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", 1L, Collections.singletonMap(inferKey, "true"))))
                .isEqualTo(1);

        // Limit 3 is larger than the inferred value of 2, so the limit must not reduce it: the
        // expectation is 2 (the same as the unlimited query above), not 1.
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", 3L, Collections.singletonMap(inferKey, "true"))))
                .isEqualTo(2);

        // Inference off: the explicit scan parallelism is used as-is.
        Map<String, String> explicitOnly = new HashMap<>();
        explicitOnly.put(inferKey, "false");
        explicitOnly.put(parallelismKey, "3");
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", explicitOnly)))
                .isEqualTo(3);

        // Inference on with a maximum of 1: the result is capped at 1.
        Map<String, String> inferWithMax = new HashMap<>();
        inferWithMax.put(inferKey, "true");
        inferWithMax.put(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM.key(), "1");
        Assertions.assertThat(
                        sourceParallelism(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", inferWithMax)))
                .isEqualTo(1);

        // Streaming plan with no extra options: expected parallelism is 2.
        Assertions.assertThat(
                        sourceParallelismStreaming(
                                ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                        table, "*", "", new HashMap<String, String>())))
                .isEqualTo(2);
    }

    /**
     * Runs the two-argument {@code testSinkParallelism} helper twice: once with a {@code null}
     * first argument paired with the parallelism of {@code bExeEnv}, and once with 23 for both
     * arguments.
     *
     * <p>NOTE(review): the arguments are presumably "configured sink parallelism" and "expected
     * parallelism" - confirm against the helper's signature.
     *
     * @throws Exception if the helper fails
     */
    @Test
    public void testSinkParallelism() throws Exception {
        int envParallelism = ReadWriteTableTestUtil.bExeEnv.getParallelism();
        testSinkParallelism(null, envParallelism);

        int fixedParallelism = 23;
        testSinkParallelism(fixedParallelism, fixedParallelism);
    }

    /**
     * Creates a uniquely named table partitioned by {@code dt} with {@code 'bucket' = '2'},
     * writes one row, and then calls {@code assertChangeBucketWithoutRescale} for the bucket
     * counts 3 and 1, in that order.
     *
     * @throws Exception if the insert or either bucket-change assertion fails
     */
    @Test
    public void testChangeBucketNumber() throws Exception {
        final String tableName = "MyTable_" + UUID.randomUUID();
        final String ddl = String.format("CREATE TABLE `%s` (\ncurrency STRING,\n rate BIGINT,\n dt STRING\n) PARTITIONED BY (dt)\nWITH (\n 'bucket' = '2'\n)", tableName);

        ReadWriteTableTestUtil.bEnv.executeSql(ddl);
        ReadWriteTableTestUtil.insertInto(tableName, "('US Dollar', 102, '2022-06-20')");

        // First a larger bucket count than the initial 2, then a smaller one.
        for (int bucketNum : new int[] {3, 1}) {
            assertChangeBucketWithoutRescale(tableName, bucketNum);
        }
    }

    /**
     * Expects an {@code INSERT OVERWRITE} executed through {@code sEnv} (presumably the streaming
     * table environment - confirm) to fail with an {@link UnsupportedOperationException} carrying
     * the exact message "Paimon doesn't support streaming INSERT OVERWRITE.".
     */
    @Test
    public void testStreamingInsertOverwrite() {
        String table = ReadWriteTableTestUtil.createTable(
                Arrays.asList("currency STRING", "rate BIGINT", "dt String"),
                Collections.emptyList(),
                Collections.singletonList("dt"));
        String overwriteSql =
                String.format("INSERT OVERWRITE `%s` VALUES('US Dollar', 102, '2022-06-20')", table);

        Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.sEnv.executeSql(overwriteSql))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessage("Paimon doesn't support streaming INSERT OVERWRITE.");
    }

    /**
     * Creates table {@code T} with a comment on column {@code a} and checks that {@code DESC T}
     * reports the comment for {@code a} and {@code null} for {@code b}.
     */
    @Test
    public void testPhysicalColumnComments() {
        ReadWriteTableTestUtil.bEnv.executeSql(
                "CREATE TABLE T(a INT COMMENT 'comment of a', b INT);");

        List<String> describedColumns =
                CollectionUtil.iteratorToList(
                                ReadWriteTableTestUtil.bEnv.executeSql("DESC T").collect())
                        .stream()
                        .map(Objects::toString)
                        .collect(Collectors.toList());

        Assertions.assertThat(describedColumns)
                .containsExactlyInAnyOrder(
                        "+I[a, INT, true, null, null, null, comment of a]",
                        "+I[b, INT, true, null, null, null, null]");
    }

    /**
     * Creates table {@code T} with a commented computed column {@code c} and checks that
     * {@code DESC T} lists the expression and the comment for it.
     */
    @Test
    public void testComputedColumnComments() {
        ReadWriteTableTestUtil.bEnv.executeSql(
                "CREATE TABLE T(a INT , b INT, c AS a + b COMMENT 'computed');");

        List<String> describedColumns =
                CollectionUtil.iteratorToList(
                                ReadWriteTableTestUtil.bEnv.executeSql("DESC T").collect())
                        .stream()
                        .map(Objects::toString)
                        .collect(Collectors.toList());

        Assertions.assertThat(describedColumns)
                .containsExactlyInAnyOrder(
                        "+I[a, INT, true, null, null, null, null]",
                        "+I[b, INT, true, null, null, null, null]",
                        "+I[c, INT, true, null, AS `a` + `b`, null, computed]");
    }

    /**
     * Creates table {@code T} with a metadata column, a computed column, and a watermark, then
     * checks that the options of the latest stored schema consist of exactly the ten
     * {@code schema.*} entries listed below. Finishes with the shared DESC / SHOW CREATE TABLE
     * validation.
     */
    @Test
    public void testCleanedSchemaOptions() {
        ReadWriteTableTestUtil.bEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "id INT,\n"
                        + "price INT,\n"
                        + "record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,\n"
                        + "comp AS price * 2,\n"
                        + "order_time TIMESTAMP(3),\n"
                        + "WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,\n"
                        + "PRIMARY KEY (id) NOT ENFORCED\n"
                        + ");");

        Path tablePath = new Path(ReadWriteTableTestUtil.warehouse, "default.db/T");
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
        TableSchema storedSchema = schemaManager.latest().get();

        Map<String, String> expectedOptions = new HashMap<>();
        // metadata column
        expectedOptions.put("schema.2.name", "record_time");
        expectedOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE");
        expectedOptions.put("schema.2.metadata", "timestamp");
        expectedOptions.put("schema.2.virtual", "true");
        // computed column
        expectedOptions.put("schema.3.name", "comp");
        expectedOptions.put("schema.3.data-type", "INT");
        expectedOptions.put("schema.3.expr", "`price` * 2");
        // watermark
        expectedOptions.put("schema.watermark.0.rowtime", "order_time");
        expectedOptions.put(
                "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
        expectedOptions.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");

        Assertions.assertThat(storedSchema.options())
                .containsExactlyInAnyOrderEntriesOf(expectedOptions);
        validateSchemaOptionResult();
    }

    /**
     * Creates table {@code T} directly through a {@link SchemaManager}, passing a full set of
     * {@code schema.*} entries (all five columns, the watermark, and the primary key) as table
     * options, and then runs the shared DESC / SHOW CREATE TABLE validation against it.
     */
    @Test
    public void testReadFromOldStyleSchemaOptions() throws Exception {
        Map<String, String> oldStyleOptions = new HashMap<>();
        // physical columns id and price
        oldStyleOptions.put("schema.0.name", "id");
        oldStyleOptions.put("schema.0.data-type", "INT NOT NULL");
        oldStyleOptions.put("schema.1.name", "price");
        oldStyleOptions.put("schema.1.data-type", "INT");
        // metadata column
        oldStyleOptions.put("schema.2.name", "record_time");
        oldStyleOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE");
        oldStyleOptions.put("schema.2.metadata", "timestamp");
        oldStyleOptions.put("schema.2.virtual", "true");
        // computed column
        oldStyleOptions.put("schema.3.name", "comp");
        oldStyleOptions.put("schema.3.data-type", "INT");
        oldStyleOptions.put("schema.3.expr", "`price` * 2");
        // physical column order_time
        oldStyleOptions.put("schema.4.name", "order_time");
        oldStyleOptions.put("schema.4.data-type", "TIMESTAMP(3)");
        // watermark
        oldStyleOptions.put("schema.watermark.0.rowtime", "order_time");
        oldStyleOptions.put(
                "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
        oldStyleOptions.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        // primary key
        oldStyleOptions.put("schema.primary-key.name", "constrain_pk");
        oldStyleOptions.put("schema.primary-key.columns", "id");

        Schema schema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT().notNull())
                        .column("price", DataTypes.INT())
                        .column("order_time", DataTypes.TIMESTAMP(3))
                        .options(oldStyleOptions)
                        .primaryKey("id")
                        .build();
        Path tablePath = new Path(ReadWriteTableTestUtil.warehouse, "default.db/T");
        new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);

        validateSchemaOptionResult();
    }

    /**
     * Runs an {@code UPDATE} on a primary-key table for the given merge engine and checks the
     * resulting table content.
     *
     * <p>The updated row is expected with kind {@code +U} when the merge engine is
     * {@code deduplicate} and with kind {@code +I} otherwise.
     *
     * @param str value written to the merge-engine table option
     */
    @ValueSource(strings = {"deduplicate", "partial-update"})
    @ParameterizedTest
    public void testUpdateWithPrimaryKey(String str) throws Exception {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.MERGE_ENGINE.key(), str);

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Arrays.asList("id", "dt"),
                        Collections.singletonList("dt"),
                        tableOptions);
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'UNKNOWN', -1, '2022-01-01')",
                "(3, 'Euro', 114, '2022-01-01')",
                "(3, 'Euro', 119, '2022-01-02')");

        String update =
                String.format(
                        "UPDATE %s SET currency = 'Yen', rate = 1 WHERE currency = 'UNKNOWN' and dt = '2022-01-01'",
                        table);
        ReadWriteTableTestUtil.bEnv.executeSql(update).await();

        final String updatedRowKind;
        if (str.equals("deduplicate")) {
            updatedRowKind = "+U";
        } else {
            updatedRowKind = "+I";
        }
        List<Row> expected =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow(
                                "+I", 1L, "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow(
                                updatedRowKind, 2L, "Yen", 1L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), expected);
    }

    /**
     * Sets {@code fields.rate.default-value} to 1000 on a table without primary key, inserts rows
     * whose {@code rate} is {@code null}, and expects exactly those rows to be returned by a query
     * filtering on {@code rate = 1000}.
     */
    @Test
    public void testDefaultValueWithoutPrimaryKey() throws Exception {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("fields.rate.default-value", "1000");

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        tableOptions);
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'Yen', cast(null as int), '2022-01-01')",
                "(3, 'Euro', cast(null as int), '2022-01-01')",
                "(3, 'Euro', 119, '2022-01-02')");

        List<Row> expected =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 2L, "Yen", 1000L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 1000L, "2022-01-01"));
        ReadWriteTableTestUtil.testBatchRead(
                String.format("SELECT * FROM %s where rate = 1000", table), expected);
    }

    /**
     * Sets {@code fields.rate.default-value} to 1000 on a primary-key table for every merge engine
     * and expects the 'Euro' row inserted with a {@code null} rate to be read back with rate 1000.
     *
     * <p>For {@code FIRST_ROW} the changelog producer option is additionally set to
     * {@code LOOKUP}.
     *
     * @param mergeEngine merge engine written to the table options
     */
    @EnumSource(CoreOptions.MergeEngine.class)
    @ParameterizedTest
    public void testDefaultValueWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) throws Exception {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("fields.rate.default-value", "1000");
        tableOptions.put(CoreOptions.MERGE_ENGINE.key(), mergeEngine.toString());
        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
            tableOptions.put(
                    CoreOptions.CHANGELOG_PRODUCER.key(),
                    CoreOptions.ChangelogProducer.LOOKUP.toString());
        }

        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Lists.newArrayList("id", "dt"),
                        Lists.newArrayList("dt"),
                        tableOptions);
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'Yen', cast(null as int), '2022-01-01')",
                "(2, 'Yen', cast(null as int), '2022-01-01')",
                "(3, 'Euro', cast(null as int) , '2022-01-02')");

        List<Row> expected =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 1000L, "2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(
                String.format("SELECT * FROM %s where rate = 1000 and currency ='Euro'", table),
                expected);
    }

    /**
     * Expects an {@code UPDATE} on a table created without primary key to fail with an
     * {@link UnsupportedOperationException} somewhere in the cause chain.
     */
    @Test
    public void testUpdateWithoutPrimaryKey() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        new HashMap<>());
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'UNKNOWN', -1, '2022-01-01')",
                "(3, 'Euro', 114, '2022-01-01')",
                "(3, 'Euro', 119, '2022-01-02')");

        String update =
                String.format(
                        "UPDATE %s SET currency = 'Yen', rate = 1 WHERE currency = 'UNKNOWN' and dt = '2022-01-01'",
                        table);
        Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.bEnv.executeSql(update).await())
                .satisfies(PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class));
    }

    /**
     * Deletes the row with {@code currency = 'UNKNOWN'} from a primary-key table and expects the
     * two remaining rows to be read back.
     */
    @Test
    public void testDeleteWithPrimaryKey() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Arrays.asList("id", "dt"),
                        Collections.singletonList("dt"),
                        Collections.emptyMap());
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'UNKNOWN', -1, '2022-01-01')",
                "(3, 'Euro', 119, '2022-01-02')");

        String delete = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
        ReadWriteTableTestUtil.bEnv.executeSql(delete).await();

        List<Row> expected =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow(
                                "+I", 1L, "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), expected);
    }

    /**
     * Expects a {@code DELETE} on a table created without primary key to fail with an
     * {@link UnsupportedOperationException} somewhere in the cause chain.
     */
    @Test
    public void testDeleteWithoutPrimaryKey() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        new HashMap<>());
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'UNKNOWN', -1, '2022-01-01')",
                "(3, 'Euro', 119, '2022-01-02')");

        String delete = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
        Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.bEnv.executeSql(delete).await())
                .satisfies(PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class));
    }

    /**
     * Exercises three deletes on a primary-key table partitioned by {@code dt}:
     *
     * <ol>
     *   <li>a delete filtered on the full primary key ({@code id = 2 and dt = '2022-01-01'}),
     *       after which the other seven rows are expected;
     *   <li>an unfiltered delete, after which the table is expected to be empty;
     *   <li>a delete filtered on {@code dt} only, issued against the already empty table, after
     *       which the table is still expected to be empty.
     * </ol>
     */
    @Test
    public void testDeleteWithPrimaryKeyFilter() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"),
                        Arrays.asList("id", "dt"),
                        Collections.singletonList("dt"),
                        Collections.emptyMap());
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01')",
                "(2, 'UNKNOWN', -1, '2022-01-01')",
                "(3, 'Euro', 119, '2022-01-02')",
                "(4, 'CNY', 119, '2022-01-02')",
                "(5, 'HKD', 119, '2022-01-03')",
                "(6, 'CAD', 119, '2022-01-03')",
                "(7, 'INR', 119, '2022-01-03')",
                "(8, 'MOP', 119, '2022-01-03')");
        final String selectAll = String.format("SELECT * FROM %s", table);

        // 1. delete by full primary key
        String deleteByKey =
                String.format("DELETE FROM %s WHERE id = 2 and dt = '2022-01-01'", table);
        List<Row> afterDeleteByKey =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow(
                                "+I", 1L, "US Dollar", 114L, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", 4L, "CNY", 119L, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", 5L, "HKD", 119L, "2022-01-03"),
                        TestValuesTableFactory.changelogRow("+I", 6L, "CAD", 119L, "2022-01-03"),
                        TestValuesTableFactory.changelogRow("+I", 7L, "INR", 119L, "2022-01-03"),
                        TestValuesTableFactory.changelogRow("+I", 8L, "MOP", 119L, "2022-01-03"));
        ReadWriteTableTestUtil.bEnv.executeSql(deleteByKey).await();
        ReadWriteTableTestUtil.testBatchRead(selectAll, afterDeleteByKey);

        // 2. delete everything
        ReadWriteTableTestUtil.bEnv.executeSql(String.format("DELETE FROM %s", table)).await();
        ReadWriteTableTestUtil.testBatchRead(selectAll, Collections.emptyList());

        // 3. delete by partition on the now empty table
        ReadWriteTableTestUtil.bEnv
                .executeSql(String.format("DELETE FROM %s WHERE dt = '2022-01-03'", table))
                .await();
        ReadWriteTableTestUtil.testBatchRead(selectAll, Collections.emptyList());
    }

    /**
     * Runs four consecutive deletes on a primary-key table partitioned by {@code dt} and
     * {@code hh}, checking the remaining rows after each one:
     *
     * <ol>
     *   <li>{@code dt = '2022-01-03' AND currency = 'CNY'} removes row 4;
     *   <li>{@code dt = '2022-01-02' or hh = '15'} removes rows 3 and 5;
     *   <li>{@code dt = '2022-01-03' and hh = '16'} removes row 6;
     *   <li>{@code dt = '2022-01-03'} removes rows 7 and 8.
     * </ol>
     */
    @Test
    public void testDeletePushDownWithPartitionKey() throws Exception {
        String table =
                ReadWriteTableTestUtil.createTable(
                        Arrays.asList(
                                "id BIGINT NOT NULL",
                                "currency STRING",
                                "rate BIGINT",
                                "dt String",
                                "hh String"),
                        Arrays.asList("id", "dt", "hh"),
                        Arrays.asList("dt", "hh"),
                        Collections.emptyMap());
        ReadWriteTableTestUtil.insertInto(
                table,
                "(1, 'US Dollar', 114, '2022-01-01', '11')",
                "(2, 'UNKNOWN', -1, '2022-01-01', '12')",
                "(3, 'Euro', 119, '2022-01-02', '13')",
                "(4, 'CNY', 119, '2022-01-03', '14')",
                "(5, 'HKD', 119, '2022-01-03', '15')",
                "(6, 'CAD', 119, '2022-01-03', '16')",
                "(7, 'INR', 119, '2022-01-03', '17')",
                "(8, 'MOP', 119, '2022-01-03', '18')");
        final String selectAll = String.format("SELECT * FROM %s", table);

        // Rows that show up in the expected results; row 4 (CNY) is gone after the first delete.
        final Row usDollar =
                TestValuesTableFactory.changelogRow("+I", 1L, "US Dollar", 114L, "2022-01-01", "11");
        final Row unknown =
                TestValuesTableFactory.changelogRow("+I", 2L, "UNKNOWN", -1L, "2022-01-01", "12");
        final Row euro =
                TestValuesTableFactory.changelogRow("+I", 3L, "Euro", 119L, "2022-01-02", "13");
        final Row hkd =
                TestValuesTableFactory.changelogRow("+I", 5L, "HKD", 119L, "2022-01-03", "15");
        final Row cad =
                TestValuesTableFactory.changelogRow("+I", 6L, "CAD", 119L, "2022-01-03", "16");
        final Row inr =
                TestValuesTableFactory.changelogRow("+I", 7L, "INR", 119L, "2022-01-03", "17");
        final Row mop =
                TestValuesTableFactory.changelogRow("+I", 8L, "MOP", 119L, "2022-01-03", "18");

        // 1. partition key combined with a non-key column
        ReadWriteTableTestUtil.bEnv
                .executeSql(
                        String.format(
                                "DELETE FROM %s WHERE dt = '2022-01-03' AND currency = 'CNY'",
                                table))
                .await();
        ReadWriteTableTestUtil.testBatchRead(
                selectAll, Arrays.asList(usDollar, unknown, euro, hkd, cad, inr, mop));

        // 2. disjunction over the two partition keys
        ReadWriteTableTestUtil.bEnv
                .executeSql(
                        String.format(
                                "DELETE FROM %s WHERE dt = '2022-01-02' or hh = '15'", table))
                .await();
        ReadWriteTableTestUtil.testBatchRead(
                selectAll, Arrays.asList(usDollar, unknown, cad, inr, mop));

        // 3. conjunction over both partition keys
        ReadWriteTableTestUtil.bEnv
                .executeSql(
                        String.format(
                                "DELETE FROM %s WHERE dt = '2022-01-03' and hh = '16'", table))
                .await();
        ReadWriteTableTestUtil.testBatchRead(selectAll, Arrays.asList(usDollar, unknown, inr, mop));

        // 4. first partition key only
        ReadWriteTableTestUtil.bEnv
                .executeSql(String.format("DELETE FROM %s WHERE dt = '2022-01-03'", table))
                .await();
        ReadWriteTableTestUtil.testBatchRead(selectAll, Arrays.asList(usDollar, unknown));
    }

    /**
     * Inserts six fixed rows into a table, verifies they can be read back, then executes
     * {@code INSERT OVERWRITE `<str>` <str2> SELECT <str3> FROM `<str>` WHERE false} and compares
     * the table content with the expected rows.
     *
     * @param str name of the table to write to and read from
     * @param str2 text placed between the table name and {@code SELECT} in the overwrite
     *     statement (presumably a partition spec; confirm against callers)
     * @param str3 select list used in the overwrite statement
     * @param list rows expected in the table after the overwrite
     * @throws Exception if inserting, overwriting, or reading fails
     */
    private void validatePurgingResult(String str, String str2, String str3, List<Row> list) throws Exception {
        ReadWriteTableTestUtil.insertInto(
                str,
                "(0, '2023-01-01', 'hi')",
                "(0, '2023-01-01', 'hello')",
                "(0, '2023-01-02', 'world')",
                "(1, '2023-01-01', 'flink')",
                "(1, '2023-01-02', 'table')",
                "(1, '2023-01-02', 'store')");

        List<Row> initialRows =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", 0, "2023-01-01", "hi"),
                        TestValuesTableFactory.changelogRow("+I", 0, "2023-01-01", "hello"),
                        TestValuesTableFactory.changelogRow("+I", 0, "2023-01-02", "world"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-01", "flink"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "table"),
                        TestValuesTableFactory.changelogRow("+I", 1, "2023-01-02", "store"));
        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(str), initialRows);

        String overwrite =
                String.format(
                        "INSERT OVERWRITE `%s` %s SELECT %s FROM `%s` WHERE false",
                        str, str2, str3, str);
        ReadWriteTableTestUtil.bEnv.executeSql(overwrite).await();

        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(str), list);
    }

    /**
     * Converts the given query into a changelog stream through
     * {@code ReadWriteTableTestUtil.bEnv} and reports that stream's parallelism.
     *
     * @param str SQL query to evaluate
     * @return parallelism of the resulting changelog stream
     */
    private int sourceParallelism(String str) {
        final StreamTableEnvironment tableEnv = ReadWriteTableTestUtil.bEnv;
        return tableEnv.toChangelogStream(tableEnv.sqlQuery(str)).getParallelism();
    }

    /**
     * Converts the given query into a changelog stream through
     * {@code ReadWriteTableTestUtil.sEnv} and reports that stream's parallelism.
     *
     * @param str SQL query to evaluate
     * @return parallelism of the resulting changelog stream
     */
    private int sourceParallelismStreaming(String str) {
        final StreamTableEnvironment tableEnv = ReadWriteTableTestUtil.sEnv;
        return tableEnv.toChangelogStream(tableEnv.sqlQuery(str)).getParallelism();
    }

    /**
     * Builds a {@link FlinkTableSink} for a single-column table stored under a fresh temporary
     * path and verifies the parallelism of the transformations it produces.
     *
     * <p>Starting at the transformation returned by the sink provider, the chain is followed
     * through each transformation's first input until a {@link PartitionTransformation} is
     * reached. Every transformation visited before that point must have parallelism 1 or
     * {@code i}.
     *
     * @param num value for the sink parallelism option, or {@code null} to leave the option unset
     * @param i parallelism accepted (besides 1) for the visited transformations
     * @throws Exception if creating the table directory or the table schema fails
     */
    private void testSinkParallelism(Integer num, int i) throws Exception {
        Map<String, String> options = new HashMap<>();
        if (num != null) {
            options.put(FlinkConnectorOptions.SINK_PARALLELISM.key(), num.toString());
        }
        options.put("path", getTempFilePath(UUID.randomUUID().toString()));
        options.put("bucket", "1");

        FactoryUtil.DefaultDynamicTableContext context =
                new FactoryUtil.DefaultDynamicTableContext(
                        ObjectIdentifier.of(
                                FlinkTestBase.CURRENT_DATABASE, FlinkTestBase.CURRENT_DATABASE, "t1"),
                        FlinkTestBase.createResolvedTable(
                                options,
                                RowType.of(
                                        new LogicalType[] {new VarCharType(Integer.MAX_VALUE)},
                                        new String[] {"a"}),
                                Collections.<String>emptyList(),
                                Collections.<String>emptyList()),
                        Collections.emptyMap(),
                        new Configuration(),
                        Thread.currentThread().getContextClassLoader(),
                        false);

        // The table directory and schema must exist before the Paimon table can be built.
        Path tablePath = CoreOptions.path(context.getCatalogTable().getOptions());
        LocalFileIO.create().mkdirs(tablePath);
        new SchemaManager(LocalFileIO.create(), tablePath)
                .createTable(FlinkCatalog.fromCatalogTable(context.getCatalogTable()));

        FlinkTableSink tableSink =
                new FlinkTableSink(
                        context.getObjectIdentifier(),
                        AbstractFlinkTableFactory.buildPaimonTable(context),
                        context,
                        (LogStoreTableFactory) null);
        Assertions.assertThat(tableSink).isInstanceOf(FlinkTableSink.class);

        // Held as Object so the instance-of assertion runs before the explicit downcast.
        Object runtimeProvider =
                tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
        Assertions.assertThat(runtimeProvider).isInstanceOf(DataStreamSinkProvider.class);
        DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) runtimeProvider;

        Transformation<?> current =
                sinkProvider
                        .consumeDataStream(
                                (ProviderContext) null,
                                ReadWriteTableTestUtil.bExeEnv.fromCollection(
                                        Collections.singletonList(GenericRowData.of())))
                        .getTransformation();
        while (!(current instanceof PartitionTransformation)) {
            Assertions.assertThat(current.getParallelism()).isIn(1, i);
            // Fail with a readable message instead of an IndexOutOfBoundsException when the chain
            // ends without ever reaching a PartitionTransformation.
            Assertions.assertThat(current.getInputs())
                    .as(
                            "inputs of transformation '%s' (no PartitionTransformation reached yet)",
                            current.getName())
                    .isNotEmpty();
            current = current.getInputs().get(0);
        }
    }

    /**
     * Changes the {@code bucket} option of a table and verifies the effect on reads and writes.
     *
     * <p>After the {@code ALTER TABLE} statement the single previously written row must still be
     * readable, while a further insert into partition {@code dt=2022-06-20} must fail with a
     * {@link RuntimeException} root cause whose message names the new bucket number and the
     * previous bucket number 2.
     *
     * @param str name of the table to alter
     * @param i new bucket number
     * @throws Exception if altering the table, reading it, or closing the result iterator fails
     */
    private void assertChangeBucketWithoutRescale(String str, int i) throws Exception {
        ReadWriteTableTestUtil.bEnv.executeSql(
                String.format("ALTER TABLE `%s` SET ('bucket' = '%d')", str, i));

        // Close the iterator over the query result once everything has been collected.
        List<Row> rows;
        try (BlockingIterator<Row, Row> results =
                BlockingIterator.of(
                        ReadWriteTableTestUtil.bEnv
                                .executeSql(ReadWriteTableTestUtil.buildSimpleQuery(str))
                                .collect())) {
            rows = results.collect();
        }
        Assertions.assertThat(rows)
                .containsExactlyInAnyOrder(
                        TestValuesTableFactory.changelogRow(
                                "+I", "US Dollar", 102L, "2022-06-20"));

        Assertions.assertThatThrownBy(
                        () ->
                                ReadWriteTableTestUtil.insertInto(
                                        str, "('US Dollar', 102, '2022-06-20')"))
                .rootCause()
                .isInstanceOf(RuntimeException.class)
                .hasMessage(
                        String.format(
                                "Try to write partition {dt=2022-06-20} with a new bucket num %d, but the previous bucket num is 2. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
                                i));
    }

    /**
     * Checks how table {@code T} is presented by the catalog: {@code DESC T} must list the five
     * columns in order with their key, extras, and watermark information, and the first row of
     * {@code SHOW CREATE TABLE T} must not contain the text {@code schema.}.
     */
    private void validateSchemaOptionResult() {
        List<String> describedColumns =
                CollectionUtil.iteratorToList(
                                ReadWriteTableTestUtil.bEnv.executeSql("DESC T").collect())
                        .stream()
                        .map(Row::toString)
                        .collect(Collectors.toList());
        List<String> expectedColumns =
                Arrays.asList(
                        "+I[id, INT, false, PRI(id), null, null]",
                        "+I[price, INT, true, null, null, null]",
                        "+I[record_time, TIMESTAMP_LTZ(3), true, null, METADATA FROM 'timestamp' VIRTUAL, null]",
                        "+I[comp, INT, true, null, AS `price` * 2, null]",
                        "+I[order_time, TIMESTAMP(3), true, null, null, `order_time` - INTERVAL '5' SECOND]");
        Assertions.assertThat(describedColumns).isEqualTo(expectedColumns);

        String showCreateTable =
                CollectionUtil.iteratorToList(
                                ReadWriteTableTestUtil.bEnv
                                        .executeSql("SHOW CREATE TABLE T")
                                        .collect())
                        .get(0)
                        .toString();
        Assertions.assertThat(showCreateTable.contains("schema.")).isFalse();
    }
}
