package org.apache.flink.table.store.connector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for continuous (streaming) reads of file store tables.
 *
 * <p>Every test runs twice: once with the tables created with {@code 'changelog-producer'='input'}
 * and once without that option. {@code T1} has no primary key, {@code T2} is keyed on column
 * {@code a}.
 */
@RunWith(Parameterized.class)
public class ContinuousFileStoreITCase extends FileStoreTableITCase {

    /** Streaming query that starts reading {@code T1} from a given timestamp (in milliseconds). */
    private static final String FROM_TIMESTAMP_QUERY =
            "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";

    /** Message expected whenever the planner rejects the scan options of a query. */
    private static final String SOURCE_CREATION_ERROR = "Unable to create a source for reading table";

    /** Whether the test tables are declared with {@code 'changelog-producer'='input'}. */
    private final boolean changelogFile;

    /**
     * Creates the test case for one parameterization.
     *
     * @param changelogFile whether the tables are created with the input changelog producer
     */
    public ContinuousFileStoreITCase(boolean changelogFile) {
        this.changelogFile = changelogFile;
    }

    /**
     * Supplies the values injected into the constructor by the parameterized runner.
     *
     * @return both settings of the {@code changelogFile} flag
     */
    @Parameterized.Parameters(name = "changelogFile-{0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    /**
     * Returns the DDL statements for the tables used by the tests.
     *
     * @return the {@code CREATE TABLE} statements for {@code T1} and {@code T2}
     */
    @Override
    protected List<String> ddl() {
        String options = changelogFile ? " WITH('changelog-producer'='input')" : "";
        return Arrays.asList(
                "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" + options,
                "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                        + options);
    }

    /** Streams all columns from the table without a primary key. */
    @Test
    public void testWithoutPrimaryKey() throws Exception {
        testSimple("T1");
    }

    /** Streams all columns from the table with a primary key. */
    @Test
    public void testWithPrimaryKey() throws Exception {
        testSimple("T2");
    }

    /** Streams a column projection from the table without a primary key. */
    @Test
    public void testProjectionWithoutPrimaryKey() throws Exception {
        testProjection("T1");
    }

    /** Streams a column projection from the table with a primary key. */
    @Test
    public void testProjectionWithPrimaryKey() throws Exception {
        testProjection("T2");
    }

    /**
     * Starts a streaming {@code SELECT *} on the table and checks that rows written by two
     * subsequent batch inserts are observed in turn.
     *
     * @param table name of the table to read from and write to
     * @throws Exception if collecting rows or closing the iterator fails
     */
    private void testSimple(String table) throws Exception {
        // try-with-resources closes the streaming iterator even when an assertion fails.
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table))) {
            batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
            Assertions.assertThat(rows.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
        }
    }

    /**
     * Same scenario as {@link #testSimple(String)}, but the streaming query only selects columns
     * {@code b} and {@code c}.
     *
     * @param table name of the table to read from and write to
     * @throws Exception if collecting rows or closing the iterator fails
     */
    private void testProjection(String table) throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", table))) {
            batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));

            batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
            Assertions.assertThat(rows.collect(1)).containsExactlyInAnyOrder(Row.of("8", "9"));
        }
    }

    /**
     * With {@code 'log.scan'='latest'}, the first rows collected from the stream are expected to
     * be the ones inserted after the stream was started, not the ones written beforehand.
     */
    @Test
    public void testContinuousLatest() throws Exception {
        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");

        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(
                        streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */"))) {
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
        }
    }

    /**
     * Starts streams from several timestamps chosen relative to the snapshot times of {@code T1}
     * and checks which rows each stream delivers.
     */
    @Test
    public void testContinuousFromTimestamp() throws Exception {
        // Timestamp 0 precedes every snapshot, so the stream starts with the first insert.
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter(FROM_TIMESTAMP_QUERY, 0))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        SnapshotManager snapshotManager =
                new SnapshotManager(
                        new Path(path, "default_catalog.catalog/default_database.db/T1"));
        List<Snapshot> snapshots =
                new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
        // Order by commit time so that index 0 / 1 are the oldest and second-oldest snapshots.
        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
        Snapshot first = snapshots.get(0);
        Snapshot second = snapshots.get(1);

        // Just before the second snapshot: its rows plus the row inserted afterwards.
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter(FROM_TIMESTAMP_QUERY, second.timeMillis() - 1))) {
            batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
            Assertions.assertThat(rows.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }

        // Exactly at the second snapshot: the same three rows are expected.
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter(FROM_TIMESTAMP_QUERY, second.timeMillis()))) {
            Assertions.assertThat(rows.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }

        // Just before the first snapshot: every row written so far.
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter(FROM_TIMESTAMP_QUERY, first.timeMillis() - 1))) {
            Assertions.assertThat(rows.collect(5))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3"),
                            Row.of("4", "5", "6"),
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }

        // Just after the latest snapshot: only the row inserted once the stream is running.
        long afterLatest =
                snapshotManager.snapshot(snapshotManager.latestSnapshotId()).timeMillis() + 1;
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter(FROM_TIMESTAMP_QUERY, afterLatest))) {
            batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
            Assertions.assertThat(rows.collect(1))
                    .containsExactlyInAnyOrder(Row.of("16", "17", "18"));
        }
    }

    /** Requesting {@code 'log.scan'='from-timestamp'} without a timestamp must be rejected. */
    @Test
    public void testLackStartupTimestamp() {
        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */"))
                .hasMessageContaining(SOURCE_CREATION_ERROR);
    }

    /**
     * A timestamp without an explicit {@code 'log.scan'} mode is accepted, whereas combining a
     * timestamp with {@code 'log.scan'='latest'} must be rejected.
     */
    @Test
    public void testConfigureStartupTimestamp() throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(
                        streamSqlIter(
                                "SELECT * FROM T1 /*+ OPTIONS('log.scan.timestamp-millis'='%s') */",
                                0))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */",
                                        0))
                .hasMessageContaining(SOURCE_CREATION_ERROR);
    }

    /**
     * A snapshot id without an explicit scan mode is accepted (for ids 1 and 0 alike the first
     * insert is expected first), whereas combining a snapshot id with {@code 'scan.mode'='latest'}
     * must be rejected.
     */
    @Test
    public void testConfigureStartupSnapshot() throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(
                        streamSqlIter(
                                "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(
                        streamSqlIter(
                                "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0))) {
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
                                        0))
                .hasMessageContaining(SOURCE_CREATION_ERROR);
    }

    /**
     * After an {@code INSERT OVERWRITE} followed by a regular insert, the next row collected from
     * a running stream is expected to be the regularly inserted one.
     */
    @Test
    public void testIgnoreOverwrite() throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter("SELECT * FROM T1"))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");
            batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
            Assertions.assertThat(rows.collect(1))
                    .containsExactlyInAnyOrder(Row.of("9", "10", "11"));
        }
    }

    /**
     * Continuous reading with {@code 'log.changelog-mode'='upsert'} must fail with the dedicated
     * error message.
     */
    @Test
    public void testUnsupportedUpsert() {
        // The message is matched against the full stack trace so that it is found even when the
        // failure is wrapped in another exception.
        // NOTE(review): the exact production wording is not visible from this file - confirm.
        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */"))
                .hasStackTraceContaining(
                        "File store continuous reading dose not support upsert changelog mode");
    }

    /**
     * Continuous reading with {@code 'log.consistency'='eventual'} must fail with the dedicated
     * error message.
     */
    @Test
    public void testUnsupportedEventual() {
        // See testUnsupportedUpsert for why the stack trace is inspected.
        // NOTE(review): the exact production wording is not visible from this file - confirm.
        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */"))
                .hasStackTraceContaining(
                        "File store continuous reading dose not support eventual consistency mode");
    }
}
