package org.apache.paimon.flink.kafka;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.class */
public class ComputedColumnAndWatermarkTableITCase extends KafkaTableTestBase {
    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init(createAndRegisterTempFile("").toString());
    }

    @Test
    public void testBatchSelectComputedColumn() throws Exception {
        List asList = Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L}));
        String createTemporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.emptyList(), Collections.emptyList(), asList, (String) null, true, "I");
        String createTable = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "capital_currency AS UPPER(currency)"), Collections.emptyList(), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable(createTemporaryTable, createTable);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(createTable, "capital_currency", ""), (List) asList.stream().map(row -> {
            return TestValuesTableFactory.changelogRow(row.getKind().shortString(), new Object[]{((String) row.getField(0)).toUpperCase()});
        }).collect(Collectors.toList()));
        String createTable2 = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "capital_currency AS LOWER(currency)"), Collections.singletonList("currency"), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable(createTemporaryTable, createTable2);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(createTable2, "capital_currency", ""), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"us dollar"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"yen"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"euro"})));
        String createTemporaryTable2 = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING"), Arrays.asList("currency", "dt", "hh"), Arrays.asList("dt", "hh"), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 114L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 114L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 114L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, "2022-01-02", "12"})), "dt:2022-01-01,hh:00;dt:2022-01-01,hh:20;dt:2022-01-02,hh:12", true, "I");
        String createTable3 = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING", "dth AS dt || ' ' || hh"), Arrays.asList("currency", "dt", "hh"), Arrays.asList("dt", "hh"));
        ReadWriteTableTestUtil.insertIntoFromTable(createTemporaryTable2, createTable3);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(createTable3, "dth", "WHERE dth = '2022-01-02 12'"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"2022-01-02 12"})));
        String createTable4 = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING", "ptime AS PROCTIME()"), Collections.singletonList("currency"), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable(createTemporaryTable2, createTable4);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(createTable4, "CHAR_LENGTH(DATE_FORMAT(ptime, 'yyyy-MM-dd HH:mm'))", "WHERE currency = 'US Dollar'"), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{16})));
    }

    @Test
    public void testBatchSelectWithWatermark() throws Exception {
        List asList = Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, LocalDateTime.parse("2020-04-07T10:10:11.120")}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, LocalDateTime.parse("2022-04-07T09:54:11.120")}));
        String createTemporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)"), Collections.emptyList(), Collections.emptyList(), asList, (String) null, true, "I");
        String createTable = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "WATERMARK FOR ts AS ts - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable(createTemporaryTable, createTable);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(createTable), asList);
    }

    @Test
    public void testStreamingSelectWithWatermark() throws Exception {
        String createTemporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)"), Collections.emptyList(), Collections.emptyList(), Arrays.asList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Euro", 119L, LocalDateTime.parse("2020-04-07T10:10:11.120")}), TestValuesTableFactory.changelogRow("+I", new Object[]{"Yen", 1L, LocalDateTime.parse("2022-04-07T09:54:11.120")})), (String) null, true, "I");
        String createTableWithKafkaLog = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "WATERMARK FOR ts AS ts - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(createTemporaryTable, createTableWithKafkaLog, ReadWriteTableTestUtil.buildQueryWithTableOptions(createTableWithKafkaLog, "*", "WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)", ReadWriteTableTestUtil.SCAN_LATEST), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}))).close();
        String createTableWithKafkaLog2 = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "ts1 AS ts", "WATERMARK FOR ts1 AS ts1 - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(createTemporaryTable, createTableWithKafkaLog2, ReadWriteTableTestUtil.buildQueryWithTableOptions(createTableWithKafkaLog2, "currency, rate, ts1", "WHERE CURRENT_WATERMARK(ts1) IS NULL OR ts1 > CURRENT_WATERMARK(ts1)", ReadWriteTableTestUtil.SCAN_LATEST), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}))).close();
        String createTableWithKafkaLog3 = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "ptime AS PROCTIME()", "WATERMARK FOR ts AS ts - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(createTemporaryTable, createTableWithKafkaLog3, ReadWriteTableTestUtil.buildQueryWithTableOptions(createTableWithKafkaLog3, "currency, rate, ts, CHAR_LENGTH(DATE_FORMAT(ptime, 'yyyy-MM-dd HH:mm'))", "WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)", ReadWriteTableTestUtil.SCAN_LATEST), Collections.singletonList(TestValuesTableFactory.changelogRow("+I", new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120"), 16}))).close();
    }
}
