package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/LookupJoinITCase.class */
/**
 * Integration tests for processing-time lookup joins ({@code FOR SYSTEM_TIME AS OF T.proctime})
 * between a probe table {@code T} and a primary-keyed dimension table {@code DIM}.
 *
 * <p>Most tests follow the same shape: seed the dimension table, start the streaming join, insert
 * probe rows and assert the joined output, then change the dimension table, wait, and probe again
 * to assert that the join observes the change.
 *
 * <p>Every streaming result iterator is opened in a try-with-resources block so that it is closed
 * even when an assertion fails part-way through a test.
 */
public class LookupJoinITCase extends CatalogITCaseBase {

    /** Returns the DDL statements creating the probe table {@code T} and the table {@code DIM}. */
    @Override
    public List<String> ddl() {
        return Arrays.asList(
                "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
                "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms')");
    }

    /** Returns the parallelism used by these tests, which is always 1. */
    @Override
    protected int defaultParallelism() {
        return 1;
    }

    /**
     * Submits {@code query} on the streaming environment and wraps its result in a blocking
     * iterator. The caller owns the returned iterator and must close it.
     *
     * @param query the SQL query to execute
     * @return a blocking iterator over the rows emitted by the query
     */
    private BlockingIterator<Row, Row> startQuery(String query) {
        return BlockingIterator.of(sEnv.executeSql(query).collect());
    }

    /** Joins against an initially empty DIM, then checks that rows inserted later are matched. */
    @Test
    public void testLookupEmptyTable() throws Exception {
        String query =
                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null, null),
                            Row.of(2, null, null, null),
                            Row.of(3, null, null, null));

            sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
            // Presumably gives the running join time to observe the new DIM rows.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(4, null, null, null));
        }
    }

    /** Primary-key lookup selecting all DIM value columns, before and after a DIM update. */
    @Test
    public void testLookup() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, null, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 44, 444, 4444),
                            Row.of(3, 33, 333, 3333),
                            Row.of(4, null, null, null));
        }
    }

    /** Same as {@link #testLookup()} but with the {@code 'scan.mode'='latest'} hint on DIM. */
    @Test
    public void testLookupWithLatest() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('scan.mode'='latest') */ for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, null, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 44, 444, 4444),
                            Row.of(3, 33, 333, 3333),
                            Row.of(4, null, null, null));
        }
    }

    /** Primary-key lookup that selects only a subset ({@code j}, {@code k1}) of DIM columns. */
    @Test
    public void testLookupProjection() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /** Primary-key lookup with an extra join filter on the primary key ({@code D.i > 2}). */
    @Test
    public void testLookupFilterPk() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.i > 2";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null), Row.of(2, null, null), Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, null, null),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /** Primary-key lookup with an extra join filter on a selected column ({@code D.k1 > 111}). */
    @Test
    public void testLookupFilterSelect() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /** Primary-key lookup with a join filter on a column that is not selected ({@code D.k2}). */
    @Test
    public void testLookupFilterUnSelect() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Primary-key lookup filtered on an unselected column ({@code D.k2 < 4444}) where a DIM update
     * makes a previously matching key (2) stop matching.
     */
    @Test
    public void testLookupFilterUnSelectAndUpdate() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, null, null),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /** Lookup on the non-primary-key column {@code D.j}, where one probe key matches two rows. */
    @Test
    public void testNonPkLookup() throws Exception {
        sql(
                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)",
                new Object[0]);
        String query =
                "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, 22, 333, 3333),
                            Row.of(null, 33, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(null, 22, null, null),
                            Row.of(3, 33, 333, 3333),
                            Row.of(2, 44, 444, 4444));
        }
    }

    /** Non-primary-key lookup on {@code D.j} that selects only {@code D.k1}. */
    @Test
    public void testNonPkLookupProjection() throws Exception {
        sql(
                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)",
                new Object[0]);
        String query =
                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111), Row.of(22, null), Row.of(33, 333), Row.of(44, 444));
        }
    }

    /** Non-primary-key lookup on {@code D.j} with a join filter on the primary key. */
    @Test
    public void testNonPkLookupFilterPk() throws Exception {
        sql(
                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)",
                new Object[0]);
        String query =
                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.i > 2";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(Row.of(11, null), Row.of(22, 333), Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, null), Row.of(22, null), Row.of(33, 333), Row.of(44, null));
        }
    }

    /** Non-primary-key lookup on {@code D.j} with a join filter on the selected {@code D.k1}. */
    @Test
    public void testNonPkLookupFilterSelect() throws Exception {
        sql(
                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)",
                new Object[0]);
        String query =
                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k1 > 111";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, null), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, null), Row.of(22, null), Row.of(33, 333), Row.of(44, 444));
        }
    }

    /** Non-primary-key lookup on {@code D.j} with a join filter on the unselected {@code D.k2}. */
    @Test
    public void testNonPkLookupFilterUnSelect() throws Exception {
        sql(
                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)",
                new Object[0]);
        String query =
                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, null), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, null), Row.of(22, null), Row.of(33, 333), Row.of(44, 444));
        }
    }

    /**
     * Non-primary-key lookup filtered on the unselected {@code D.k2 < 4444}, where a DIM update
     * moves a row out of the filter range.
     */
    @Test
    public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
        sql(
                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)",
                new Object[0]);
        String query =
                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe the DIM change.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111), Row.of(22, null), Row.of(33, 333), Row.of(44, null));
        }
    }

    /** Applies two separate DIM inserts before probing again; both must be visible to the join. */
    @Test
    public void testRepeatRefresh() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444)", new Object[0]);
            sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
            // Presumably gives the running join time to observe both DIM changes.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
            Assertions.assertThat(iterator.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Expects a lookup join against a partial-update table that declares no changelog producer to
     * fail with the "streaming reading is not supported" root cause message.
     */
    @Test
    public void testLookupPartialUpdateIllegal() {
        sql(
                "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')",
                new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        Assertions.assertThatThrownBy(() -> sEnv.executeSql(query))
                .hasRootCauseMessage(
                        "Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading.");
    }

    /**
     * Lookup join against a partial-update table using the full-compaction changelog producer: two
     * partial writes for the same key must be merged into one complete row.
     */
    @Test
    public void testLookupPartialUpdate() throws Exception {
        sql(
                "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update', 'changelog-producer'='full-compaction', 'changelog-producer.compaction-interval'='1 s', 'continuous.discovery-interval'='10 ms')",
                new Object[0]);
        sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))", new Object[0]);
        String query =
                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1)", new Object[0]);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, null, 111, null));

            sql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)", new Object[0]);
            // Presumably gives the running join time to observe the merged DIM2 row.
            Thread.sleep(2000L);
            sql("INSERT INTO T VALUES (1)", new Object[0]);
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111));
        }
    }

    /**
     * Uses the {@code LOOKUP} retry hint ({@code lookup_miss}, fixed 1s delay, up to 60 attempts):
     * key 3 is absent from DIM when probed and is inserted afterwards, yet must still be joined.
     */
    @Test
    public void testRetryLookup() throws Exception {
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query =
                "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        try (BlockingIterator<Row, Row> iterator = startQuery(query)) {
            sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
            // Delay the DIM insert so that the first lookup for key 3 has presumably already missed.
            Thread.sleep(2000L);
            sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, 33, 333, 3333));
        }
    }
}
