package org.apache.paimon.flink;

import java.io.File;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.class */
public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
    private static final Random RANDOM = new Random();

    @Test
    public void testReadEmpty() {
        Assertions.assertThat(batchSql("SELECT * FROM append_table", new Object[0])).isEmpty();
    }

    @Test
    public void testReadWrite() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')", new Object[0]);
        List<Row> batchSql = batchSql("SELECT * FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql.size()).isEqualTo(2);
        Assertions.assertThat(batchSql).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{2, "BBB"})});
        List<Row> batchSql2 = batchSql("SELECT id FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql2.size()).isEqualTo(2);
        Assertions.assertThat(batchSql2).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1}), Row.of(new Object[]{2})});
        List<Row> batchSql3 = batchSql("SELECT data from append_table", new Object[0]);
        Assertions.assertThat(batchSql3.size()).isEqualTo(2);
        Assertions.assertThat(batchSql3).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"AAA"}), Row.of(new Object[]{"BBB"})});
    }

    @Test
    public void testSkipDedup() {
        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2, 'BBB'), (3, 'BBB')", new Object[0]);
        List<Row> batchSql = batchSql("SELECT * FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql.size()).isEqualTo(4);
        Assertions.assertThat(batchSql).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{2, "BBB"}), Row.of(new Object[]{3, "BBB"})});
        List<Row> batchSql2 = batchSql("SELECT id FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql2.size()).isEqualTo(4);
        Assertions.assertThat(batchSql2).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1}), Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3})});
        List<Row> batchSql3 = batchSql("SELECT data FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql3.size()).isEqualTo(4);
        Assertions.assertThat(batchSql3).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"AAA"}), Row.of(new Object[]{"AAA"}), Row.of(new Object[]{"BBB"}), Row.of(new Object[]{"BBB"})});
    }

    @Test
    public void testIngestFromSource() {
        batchSql("CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", TestValuesTableFactory.registerData(Arrays.asList(Row.ofKind(RowKind.INSERT, new Object[]{1, "AAA"}), Row.ofKind(RowKind.INSERT, new Object[]{1, "AAA"}), Row.ofKind(RowKind.INSERT, new Object[]{1, "BBB"}), Row.ofKind(RowKind.INSERT, new Object[]{2, "AAA"}))));
        batchSql("INSERT INTO append_table SELECT * FROM source", new Object[0]);
        List<Row> batchSql = batchSql("SELECT * FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql.size()).isEqualTo(4);
        Assertions.assertThat(batchSql).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{1, "BBB"}), Row.of(new Object[]{2, "AAA"})});
        List<Row> batchSql2 = batchSql("SELECT id FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql2.size()).isEqualTo(4);
        Assertions.assertThat(batchSql2).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1}), Row.of(new Object[]{1}), Row.of(new Object[]{1}), Row.of(new Object[]{2})});
        List<Row> batchSql3 = batchSql("SELECT data FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql3.size()).isEqualTo(4);
        Assertions.assertThat(batchSql3).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{"AAA"}), Row.of(new Object[]{"AAA"}), Row.of(new Object[]{"BBB"}), Row.of(new Object[]{"AAA"})});
    }

    @Test
    public void testNoCompactionInBatchMode() {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')", new Object[0]);
        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')", new Object[0]);
        assertExecuteExpected("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')", 1L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')", 2L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')", 3L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')", 4L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')", 5L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')", 6L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')", 7L, Snapshot.CommitKind.APPEND);
        assertExecuteExpected("INSERT INTO append_table VALUES (13, 'NNN'), (14, 'OOO')", 8L, Snapshot.CommitKind.APPEND);
        List<Row> batchSql = batchSql("SELECT * FROM append_table", new Object[0]);
        Assertions.assertThat(batchSql.size()).isEqualTo(18);
        Assertions.assertThat(batchSql).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{2, "BBB"}), Row.of(new Object[]{3, "CCC"}), Row.of(new Object[]{4, "DDD"}), Row.of(new Object[]{1, "AAA"}), Row.of(new Object[]{2, "BBB"}), Row.of(new Object[]{3, "CCC"}), Row.of(new Object[]{4, "DDD"}), Row.of(new Object[]{5, "EEE"}), Row.of(new Object[]{6, "FFF"}), Row.of(new Object[]{7, "HHH"}), Row.of(new Object[]{8, "III"}), Row.of(new Object[]{9, "JJJ"}), Row.of(new Object[]{10, "KKK"}), Row.of(new Object[]{11, "LLL"}), Row.of(new Object[]{12, "MMM"}), Row.of(new Object[]{13, "NNN"}), Row.of(new Object[]{14, "OOO"})});
    }

    @Test
    public void testCompactionInStreamingMode() throws Exception {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')", new Object[0]);
        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')", new Object[0]);
        this.sEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500L));
        this.sEnv.executeSql("CREATE TEMPORARY TABLE Orders_in (\n    f0        INT,\n    f1        STRING\n) WITH (\n    'connector' = 'datagen',\n    'rows-per-second' = '1',\n    'number-of-rows' = '10'\n)");
        assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM Orders_in", 60000L);
        Thread.sleep(5000L);
        Assertions.assertThat(batchSql("SELECT * FROM append_table", new Object[0]).size()).isEqualTo(10);
    }

    @Test
    public void testRejectDelete() {
        testRejectChanges(RowKind.DELETE);
    }

    @Test
    public void testRejectUpdateBefore() {
        testRejectChanges(RowKind.UPDATE_BEFORE);
    }

    @Test
    public void testRejectUpdateAfter() {
        testRejectChanges(RowKind.UPDATE_BEFORE);
    }

    @Test
    public void testComplexType() {
        batchSql("INSERT INTO complex_table VALUES (1, CAST(NULL AS MAP<INT, INT>))", new Object[0]);
        Assertions.assertThat(batchSql("SELECT * FROM complex_table", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{1, null})});
    }

    /* JADX WARN: Type inference failed for: r7v4, types: [java.time.ZonedDateTime] */
    @Test
    public void testTimestampLzType() {
        sql("CREATE TABLE t_table (id INT, data TIMESTAMP_LTZ(3))", new Object[0]);
        batchSql("INSERT INTO t_table VALUES (1, TIMESTAMP '2023-02-03 20:20:20')", new Object[0]);
        Assertions.assertThat(batchSql("SELECT * FROM t_table", new Object[0])).containsExactly(new Row[]{Row.of(new Object[]{1, LocalDateTime.parse("2023-02-03T20:20:20").atZone(ZoneId.systemDefault()).toInstant()})});
    }

    @Test
    public void testReadWriteFailRandom() throws Exception {
        setFailRate(100, 1000);
        int i = 1000;
        ArrayList arrayList = new ArrayList();
        StringBuilder sb = new StringBuilder("");
        for (int i2 = 0; i2 < 1000; i2++) {
            Integer valueOf = Integer.valueOf(RANDOM.nextInt());
            arrayList.add(Row.of(new Object[]{valueOf, String.valueOf(valueOf)}));
            sb.append("(" + valueOf + ",'" + valueOf + "'),");
        }
        FailingFileIO.retryArtificialException(() -> {
            return batchSql(String.format("INSERT INTO append_table VALUES %s", sb.toString().substring(0, sb.length() - 1)), new Object[0]);
        });
        FailingFileIO.retryArtificialException(() -> {
            batchSql("SELECT * FROM append_table", new Object[0]);
            List<Row> batchSql = batchSql("SELECT * FROM append_table", new Object[0]);
            Assertions.assertThat(batchSql.size()).isGreaterThanOrEqualTo(i);
            Assertions.assertThat(batchSql).containsExactlyInAnyOrder(arrayList.toArray(new Row[0]));
        });
    }

    @Test
    public void testReadWriteFailRandomString() throws Exception {
        setFailRate(100, 1000);
        int i = 1000;
        ArrayList arrayList = new ArrayList();
        StringBuilder sb = new StringBuilder("");
        for (int i2 = 0; i2 < 1000; i2++) {
            Integer valueOf = Integer.valueOf(RANDOM.nextInt());
            String valueOf2 = String.valueOf(RANDOM.nextInt());
            arrayList.add(Row.of(new Object[]{valueOf, valueOf2}));
            sb.append("(" + valueOf + ",'" + valueOf2 + "'),");
        }
        FailingFileIO.retryArtificialException(() -> {
            return batchSql(String.format("INSERT INTO append_table VALUES %s", sb.toString().substring(0, sb.length() - 1)), new Object[0]);
        });
        FailingFileIO.retryArtificialException(() -> {
            batchSql("SELECT * FROM append_table", new Object[0]);
            List<Row> batchSql = batchSql("SELECT * FROM append_table", new Object[0]);
            Assertions.assertThat(batchSql.size()).isGreaterThanOrEqualTo(i);
            Assertions.assertThat(batchSql).containsExactlyInAnyOrder(arrayList.toArray(new Row[0]));
        });
    }

    @Test
    public void testLimit() {
        sql("INSERT INTO append_table VALUES (1, 'AAA')", new Object[0]);
        sql("INSERT INTO append_table VALUES (2, 'BBB')", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM append_table LIMIT 1", new Object[0])).hasSize(1);
    }

    @Test
    public void testFileIndex() {
        batchSql("INSERT INTO index_table VALUES (1, 'a', 'AAA'), (1, 'a', 'AAA'), (2, 'c', 'BBB'), (3, 'c', 'BBB')", new Object[0]);
        Assertions.assertThat(batchSql("SELECT * FROM index_table WHERE indexc = 'c'", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{2, "c", "BBB"}), Row.of(new Object[]{3, "c", "BBB"})});
    }

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected List<String> ddl() {
        return Arrays.asList("CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '-1')", "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')", "CREATE TABLE IF NOT EXISTS complex_table (id INT, data MAP<INT, INT>) WITH ('bucket' = '-1')", "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc STRING, data STRING) WITH ('bucket' = '-1', 'file-index.bloom-filter.columns'='indexc', 'file-index.bloom-filter.indexc.items' = '500')");
    }

    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected String toWarehouse(String str) {
        File file = new File(str);
        String name = file.getName();
        String path = file.getPath();
        FailingFileIO.reset(name, 0, 1);
        return FailingFileIO.getFailingPath(name, path);
    }

    private void setFailRate(int i, int i2) {
        FailingFileIO.reset(new Path(this.path).getName(), i, i2);
    }

    private void testRejectChanges(RowKind rowKind) {
        batchSql("CREATE TEMPORARY TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", TestValuesTableFactory.registerData(Collections.singletonList(Row.ofKind(rowKind, new Object[]{1, "AAA"}))));
        Assertions.assertThatThrownBy(() -> {
            batchSql("INSERT INTO append_table SELECT * FROM source", new Object[0]);
        }).hasRootCauseInstanceOf(IllegalStateException.class).hasRootCauseMessage("Append only writer can not accept row with RowKind %s", new Object[]{rowKind});
    }

    private void assertExecuteExpected(String str, long j, Snapshot.CommitKind commitKind) {
        batchSql(str, new Object[0]);
        Snapshot findLatestSnapshot = findLatestSnapshot("append_table");
        Assertions.assertThat(findLatestSnapshot.id()).isEqualTo(j);
        Assertions.assertThat(findLatestSnapshot.commitKind()).isEqualTo(commitKind);
    }

    private void assertStreamingHasCompact(String str, long j) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = 1;
        this.sEnv.executeSql(str);
        while (true) {
            Snapshot findSnapshot = findSnapshot("append_table", j2);
            if (findSnapshot != null) {
                if (findSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
                    return;
                } else {
                    j2++;
                }
            }
            if (System.currentTimeMillis() - currentTimeMillis > j) {
                throw new RuntimeException("Time up for streaming execute, don't get expected result.");
            }
            Thread.sleep(1000L);
        }
    }
}
