package org.apache.paimon.flink;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.flink.types.Row;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/TimeTravelITCase.class */
public class TimeTravelITCase extends CatalogITCaseBase {
    @Test
    public void testTravelToTimestampString() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')", new Object[0]);
        Thread.sleep(3000L);
        String now = now();
        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM t", new Object[0]).toString()).isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2, paimon]]");
        Assertions.assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", now).toString()).isEqualTo("[+I[1, hello], +I[2, world]]");
    }

    @Test
    public void testExpression() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')", new Object[0]);
        String now = now();
        Thread.sleep(3000L);
        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM t", new Object[0]).toString()).isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2, paimon]]");
        Assertions.assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s' + INTERVAL '1' SECOND", now).toString()).isEqualTo("[+I[1, hello], +I[2, world]]");
    }

    @Test
    public void testTravelToOldSchema() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')", new Object[0]);
        Thread.sleep(3000L);
        String now = now();
        sql("ALTER TABLE t ADD dt STRING", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'flink', '2020-01-01'), (2, 'paimon', '2020-01-02')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM t", new Object[0]).toString()).isEqualTo("[+I[1, hello, null], +I[2, world, null], +I[1, flink, 2020-01-01], +I[2, paimon, 2020-01-02]]");
        Assertions.assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", now).toString()).isEqualTo("[+I[1, hello], +I[2, world]]");
    }

    @Test
    public void testTravelToNonExistedTimestamp() {
        sql("CREATE TABLE t (k INT, v STRING)", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '1900-01-01 00:00:00'", new Object[0])).isEmpty();
    }

    @Test
    public void testSystemTableTimeTravel() throws Exception {
        sql("CREATE TABLE t (k INT, v STRING)", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')", new Object[0]);
        Thread.sleep(3000L);
        String now = now();
        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM t$files", new Object[0]).size()).isEqualTo(2);
        Assertions.assertThat(sql("SELECT * FROM t$files FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", now).size()).isEqualTo(1);
    }

    @Test
    public void testStreamingTravel() throws Exception {
        sql("CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED, v STRING)", new Object[0]);
        BlockingIterator<Row, Row> streamSqlBlockIter = streamSqlBlockIter("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", now());
        sql("INSERT INTO t VALUES(1, 'hello')", new Object[0]);
        sql("INSERT INTO t VALUES(1, 'apache')", new Object[0]);
        Assertions.assertThat(streamSqlBlockIter.collect(3).toString()).isEqualTo("[+I[1, hello], -U[1, hello], +U[1, apache]]");
        streamSqlBlockIter.close();
    }

    private String now() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }
}
