package org.apache.flink.connectors.hive;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.class */
class HiveTableCompactSinkITCase {

    @RegisterExtension
    public static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
    private TableEnvironment tableEnv;
    private HiveCatalog hiveCatalog;
    private String warehouse;

    HiveTableCompactSinkITCase() {
    }

    @BeforeEach
    void setUp() {
        this.hiveCatalog = HiveTestUtils.createHiveCatalog();
        this.hiveCatalog.open();
        this.warehouse = this.hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
        this.tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        this.tableEnv.registerCatalog(this.hiveCatalog.getName(), this.hiveCatalog);
        this.tableEnv.useCatalog(this.hiveCatalog.getName());
    }

    @AfterEach
    void tearDown() {
        if (this.hiveCatalog != null) {
            this.hiveCatalog.close();
        }
    }

    @Test
    void testNoCompaction() throws Exception {
        this.tableEnv.executeSql("CREATE TABLE src ( key string, value string) TBLPROPERTIES ( 'auto-compaction' = 'true',  'compaction.small-files.avg-size' = '1b',  'sink.parallelism' = '4')");
        this.tableEnv.executeSql("insert into src values ('k1', 'v1'), ('k2', 'v2'),('k3', 'v3'), ('k4', 'v4')").await();
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src"))).hasSize(4);
        Assertions.assertThat(toSortedResult(this.tableEnv.executeSql("select * from src")).toString()).isEqualTo("[+I[k1, v1], +I[k2, v2], +I[k3, v3], +I[k4, v4]]");
    }

    @Test
    void testCompactNonPartitionedTable() throws Exception {
        this.tableEnv.executeSql("CREATE TABLE src ( key string, value string) TBLPROPERTIES ( 'auto-compaction' = 'true',  'sink.parallelism' = '4')");
        this.tableEnv.executeSql("insert into src values ('k1', 'v1'), ('k2', 'v2'),('k3', 'v3'), ('k4', 'v4')").await();
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src"))).hasSize(1);
        Assertions.assertThat(toSortedResult(this.tableEnv.executeSql("select * from src")).toString()).isEqualTo("[+I[k1, v1], +I[k2, v2], +I[k3, v3], +I[k4, v4]]");
    }

    @Test
    void testCompactPartitionedTable() throws Exception {
        this.tableEnv.executeSql("CREATE TABLE src ( key string, value string) partitioned by (p1 int,p2 string) TBLPROPERTIES ( 'auto-compaction' = 'true',  'sink.parallelism' = '8')");
        this.tableEnv.executeSql("insert into src partition (p1=0,p2='static') values (1,'a'),(2,'b'),(3,'c')").await();
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src/p1=0/p2=static"))).hasSize(1);
        Assertions.assertThat(toSortedResult(this.tableEnv.executeSql("select * from src")).toString()).isEqualTo("[+I[1, a, 0, static], +I[2, b, 0, static], +I[3, c, 0, static]]");
        this.tableEnv.executeSql("insert into src partition (p1=0,p2) values (1,'a','d1'), (2,'b','d2'), (3,'c','d1'), (4,'d','d2')").await();
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src/p1=0/p2=d1"))).hasSize(1);
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src/p1=0/p2=d2"))).hasSize(1);
        Assertions.assertThat(toSortedResult(this.tableEnv.executeSql("select * from src")).toString()).isEqualTo("[+I[1, a, 0, d1], +I[1, a, 0, static], +I[2, b, 0, d2], +I[2, b, 0, static], +I[3, c, 0, d1], +I[3, c, 0, static], +I[4, d, 0, d2]]");
    }

    @Test
    void testConditionalCompact() throws Exception {
        this.tableEnv.executeSql("CREATE TABLE src ( key string, value string) partitioned by (p int) TBLPROPERTIES ( 'auto-compaction' = 'true',  'compaction.small-files.avg-size' = '9b',  'sink.parallelism' = '4')");
        this.tableEnv.executeSql("insert into src values ('k1', 'v1', 1), ('k2', 'v2', 1),('k3', 'v3', 2), ('k4', 'v4', 2), ('k5', 'v5', 1)").await();
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src/p=2"))).hasSize(1);
        Assertions.assertThat(listDataFiles(Paths.get(this.warehouse, "src/p=1"))).hasSize(2);
        Assertions.assertThat(toSortedResult(this.tableEnv.executeSql("select * from src")).toString()).isEqualTo("[+I[k1, v1, 1], +I[k2, v2, 1], +I[k3, v3, 2], +I[k4, v4, 2], +I[k5, v5, 1]]");
    }

    private List<Path> listDataFiles(Path path) throws Exception {
        String str = (String) FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.defaultValue();
        return (List) Files.list(path).filter(path2 -> {
            return (path2.toFile().isHidden() || path2.toFile().getName().equals(str)) ? false : true;
        }).collect(Collectors.toList());
    }

    private List<String> toSortedResult(TableResult tableResult) {
        return (List) CollectionUtil.iteratorToList(tableResult.collect()).stream().map((v0) -> {
            return v0.toString();
        }).sorted().collect(Collectors.toList());
    }
}
