package org.apache.paimon.flink.util;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/paimon/flink/util/ReadWriteTableTestUtil.class */
public class ReadWriteTableTestUtil {
    public static final int DEFAULT_PARALLELISM = 2;
    public static TableEnvironment sEnv;
    public static StreamExecutionEnvironment bExeEnv;
    public static TableEnvironment bEnv;
    public static String warehouse;
    private static final Time TIME_OUT = Time.seconds(10);
    public static final Map<String, String> SCAN_LATEST = new HashMap<String, String>() { // from class: org.apache.paimon.flink.util.ReadWriteTableTestUtil.1
        {
            put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST.toString());
        }
    };

    public static void init(String str) {
        init(str, 2);
    }

    public static void init(String str, int i) {
        StreamExecutionEnvironment buildStreamEnv = buildStreamEnv(i);
        buildStreamEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        sEnv = StreamTableEnvironment.create(buildStreamEnv);
        bExeEnv = buildBatchEnv(i);
        bExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        bEnv = StreamTableEnvironment.create(bExeEnv, EnvironmentSettings.inBatchMode());
        warehouse = str;
        sEnv.executeSql(String.format("CREATE CATALOG %s WITH ('type'='paimon', 'warehouse'='%s');", "PAIMON", str));
        sEnv.useCatalog("PAIMON");
        bEnv.registerCatalog("PAIMON", (Catalog) sEnv.getCatalog("PAIMON").get());
        bEnv.useCatalog("PAIMON");
    }

    public static StreamExecutionEnvironment buildStreamEnv(int i) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setParallelism(i);
        return executionEnvironment;
    }

    public static StreamExecutionEnvironment buildBatchEnv(int i) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(i);
        return executionEnvironment;
    }

    public static String createTable(List<String> list, List<String> list2, List<String> list3) {
        return createTable(list, list2, list3, new HashMap());
    }

    public static String createTable(List<String> list, List<String> list2, List<String> list3, Map<String, String> map) {
        String replace = ("MyTable_" + UUID.randomUUID()).replace("-", "_");
        HashMap hashMap = new HashMap(map);
        if (!hashMap.containsKey("bucket")) {
            hashMap.put("bucket", "1");
        }
        sEnv.executeSql(buildDdl(replace, list, list2, list3, hashMap));
        return replace;
    }

    public static String createTemporaryTable(List<String> list, List<String> list2, List<String> list3, List<Row> list4, @Nullable String str, boolean z, String str2) {
        String str3 = "temp_" + UUID.randomUUID();
        TableEnvironment tableEnvironment = sEnv;
        Object[] objArr = new Object[8];
        objArr[0] = str3;
        objArr[1] = String.join(",", list);
        objArr[2] = buildPkConstraint(list2);
        objArr[3] = buildPartitionSpec(list3);
        objArr[4] = TestValuesTableFactory.registerData(list4);
        objArr[5] = str == null ? "" : String.format("'partition-list' = '%s',\n", str);
        objArr[6] = Boolean.valueOf(z);
        objArr[7] = str2;
        tableEnvironment.executeSql(String.format("CREATE TEMPORARY TABLE `%s`( %s %s) %s WITH (\n'connector' = 'values',\n'disable-lookup' = 'true',\n'data-id' = '%s',\n%s'bounded' = '%s',\n'changelog-mode' = '%s'\n);", objArr));
        return str3;
    }

    public static void insertInto(String str, String... strArr) throws Exception {
        insertIntoPartition(str, "", strArr);
    }

    public static void insertIntoPartition(String str, String str2, String... strArr) throws Exception {
        sEnv.executeSql(String.format("INSERT INTO `%s` %s VALUES %s;", str, str2, String.join(",", strArr))).await();
    }

    public static void insertIntoFromTable(String str, String str2) throws Exception {
        sEnv.executeSql(String.format("INSERT INTO `%s` SELECT * FROM `%s`;", str2, str)).await();
    }

    public static void insertOverwrite(String str, String... strArr) throws Exception {
        insertOverwritePartition(str, "", strArr);
    }

    public static void insertOverwritePartition(String str, String str2, String... strArr) throws Exception {
        bEnv.executeSql(String.format("INSERT OVERWRITE `%s` %s VALUES %s;", str, str2, String.join(",", strArr))).await();
    }

    public static String buildSimpleQuery(String str) {
        return buildQuery(str, "*", "");
    }

    public static String buildQuery(String str, String str2, String str3) {
        return buildQueryWithTableOptions(str, str2, str3, new HashMap());
    }

    public static String buildQueryWithTableOptions(String str, String str2, String str3, Long l, Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(str2);
        arrayList.add(str);
        arrayList.add(buildTableOptionsSpec(map));
        arrayList.add(str3);
        StringBuilder sb = new StringBuilder("SELECT %s FROM `%s` %s %s");
        if (null != l) {
            sb.append(" limit %s");
            arrayList.add(l);
        }
        return String.format(sb.toString(), arrayList.toArray());
    }

    public static String buildQueryWithTableOptions(String str, String str2, String str3, Map<String, String> map) {
        return buildQueryWithTableOptions(str, str2, str3, null, map);
    }

    public static void checkFileStorePath(String str, List<String> list) {
        String format = String.format("/%s.db/%s", sEnv.getCurrentDatabase(), str);
        Assertions.assertThat(Paths.get(warehouse, format, "snapshot")).exists();
        Assertions.assertThat(Paths.get(warehouse, format, "manifest")).exists();
        if (list.isEmpty()) {
            list = Collections.singletonList("");
        }
        list.stream().map(str2 -> {
            return str2.replaceAll(",", "/");
        }).map(str3 -> {
            return str3.replaceAll("null", "__DEFAULT_PARTITION__");
        }).forEach(str4 -> {
            Assertions.assertThat(Paths.get(warehouse, format, str4)).exists();
            Assertions.assertThat(Paths.get(warehouse, format, str4, "bucket-0")).exists();
        });
    }

    public static void testBatchRead(String str, List<Row> list) throws Exception {
        CloseableIterator collect = bEnv.executeSql(str).collect();
        BlockingIterator of = BlockingIterator.of(collect);
        Throwable th = null;
        try {
            if (!list.isEmpty()) {
                Assertions.assertThat(toInsertOnlyRows(of.collect(list.size(), TIME_OUT.getSize(), TIME_OUT.getUnit()))).containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(list));
            }
            Assertions.assertThat(collect.hasNext()).isFalse();
            if (of != null) {
                if (0 == 0) {
                    of.close();
                    return;
                }
                try {
                    of.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (of != null) {
                if (0 != 0) {
                    try {
                        of.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    of.close();
                }
            }
            throw th3;
        }
    }

    private static List<Row> toInsertOnlyRows(List<Row> list) {
        ArrayList arrayList = new ArrayList();
        for (Row row : list) {
            Assertions.assertThat(row.getKind()).isIn(new Object[]{RowKind.INSERT, RowKind.UPDATE_AFTER});
            Row row2 = new Row(row.getArity());
            for (int i = 0; i < row.getArity(); i++) {
                row2.setField(i, row.getField(i));
            }
            arrayList.add(row2);
        }
        return arrayList;
    }

    public static BlockingIterator<Row, Row> testStreamingRead(String str, List<Row> list) throws Exception {
        BlockingIterator<Row, Row> of = BlockingIterator.of(sEnv.executeSql(str).collect());
        validateStreamingReadResult(of, list);
        return of;
    }

    public static BlockingIterator<Row, Row> testStreamingReadWithReadFirst(String str, String str2, String str3, List<Row> list) throws Exception {
        BlockingIterator<Row, Row> of = BlockingIterator.of(sEnv.executeSql(str3).collect());
        insertIntoFromTable(str, str2);
        validateStreamingReadResult(of, list);
        return of;
    }

    public static void validateStreamingReadResult(BlockingIterator<Row, Row> blockingIterator, List<Row> list) throws Exception {
        if (list.isEmpty()) {
            assertNoMoreRecords(blockingIterator);
        } else {
            Assertions.assertThat(blockingIterator.collect(list.size())).containsExactlyInAnyOrderElementsOf(list);
        }
    }

    public static void assertNoMoreRecords(BlockingIterator<Row, Row> blockingIterator) {
        List emptyList = Collections.emptyList();
        try {
            emptyList = blockingIterator.collect(1, 5L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
        }
        Assertions.assertThat(emptyList).isEmpty();
    }

    public static String buildDdl(String str, List<String> list, List<String> list2, List<String> list3, Map<String, String> map) {
        return String.format("CREATE TABLE `%s`(%s %s) %s %s;", str, String.join(",", list), buildPkConstraint(list2), buildPartitionSpec(list3), buildOptionsSpec(map));
    }

    private static String buildPkConstraint(List<String> list) {
        return !list.isEmpty() ? String.format(",PRIMARY KEY (%s) NOT ENFORCED", String.join(",", list)) : "";
    }

    private static String buildPartitionSpec(List<String> list) {
        return !list.isEmpty() ? String.format("PARTITIONED BY (%s)", String.join(",", list)) : "";
    }

    private static String buildOptionsSpec(Map<String, String> map) {
        return !map.isEmpty() ? String.format("WITH ( %s )", optionsToString(map)) : "";
    }

    private static String buildTableOptionsSpec(Map<String, String> map) {
        return !map.isEmpty() ? String.format("/*+ OPTIONS ( %s ) */", optionsToString(map)) : "";
    }

    private static String optionsToString(Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        map.forEach((str, str2) -> {
            arrayList.add(String.format("'%s' = '%s'", str, str2));
        });
        return String.join(",", arrayList);
    }
}
