package org.apache.flink.table.store.connector;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/ContinuousFileStoreITCase.class */
public class ContinuousFileStoreITCase extends FileStoreTableITCase {
    @Override // org.apache.flink.table.store.connector.FileStoreTableITCase
    protected List<String> ddl() {
        return Arrays.asList("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)", "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)");
    }

    @Test
    public void testWithoutPrimaryKey() throws Exception {
        testSimple("T1");
    }

    @Test
    public void testWithPrimaryKey() throws Exception {
        testSimple("T2");
    }

    @Test
    public void testProjectionWithoutPrimaryKey() throws Exception {
        testProjection("T1");
    }

    @Test
    public void testProjectionWithPrimaryKey() throws Exception {
        testProjection("T2");
    }

    private void testSimple(String str) throws ExecutionException, InterruptedException, TimeoutException {
        BlockingIterator of = BlockingIterator.of(this.sEnv.executeSql("SELECT * FROM " + str).collect());
        this.bEnv.executeSql(String.format("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", str)).await();
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"1", "2", "3"}), Row.of(new Object[]{"4", "5", "6"})});
        this.bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", str)).await();
        Assertions.assertThat(of.collect(1)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"7", "8", "9"})});
    }

    private void testProjection(String str) throws ExecutionException, InterruptedException, TimeoutException {
        BlockingIterator of = BlockingIterator.of(this.sEnv.executeSql("SELECT b, c FROM " + str).collect());
        this.bEnv.executeSql(String.format("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", str)).await();
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"2", "3"}), Row.of(new Object[]{"5", "6"})});
        this.bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", str)).await();
        Assertions.assertThat(of.collect(1)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"8", "9"})});
    }

    @Test
    public void testContinuousLatest() throws ExecutionException, InterruptedException, TimeoutException {
        this.bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
        BlockingIterator of = BlockingIterator.of(this.sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */").collect());
        this.bEnv.executeSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')").await();
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"7", "8", "9"}), Row.of(new Object[]{"10", "11", "12"})});
    }

    @Test
    public void testIgnoreOverwrite() throws ExecutionException, InterruptedException, TimeoutException {
        BlockingIterator of = BlockingIterator.of(this.sEnv.executeSql("SELECT * FROM T1").collect());
        this.bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"1", "2", "3"}), Row.of(new Object[]{"4", "5", "6"})});
        this.bEnv.executeSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')").await();
        this.bEnv.executeSql("INSERT INTO T1 VALUES ('9', '10', '11')").await();
        Assertions.assertThat(of.collect(1)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"9", "10", "11"})});
    }

    @Test
    public void testUnsupportedUpsert() {
        Assertions.assertThatThrownBy(() -> {
            this.sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */").collect();
        }, "File store continuous reading dose not support upsert changelog mode", new Object[0]);
    }

    @Test
    public void testUnsupportedEventual() {
        Assertions.assertThatThrownBy(() -> {
            this.sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */").collect();
        }, "File store continuous reading dose not support eventual consistency mode", new Object[0]);
    }

    @Test
    public void testUnsupportedStartupTimestamp() {
        Assertions.assertThatThrownBy(() -> {
            this.sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */").collect();
        }, "File store continuous reading dose not support from_timestamp scan mode, you can add timestamp filters instead.", new Object[0]);
    }
}
