package org.apache.flink.table.store.connector;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/store/connector/ContinuousFileStoreITCase.class */
public class ContinuousFileStoreITCase extends FileStoreTableITCase {
    private final boolean changelogFile;

    public ContinuousFileStoreITCase(boolean z) {
        this.changelogFile = z;
    }

    @Parameterized.Parameters(name = "changelogFile-{0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @Override // org.apache.flink.table.store.connector.FileStoreTableITCase
    protected List<String> ddl() {
        String str = this.changelogFile ? " WITH('changelog-producer'='input')" : "";
        return Arrays.asList("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" + str, "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + str);
    }

    @Test
    public void testWithoutPrimaryKey() throws Exception {
        testSimple("T1");
    }

    @Test
    public void testWithPrimaryKey() throws Exception {
        testSimple("T2");
    }

    @Test
    public void testProjectionWithoutPrimaryKey() throws Exception {
        testProjection("T1");
    }

    @Test
    public void testProjectionWithPrimaryKey() throws Exception {
        testProjection("T2");
    }

    private void testSimple(String str) throws TimeoutException {
        BlockingIterator of = BlockingIterator.of(streamSqlIter("SELECT * FROM %s", str));
        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", str);
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"1", "2", "3"}), Row.of(new Object[]{"4", "5", "6"})});
        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", str);
        Assertions.assertThat(of.collect(1)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"7", "8", "9"})});
    }

    private void testProjection(String str) throws TimeoutException {
        BlockingIterator of = BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", str));
        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", str);
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"2", "3"}), Row.of(new Object[]{"5", "6"})});
        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", str);
        Assertions.assertThat(of.collect(1)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"8", "9"})});
    }

    @Test
    public void testContinuousLatest() throws TimeoutException {
        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        BlockingIterator of = BlockingIterator.of(streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */", new Object[0]));
        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')", new Object[0]);
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"7", "8", "9"}), Row.of(new Object[]{"10", "11", "12"})});
    }

    @Test
    public void testIgnoreOverwrite() throws TimeoutException {
        BlockingIterator of = BlockingIterator.of(streamSqlIter("SELECT * FROM T1", new Object[0]));
        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        Assertions.assertThat(of.collect(2)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"1", "2", "3"}), Row.of(new Object[]{"4", "5", "6"})});
        batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')", new Object[0]);
        batchSql("INSERT INTO T1 VALUES ('9', '10', '11')", new Object[0]);
        Assertions.assertThat(of.collect(1)).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"9", "10", "11"})});
    }

    @Test
    public void testUnsupportedUpsert() {
        Assertions.assertThatThrownBy(() -> {
            streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */", new Object[0]);
        }, "File store continuous reading dose not support upsert changelog mode", new Object[0]);
    }

    @Test
    public void testUnsupportedEventual() {
        Assertions.assertThatThrownBy(() -> {
            streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */", new Object[0]);
        }, "File store continuous reading dose not support eventual consistency mode", new Object[0]);
    }

    @Test
    public void testUnsupportedStartupTimestamp() {
        Assertions.assertThatThrownBy(() -> {
            streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */", new Object[0]);
        }, "File store continuous reading dose not support from_timestamp scan mode, you can add timestamp filters instead.", new Object[0]);
    }
}
