package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.class */
class WatermarkITCase extends StreamingTestBase {
    WatermarkITCase() {
    }

    @Test
    void testWatermarkNotMovingBack() {
        tEnv().executeSql(String.format("CREATE Table VirtualTable (\n  a INT,\n  c TIMESTAMP(3),\n  WATERMARK FOR c as c\n) with (\n  'connector' = 'values',\n  'bounded' = 'false',\n  'scan.watermark.emit.strategy' = 'on-periodic',\n  'enable-watermark-push-down' = 'true',\n  'disable-lookup' = 'true',\n  'data-id' = '%s'\n)\n", TestValuesTableFactory.registerData(Arrays.asList(Row.of(new Object[]{1, LocalDateTime.parse("2024-01-01T00:00:00")}), Row.of(new Object[]{3, LocalDateTime.parse("2024-01-03T00:00:00")}), Row.of(new Object[]{2, LocalDateTime.parse("2024-01-02T00:00:00")})))));
        tEnv().getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
        List iteratorToList = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT a, c, current_watermark(c) FROM VirtualTable order by c").collect());
        Assertions.assertThat((List) TestValuesTableFactory.getWatermarkOutput("VirtualTable").stream().map(watermark -> {
            return TimestampData.fromEpochMillis(watermark.getTimestamp()).toLocalDateTime().toString();
        }).collect(Collectors.toList())).containsExactly(new String[]{"2024-01-01T00:00", "2024-01-03T00:00", "2024-01-03T00:00"});
        Assertions.assertThat(iteratorToList).containsExactly(new Row[]{Row.of(new Object[]{1, LocalDateTime.parse("2024-01-01T00:00"), LocalDateTime.parse("2024-01-01T00:00")}), Row.of(new Object[]{2, LocalDateTime.parse("2024-01-02T00:00"), LocalDateTime.parse("2024-01-03T00:00")}), Row.of(new Object[]{3, LocalDateTime.parse("2024-01-03T00:00"), LocalDateTime.parse("2024-01-03T00:00")})});
    }
}
