package org.apache.paimon.flink;

import java.util.Collections;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/BatchFileStoreITCase.class */
/**
 * Batch-mode SQL tests that run against the table {@code T (a INT, b INT, c INT)} declared by
 * {@link #ddl()}.
 */
public class BatchFileStoreITCase extends CatalogITCaseBase {

    /** Supplies the single DDL statement that defines the shared test table {@code T}. */
    @Override
    protected List<String> ddl() {
        String createT = "CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)";
        return Collections.singletonList(createT);
    }

    /**
     * An aggregating {@code INSERT ... SELECT} must be rejected with the adaptive-parallelism
     * message, while the same statement with an explicit {@code sink.parallelism} hint must run
     * without throwing.
     */
    @Test
    public void testAdaptiveParallelism() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");

        Assertions.assertThatThrownBy(
                        () -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
                .hasMessageContaining(
                        "Paimon Sink does not support Flink's Adaptive Parallelism mode.");

        // Same query, but with the sink parallelism pinned through a hint.
        batchSql(
                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
    }

    /** Overwriting {@code T} with a query that yields no rows must leave the table empty. */
    @Test
    public void testOverwriteEmpty() {
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        List<Row> beforeOverwrite = batchSql("SELECT * FROM T");
        Assertions.assertThat(beforeOverwrite)
                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));

        // The predicate is always false, so the overwrite source is empty.
        batchSql("INSERT OVERWRITE T SELECT * FROM T WHERE 1 <> 1");
        List<Row> afterOverwrite = batchSql("SELECT * FROM T");
        Assertions.assertThat(afterOverwrite).isEmpty();
    }

    /**
     * Writes four batches and then reads the table back through the {@code scan.snapshot-id},
     * {@code scan.timestamp-millis} and {@code scan.tag-name} hints, including the option
     * combinations that are expected to fail.
     *
     * @throws Exception if a pause between the writes is interrupted
     */
    @Test
    public void testTimeTravelRead() throws Exception {
        // A wall-clock timestamp is captured after each of the first three writes.
        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
        long time1 = System.currentTimeMillis();

        // NOTE(review): the pauses presumably ensure the next write lands strictly after the
        // timestamp captured just before them.
        Thread.sleep(10L);
        batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)");
        long time2 = System.currentTimeMillis();

        Thread.sleep(10L);
        batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)");
        long time3 = System.currentTimeMillis();

        Thread.sleep(10L);
        batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)");

        paimonTable("T").createTag("tag2", 2L);

        // Expected table contents after the first, second and third write respectively.
        Row[] afterFirstWrite = {Row.of(1, 11, 111), Row.of(2, 22, 222)};
        Row[] afterSecondWrite = {
            Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33, 333), Row.of(4, 44, 444)
        };
        Row[] afterThirdWrite = {
            Row.of(1, 11, 111),
            Row.of(2, 22, 222),
            Row.of(3, 33, 333),
            Row.of(4, 44, 444),
            Row.of(5, 55, 555),
            Row.of(6, 66, 666)
        };

        // Snapshot 1: by id, by id with explicit scan mode.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */"))
                .containsExactlyInAnyOrder(afterFirstWrite);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
                .containsExactlyInAnyOrder(afterFirstWrite);

        // Snapshot id 0 yields no rows.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */"))
                .isEmpty();
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
                .isEmpty();

        // First captured timestamp.
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        time1)))
                .containsExactlyInAnyOrder(afterFirstWrite);

        // Snapshot 2: by id, by id with explicit scan mode, by timestamp.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='2') */"))
                .containsExactlyInAnyOrder(afterSecondWrite);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
                .containsExactlyInAnyOrder(afterSecondWrite);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        time2)))
                .containsExactlyInAnyOrder(afterSecondWrite);

        // Snapshot 3: by id, by id with explicit scan mode, by timestamp.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(afterThirdWrite);
        Assertions.assertThat(
                        batchSql(
                                "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
                .containsExactlyInAnyOrder(afterThirdWrite);
        Assertions.assertThat(
                        batchSql(
                                String.format(
                                        "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */",
                                        time3)))
                .containsExactlyInAnyOrder(afterThirdWrite);

        // A timestamp and a snapshot id may not be combined.
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        String.format(
                                                "SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s', 'scan.snapshot-id'='1') */",
                                                time3)))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "[scan.snapshot-id] must be null when you set [scan.timestamp-millis]");

        // Scan mode 'full' may not be combined with a snapshot id.
        Assertions.assertThatThrownBy(
                        () ->
                                batchSql(
                                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='full', 'scan.snapshot-id'='1') */"))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage(
                        "%s must be null when you use latest-full for scan.mode",
                        CoreOptions.SCAN_SNAPSHOT_ID.key());

        // Reading through the tag created above returns the rows of the first two writes.
        Assertions.assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */"))
                .containsExactlyInAnyOrder(afterSecondWrite);

        // An unknown tag name is rejected.
        Assertions.assertThatThrownBy(
                        () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */"))
                .hasRootCauseInstanceOf(IllegalArgumentException.class)
                .hasRootCauseMessage("Tag 'unknown' doesn't exist.");
    }

    /**
     * Writes seven values for the same primary key into a table created with
     * {@code 'sort-spill-threshold'='2'} and checks that only the last value is read back, both
     * with a full-row and a single-column projection.
     */
    @Test
    public void testSortSpillMerge() {
        sql(
                "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('sort-spill-threshold'='2')");

        // One statement per value, '1' through '7', all for key 1.
        for (int value = 1; value <= 7; value++) {
            sql("INSERT INTO KT VALUES (1, '" + value + "')");
        }

        Assertions.assertThat(sql("SELECT * FROM KT")).containsExactlyInAnyOrder(Row.of(1, "7"));
        Assertions.assertThat(sql("SELECT b FROM KT")).containsExactlyInAnyOrder(Row.of("7"));
    }
}
