package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/paimon/flink/ContinuousFileStoreITCase.class */
public class ContinuousFileStoreITCase extends CatalogITCaseBase {
    /** Test parameter; when {@code true}, {@link #ddl()} appends the changelog table options. */
    private final boolean changelogFile;

    /**
     * Creates the test instance for a single parameter value.
     *
     * @param changelogFile flag stored for later use by {@link #ddl()}; presumably injected by the
     *     parameterized-test extension from {@link #parameters()} (confirm against the extension)
     */
    public ContinuousFileStoreITCase(boolean changelogFile) {
        this.changelogFile = changelogFile;
    }

    /**
     * Supplies the values for the {@code changelogFile} test parameter.
     *
     * <p>NOTE(review): only {@code true} is supplied, so the branch of {@link #ddl()} that creates
     * the tables without extra options is never exercised. Confirm whether that is intentional.
     *
     * @return a single-element collection containing {@code true}
     */
    @Parameters(name = "changelogFile-{0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(Boolean.TRUE);
    }

    /**
     * Builds the CREATE TABLE statements for the two tables used by the tests: {@code T1} (no
     * primary key) and {@code T2} (primary key on column {@code a}).
     *
     * <p>When {@code changelogFile} is set, both statements get the same {@code WITH} clause;
     * otherwise no options are appended.
     *
     * @return the two DDL statements, T1 first, then T2
     */
    @Override
    protected List<String> ddl() {
        String tableOptions = "";
        if (changelogFile) {
            tableOptions = " WITH('write-mode'='change-log','changelog-producer'='input')";
        }

        String createWithoutPrimaryKey =
                "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" + tableOptions;
        String createWithPrimaryKey =
                "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                        + tableOptions;
        return Arrays.asList(createWithoutPrimaryKey, createWithPrimaryKey);
    }

    /**
     * Checks the explained plan of two inserts that read different columns of {@code T1}: with the
     * {@code 'scan.push-down' = 'false'} hint the plan text must contain "Reused", without the hint
     * it must not.
     */
    @TestTemplate
    public void testSourceReuse() {
        sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
        sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");

        String planWithoutPushDown =
                explainInsertPlan(
                        "INSERT INTO print1 SELECT a FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */",
                        "INSERT INTO print2 SELECT b FROM T1 /*+ OPTIONS('scan.push-down' = 'false') */");
        Assertions.assertThat(planWithoutPushDown).contains("Reused");

        String planWithDefaults =
                explainInsertPlan(
                        "INSERT INTO print1 SELECT a FROM T1",
                        "INSERT INTO print2 SELECT b FROM T1");
        Assertions.assertThat(planWithDefaults).doesNotContain("Reused");
    }

    /** Adds the given INSERT statements to a fresh statement set and returns its explained plan. */
    private String explainInsertPlan(String... insertStatements) {
        StatementSet statements = sEnv.createStatementSet();
        for (String insert : insertStatements) {
            statements.addInsertSql(insert);
        }
        ExplainDetail[] noExtraDetails = new ExplainDetail[0];
        return statements.compilePlan().explain(noExtraDetails);
    }

    /** Runs the shared {@code testSimple} scenario against {@code T1}, the table without a primary key. */
    @TestTemplate
    public void testWithoutPrimaryKey() throws Exception {
        final String tableWithoutPrimaryKey = "T1";
        testSimple(tableWithoutPrimaryKey);
    }

    /** Runs the shared {@code testSimple} scenario against {@code T2}, the table with a primary key. */
    @TestTemplate
    public void testWithPrimaryKey() throws Exception {
        final String tableWithPrimaryKey = "T2";
        testSimple(tableWithPrimaryKey);
    }

    /** Runs the shared {@code testProjection} scenario against {@code T1}, the table without a primary key. */
    @TestTemplate
    public void testProjectionWithoutPrimaryKey() throws Exception {
        final String tableWithoutPrimaryKey = "T1";
        testProjection(tableWithoutPrimaryKey);
    }

    /** Runs the shared {@code testProjection} scenario against {@code T2}, the table with a primary key. */
    @TestTemplate
    public void testProjectionWithPrimaryKey() throws Exception {
        final String tableWithPrimaryKey = "T2";
        testProjection(tableWithPrimaryKey);
    }

    /**
     * Reads {@code T2} twice with the same {@code 'consumer-id'='me'} hint and checks that the
     * second read only returns the row inserted after the first read was closed.
     *
     * <p>Each streaming iterator is opened in a try-with-resources block so that it is closed even
     * when an assertion fails; previously a failure left the streaming query open.
     *
     * @throws Exception if running a statement, collecting rows, or closing an iterator fails
     */
    @TestTemplate
    public void testConsumerId() throws Exception {
        String table = "T2";
        String consumerQuery = "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */";

        try (BlockingIterator<Row, Row> firstRead =
                BlockingIterator.of(streamSqlIter(consumerQuery, table))) {
            batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
            Assertions.assertThat(firstRead.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            // Keep the query open for one more second before closing it.
            // NOTE(review): presumably gives the consumer progress time to be recorded - confirm.
            Thread.sleep(1000L);
        }

        try (BlockingIterator<Row, Row> secondRead =
                BlockingIterator.of(streamSqlIter(consumerQuery, table))) {
            batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
            Assertions.assertThat(secondRead.collect(1))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
        }
    }

    /**
     * Streams generated rows into {@code T2}, streams {@code T2} into a new table {@code WT}, and
     * polls {@code WT$snapshots} once per second until more than one distinct {@code watermark}
     * value is visible. The {@link Timeout} bounds the wait.
     *
     * <p>Both insert jobs are held in try-with-resources blocks so they are closed when the loop
     * exits for any reason (success, exception, or interruption by the timeout). Previously they
     * were only closed after the loop completed normally. Note that the WT insert is now closed
     * before the T2 insert (reverse of creation order).
     *
     * @throws Exception if running a statement or closing an iterator fails, or if the polling
     *     sleep is interrupted
     */
    @Timeout(120)
    @TestTemplate
    public void testSnapshotWatermark() throws Exception {
        // The DDL is issued through streamSqlIter, as in the original; its result is not consumed.
        streamSqlIter(
                "CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING, dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')");

        try (CloseableIterator<Row> insertIntoT2 =
                streamSqlIter("INSERT INTO T2 SELECT a, b, c FROM gen")) {
            sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)");

            try (CloseableIterator<Row> insertIntoWt =
                    streamSqlIter(
                            "INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me') */")) {
                while (true) {
                    Set<Long> watermarks =
                            sql("SELECT `watermark` FROM WT$snapshots").stream()
                                    .map(row -> (Long) row.getField(0))
                                    .collect(Collectors.toSet());
                    if (watermarks.size() > 1) {
                        break;
                    }
                    Thread.sleep(1000L);
                }
            }
        }
    }

    /**
     * Opens a streaming {@code SELECT *} on the given table, performs two batch inserts, and checks
     * that the rows arrive grouped per insert: first the two rows of the first insert, then the
     * single row of the second.
     *
     * <p>The iterator is managed by try-with-resources so the streaming query is closed even when
     * an assertion fails.
     *
     * @param str name of the table to read from and insert into
     * @throws Exception if running a statement, collecting rows, or closing the iterator fails
     */
    private void testSimple(String str) throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", str))) {
            batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", str);
            batchSql("INSERT INTO %s VALUES ('7', '8', '9')", str);

            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
            Assertions.assertThat(rows.collect(1))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
        }
    }

    /**
     * Opens a streaming {@code SELECT b, c} on the given table and checks that only the projected
     * columns of each inserted row are returned, insert by insert.
     *
     * <p>The iterator is managed by try-with-resources so the streaming query is closed even when
     * an assertion fails.
     *
     * @param str name of the table to read from and insert into
     * @throws Exception if running a statement, collecting rows, or closing the iterator fails
     */
    private void testProjection(String str) throws Exception {
        try (BlockingIterator<Row, Row> projected =
                BlockingIterator.of(streamSqlIter("SELECT b, c FROM %s", str))) {
            batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", str);
            Assertions.assertThat(projected.collect(2))
                    .containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));

            batchSql("INSERT INTO %s VALUES ('7', '8', '9')", str);
            Assertions.assertThat(projected.collect(1))
                    .containsExactlyInAnyOrder(Row.of("8", "9"));
        }
    }

    /**
     * Inserts two rows, then opens a streaming read of {@code T1} with {@code 'log.scan'='latest'}
     * and checks that the first two rows it returns are the ones inserted after the read started.
     *
     * <p>The iterator is managed by try-with-resources so the streaming query is closed even when
     * the assertion fails.
     *
     * @throws Exception if running a statement, collecting rows, or closing the iterator fails
     */
    @TestTemplate
    public void testContinuousLatest() throws Exception {
        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");

        try (BlockingIterator<Row, Row> latest =
                BlockingIterator.of(
                        streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */"))) {
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(latest.collect(2))
                    .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
        }
    }

    /**
     * Exercises streaming reads of {@code T1} with {@code 'log.scan'='from-timestamp'} for several
     * values of {@code 'log.scan.timestamp-millis'}: zero on an empty table, and timestamps just
     * before, at, and just after existing snapshots (taken from the table's snapshot files).
     *
     * <p>Every streaming iterator lives in its own try-with-resources block, so a failing assertion
     * no longer leaves the streaming query open. Raw collection types were replaced with
     * {@code List<Snapshot>}.
     *
     * @throws Exception if running a statement, collecting rows, or closing an iterator fails
     */
    @TestTemplate
    public void testContinuousFromTimestamp() throws Exception {
        String query =
                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";

        // Timestamp 0 on a table that has no data yet.
        try (BlockingIterator<Row, Row> fromZero = BlockingIterator.of(streamSqlIter(query, 0))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(fromZero.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        // Look up the two oldest snapshots, ordered by their commit time.
        SnapshotManager snapshotManager =
                new SnapshotManager(LocalFileIO.create(), getTableDirectory("T1"));
        List<Snapshot> snapshots =
                new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
        Snapshot first = snapshots.get(0);
        Snapshot second = snapshots.get(1);

        // One millisecond before the second snapshot.
        try (BlockingIterator<Row, Row> beforeSecond =
                BlockingIterator.of(streamSqlIter(query, second.timeMillis() - 1))) {
            batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
            Assertions.assertThat(beforeSecond.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }

        // Exactly at the second snapshot.
        try (BlockingIterator<Row, Row> atSecond =
                BlockingIterator.of(streamSqlIter(query, second.timeMillis()))) {
            Assertions.assertThat(atSecond.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }

        // One millisecond after the second snapshot.
        try (BlockingIterator<Row, Row> afterSecond =
                BlockingIterator.of(streamSqlIter(query, second.timeMillis() + 1))) {
            Assertions.assertThat(afterSecond.collect(1))
                    .containsExactlyInAnyOrder(Row.of("13", "14", "15"));
        }

        // One millisecond before the first snapshot: everything written so far.
        try (BlockingIterator<Row, Row> beforeFirst =
                BlockingIterator.of(streamSqlIter(query, first.timeMillis() - 1))) {
            Assertions.assertThat(beforeFirst.collect(5))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3"),
                            Row.of("4", "5", "6"),
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }

        // One millisecond after the latest snapshot: only rows inserted from now on.
        long latestSnapshotId = snapshotManager.latestSnapshotId().longValue();
        long afterLatestMillis = snapshotManager.snapshot(latestSnapshotId).timeMillis() + 1;
        try (BlockingIterator<Row, Row> afterLatest =
                BlockingIterator.of(streamSqlIter(query, afterLatestMillis))) {
            batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
            Assertions.assertThat(afterLatest.collect(1))
                    .containsExactlyInAnyOrder(Row.of("16", "17", "18"));
        }
    }

    /**
     * Expects a streaming read of {@code T1} with {@code 'log.scan'='from-timestamp'} but no
     * timestamp option to fail with an {@link IllegalArgumentException} cause and the asserted
     * root-cause message.
     */
    @TestTemplate
    public void testLackStartupTimestamp() {
        String queryWithoutTimestamp =
                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */";

        Assertions.assertThatThrownBy(() -> streamSqlIter(queryWithoutTimestamp))
                .hasCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "scan.timestamp-millis can not be null when you use from-timestamp for scan.mode");
    }

    /**
     * Checks two things about {@code 'log.scan.timestamp-millis'}: set on its own (value 0), a
     * streaming read of {@code T1} returns the rows of the first insert; combined with
     * {@code 'log.scan'='latest'}, starting the read fails with an
     * {@link IllegalArgumentException} cause and the asserted root-cause message.
     *
     * <p>The iterator is managed by try-with-resources so the streaming query is closed even when
     * the assertion fails.
     *
     * @throws Exception if running a statement, collecting rows, or closing the iterator fails
     */
    @TestTemplate
    public void testConfigureStartupTimestamp() throws Exception {
        try (BlockingIterator<Row, Row> fromZero =
                BlockingIterator.of(
                        streamSqlIter(
                                "SELECT * FROM T1 /*+ OPTIONS('log.scan.timestamp-millis'='%s') */",
                                0))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
            Assertions.assertThat(fromZero.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */",
                                        0))
                .hasCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "scan.timestamp-millis must be null when you use latest for scan.mode");
    }

    /**
     * After two batch inserts into {@code T1}, opens streaming reads with
     * {@code 'scan.snapshot-id'} set to 1, 0, 1 and 2 and checks the first two rows each read
     * returns. Finally checks that combining {@code 'scan.mode'='latest'} with a snapshot id fails
     * with an {@link IllegalArgumentException} cause and the asserted root-cause message.
     *
     * <p>The repeated open/collect/assert/close sequence is factored into a helper that uses
     * try-with-resources, so a failing assertion no longer leaves a streaming query open.
     *
     * @throws Exception if running a statement, collecting rows, or closing an iterator fails
     */
    @TestTemplate
    public void testConfigureStartupSnapshot() throws Exception {
        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");

        assertScanFromSnapshotId(1, Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        assertScanFromSnapshotId(0, Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        assertScanFromSnapshotId(1, Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        assertScanFromSnapshotId(2, Row.of("7", "8", "9"), Row.of("10", "11", "12"));

        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                        "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
                                        0))
                .hasCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "scan.snapshot-id must be null when you use latest for scan.mode");
    }

    /**
     * Streams {@code T1} with the given {@code 'scan.snapshot-id'} and asserts that the first
     * {@code expectedRows.length} rows are exactly the expected ones, in any order.
     */
    private void assertScanFromSnapshotId(int snapshotId, Row... expectedRows) throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(
                        streamSqlIter(
                                "SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */",
                                snapshotId))) {
            Assertions.assertThat(rows.collect(expectedRows.length))
                    .containsExactlyInAnyOrder(expectedRows);
        }
    }

    /**
     * Exercises streaming reads of {@code T1} with {@code 'scan.mode'='from-snapshot-full'} and an
     * explicit {@code 'scan.snapshot-id'}: snapshot 1 after two inserts, then snapshot 2 after a
     * third insert, collecting first four and then five rows.
     *
     * <p>Every streaming iterator lives in its own try-with-resources block, so a failing assertion
     * no longer leaves the streaming query open.
     *
     * @throws Exception if running a statement, collecting rows, or closing an iterator fails
     */
    @TestTemplate
    public void testConfigureStartupSnapshotFull() throws Exception {
        String query =
                "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */";

        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");

        try (BlockingIterator<Row, Row> fromFirst = BlockingIterator.of(streamSqlIter(query, 1))) {
            Assertions.assertThat(fromFirst.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
        }

        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");

        try (BlockingIterator<Row, Row> fromSecond =
                BlockingIterator.of(streamSqlIter(query, 2))) {
            Assertions.assertThat(fromSecond.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3"),
                            Row.of("4", "5", "6"),
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"));
        }

        // Same starting snapshot again, but this time also wait for the row of the third insert.
        try (BlockingIterator<Row, Row> fromSecondWithNewRow =
                BlockingIterator.of(streamSqlIter(query, 2))) {
            Assertions.assertThat(fromSecondWithNewRow.collect(5))
                    .containsExactlyInAnyOrder(
                            Row.of("1", "2", "3"),
                            Row.of("4", "5", "6"),
                            Row.of("7", "8", "9"),
                            Row.of("10", "11", "12"),
                            Row.of("13", "14", "15"));
        }
    }

    /**
     * Streams {@code T1} while the table receives an insert, an {@code INSERT OVERWRITE}, and
     * another insert, and checks that the next row seen after the first insert's rows is the one
     * from the final insert (the overwritten row {@code ('7', '8', '9')} is not the next row).
     *
     * <p>The iterator is managed by try-with-resources so the streaming query is closed even when
     * an assertion fails.
     *
     * @throws Exception if running a statement, collecting rows, or closing the iterator fails
     */
    @TestTemplate
    public void testIgnoreOverwrite() throws Exception {
        try (BlockingIterator<Row, Row> rows =
                BlockingIterator.of(streamSqlIter("SELECT * FROM T1"))) {
            batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
            Assertions.assertThat(rows.collect(2))
                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));

            batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");
            batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
            Assertions.assertThat(rows.collect(1))
                    .containsExactlyInAnyOrder(Row.of("9", "10", "11"));
        }
    }

    /**
     * Expects a streaming read of {@code T1} with {@code 'log.changelog-mode'='upsert'} to fail
     * with a {@link ValidationException} cause and the asserted root-cause message.
     */
    @TestTemplate
    public void testUnsupportedUpsert() {
        String upsertQuery = "SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */";

        Assertions.assertThatThrownBy(() -> streamSqlIter(upsertQuery))
                .hasCauseInstanceOf(ValidationException.class)
                .hasRootCauseMessage(
                        "File store continuous reading does not support upsert changelog mode.");
    }

    /**
     * Expects a streaming read of {@code T1} with {@code 'log.consistency'='eventual'} to fail
     * with a {@link ValidationException} cause and the asserted root-cause message.
     */
    @TestTemplate
    public void testUnsupportedEventual() {
        String eventualQuery = "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */";

        Assertions.assertThatThrownBy(() -> streamSqlIter(eventualQuery))
                .hasCauseInstanceOf(ValidationException.class)
                .hasRootCauseMessage(
                        "File store continuous reading does not support eventual consistency mode.");
    }

    /**
     * Inserts into {@code T1} with the managed-memory allocator enabled: a writer buffer of
     * {@code 0M} must fail with an {@link IllegalArgumentException} cause and the asserted
     * root-cause message, while {@code 1M} must succeed and leave two rows readable.
     */
    @TestTemplate
    public void testFlinkMemoryPool() {
        String table = "T1";

        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ VALUES ('1', '2', '3'), ('4', '5', '6')",
                                        table))
                .hasCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage("Weights for operator scope use cases must be greater than 0.");

        batchSql(
                "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ VALUES ('1', '2', '3'), ('4', '5', '6')",
                table);

        int rowCount = batchSql("SELECT * FROM T1").size();
        Assertions.assertThat(rowCount).isEqualTo(2);
    }
}
