package org.apache.flink.table.store.connector;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaLogOptions;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/flink/table/store/connector/ReadWriteTableTestBase.class */
public class ReadWriteTableTestBase extends KafkaTableTestBase {
    protected String rootPath;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/table/store/connector/ReadWriteTableTestBase$WatermarkSpec.class */
    public static class WatermarkSpec {
        String columnName;
        String expressionAsString;

        private WatermarkSpec(String str, String str2) {
            this.columnName = str;
            this.expressionAsString = str2;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public static WatermarkSpec of(String str, String str2) {
            return new WatermarkSpec(str, str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkFileStorePath(StreamTableEnvironment streamTableEnvironment, String str, @Nullable String str2) {
        String relativeTablePath = FileStoreOptions.relativeTablePath(ObjectIdentifier.of(streamTableEnvironment.getCurrentCatalog(), streamTableEnvironment.getCurrentDatabase(), str));
        Assertions.assertThat(Paths.get(this.rootPath, relativeTablePath, "snapshot")).exists();
        Assertions.assertThat(Paths.get(this.rootPath, relativeTablePath, "manifest")).exists();
        if (str2 == null) {
            Assertions.assertThat(Paths.get(this.rootPath, relativeTablePath, "bucket-0")).exists();
        } else {
            ((List) Arrays.stream(str2.split(";")).map(str3 -> {
                return str3.replaceAll(":", "=");
            }).map(str4 -> {
                return str4.replaceAll(",", "/");
            }).map(str5 -> {
                return str5.replaceAll("null", "__DEFAULT_PARTITION__");
            }).collect(Collectors.toList())).forEach(str6 -> {
                Assertions.assertThat(Paths.get(this.rootPath, relativeTablePath, str6)).exists();
                Assertions.assertThat(Paths.get(this.rootPath, relativeTablePath, str6, "bucket-0")).exists();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static BlockingIterator<Row, Row> collectAndCheck(StreamTableEnvironment streamTableEnvironment, String str, Map<String, String> map, @Nullable String str2, List<Row> list) throws Exception {
        ArrayList arrayList = new ArrayList();
        BlockingIterator<Row, Row> collect = collect(streamTableEnvironment, str2 == null ? ShowCreateUtil.buildSimpleSelectQuery(str, map) : ShowCreateUtil.buildSelectQuery(str, map, str2, Collections.emptyList()), list.size(), arrayList);
        Assertions.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(list);
        return collect;
    }

    protected static BlockingIterator<Row, Row> collect(StreamTableEnvironment streamTableEnvironment, String str, int i, List<Row> list) throws Exception {
        BlockingIterator<Row, Row> of = BlockingIterator.of(streamTableEnvironment.executeSql(str).collect());
        list.addAll(of.collect(i));
        return of;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void assertNoMoreRecords(BlockingIterator<Row, Row> blockingIterator) throws Exception {
        List emptyList = Collections.emptyList();
        try {
            emptyList = blockingIterator.collect(1, 5L, TimeUnit.SECONDS);
            blockingIterator.close();
        } catch (TimeoutException e) {
        }
        Assertions.assertThat(emptyList).isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String collectAndCheckBatchReadWrite(List<String> list, List<String> list2, @Nullable String str, List<String> list3, List<Row> list4) throws Exception {
        return (String) collectAndCheckUnderSameEnv(false, false, true, list, list2, Collections.emptyList(), null, true, Collections.emptyMap(), str, list3, list4).f0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String collectAndCheckStreamingReadWriteWithClose(boolean z, List<String> list, List<String> list2, Map<String, String> map, @Nullable String str, List<String> list3, List<Row> list4) throws Exception {
        Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv = collectAndCheckUnderSameEnv(true, z, false, list, list2, Collections.emptyList(), null, true, map, str, list3, list4);
        ((BlockingIterator) collectAndCheckUnderSameEnv.f1).close();
        return (String) collectAndCheckUnderSameEnv.f0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckStreamingReadWriteWithoutClose(List<String> list, List<String> list2, Map<String, String> map, @Nullable String str, List<String> list3, List<Row> list4) throws Exception {
        return collectAndCheckUnderSameEnv(true, true, false, list, list2, Collections.emptyList(), null, true, map, str, list3, list4);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void collectLatestLogAndCheck(boolean z, List<String> list, List<String> list2, @Nullable String str, List<String> list3, List<Row> list4) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("log." + LogOptions.SCAN.key(), LogOptions.LogStartupMode.LATEST.name().toLowerCase());
        ((BlockingIterator) collectAndCheckUnderSameEnv(true, true, z, list, list2, Collections.emptyList(), null, false, hashMap, str, list3, list4).f1).close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void collectChangelogFromTimestampAndCheck(boolean z, List<String> list, List<String> list2, long j, List<Row> list3) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("log." + LogOptions.SCAN.key(), "from-timestamp");
        hashMap.put("log." + LogOptions.SCAN_TIMESTAMP_MILLS.key(), String.valueOf(j));
        ((BlockingIterator) collectAndCheckUnderSameEnv(true, true, z, list, list2, Collections.emptyList(), null, true, hashMap, null, Collections.emptyList(), list3).f1).close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Tuple2<String, String> createSourceAndManagedTable(boolean z, boolean z2, boolean z3, List<String> list, List<String> list2, List<Tuple2<String, String>> list3, @Nullable WatermarkSpec watermarkSpec) throws Exception {
        String prepareHelperSourceWithInsertOnlyRecords;
        String prepareHelperSourceWithChangelogRecords;
        HashMap hashMap = new HashMap();
        this.rootPath = TEMPORARY_FOLDER.newFolder().getPath();
        hashMap.put(FileStoreOptions.PATH.key(), this.rootPath);
        if (z2) {
            hashMap.put(TableStoreFactoryOptions.LOG_SYSTEM.key(), "kafka");
            hashMap.put("log." + KafkaLogOptions.BOOTSTRAP_SERVERS.key(), getBootstrapServers());
        }
        String str = "source_table_" + UUID.randomUUID();
        String str2 = "managed_table_" + UUID.randomUUID();
        EnvironmentSettings.Builder inStreamingMode = EnvironmentSettings.newInstance().inStreamingMode();
        if (z) {
            if (z3) {
                prepareHelperSourceWithChangelogRecords = ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords(str, list, list2, watermarkSpec != null);
            } else {
                prepareHelperSourceWithChangelogRecords = ReadWriteTableTestUtil.prepareHelperSourceWithChangelogRecords(str, list, list2, watermarkSpec != null);
            }
            prepareHelperSourceWithInsertOnlyRecords = prepareHelperSourceWithChangelogRecords;
            this.env = buildStreamEnv();
            inStreamingMode.inStreamingMode();
        } else {
            prepareHelperSourceWithInsertOnlyRecords = ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords(str, list, list2, watermarkSpec != null);
            this.env = buildBatchEnv();
            inStreamingMode.inBatchMode();
        }
        this.tEnv = StreamTableEnvironment.create(this.env, inStreamingMode.build());
        this.tEnv.executeSql(prepareHelperSourceWithInsertOnlyRecords);
        if (list3.isEmpty()) {
            this.tEnv.executeSql(ShowCreateUtil.createTableLikeDDL(str, str2, hashMap, watermarkSpec));
        } else {
            String currentCatalog = this.tEnv.getCurrentCatalog();
            String currentDatabase = this.tEnv.getCurrentDatabase();
            ResolvedCatalogTable table = ((Catalog) this.tEnv.getCatalog(currentCatalog).get()).getTable(new ObjectPath(currentDatabase, str));
            Schema.Builder fromSchema = Schema.newBuilder().fromSchema(table.getUnresolvedSchema());
            list3.forEach(tuple2 -> {
                fromSchema.columnByExpression((String) tuple2.f0, (String) tuple2.f1);
            });
            if (watermarkSpec != null) {
                fromSchema.watermark(watermarkSpec.columnName, watermarkSpec.expressionAsString);
            }
            TableDescriptor.Builder schema = TableDescriptor.forManaged().partitionedBy((String[]) table.getPartitionKeys().toArray(new String[0])).schema(fromSchema.build());
            schema.getClass();
            hashMap.forEach(schema::option);
            this.tEnv.createTable(ObjectIdentifier.of(currentCatalog, currentDatabase, str2).asSerializableString(), schema.build());
        }
        return new Tuple2<>(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv(boolean z, boolean z2, boolean z3, List<String> list, List<String> list2, List<Tuple2<String, String>> list3, @Nullable WatermarkSpec watermarkSpec, boolean z4, Map<String, String> map, @Nullable String str, List<String> list4, List<Row> list5) throws Exception {
        BlockingIterator of;
        Tuple2<String, String> createSourceAndManagedTable = createSourceAndManagedTable(z, z2, z3, list, list2, list3, watermarkSpec);
        String str2 = (String) createSourceAndManagedTable.f0;
        String str3 = (String) createSourceAndManagedTable.f1;
        String buildInsertIntoQuery = ShowCreateUtil.buildInsertIntoQuery(str2, str3);
        String buildSelectQuery = ShowCreateUtil.buildSelectQuery(str3, map, str, list4);
        if (z4) {
            this.tEnv.executeSql(buildInsertIntoQuery).await();
            of = BlockingIterator.of(this.tEnv.executeSql(buildSelectQuery).collect());
        } else {
            of = BlockingIterator.of(this.tEnv.executeSql(buildSelectQuery).collect());
            this.tEnv.executeSql(buildInsertIntoQuery).await();
        }
        if (list5.isEmpty()) {
            assertNoMoreRecords(of);
        } else {
            Assertions.assertThat(of.collect(list5.size(), 10L, TimeUnit.SECONDS)).containsExactlyInAnyOrderElementsOf(list5);
        }
        return Tuple2.of(str3, of);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void prepareEnvAndOverwrite(String str, Map<String, String> map, List<String[]> list) throws Exception {
        prepareEnvAndOverwrite(str, ShowCreateUtil.buildInsertOverwriteQuery(str, map, list));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void prepareEnvAndOverwrite(String str, String str2) throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
        registerTable(create, str);
        create.executeSql(str2).await();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerTable(StreamTableEnvironment streamTableEnvironment, String str) throws Exception {
        String currentCatalog = this.tEnv.getCurrentCatalog();
        ObjectPath objectPath = new ObjectPath(this.tEnv.getCurrentDatabase(), str);
        ((Catalog) streamTableEnvironment.getCatalog(currentCatalog).get()).createTable(objectPath, ((Catalog) this.tEnv.getCatalog(currentCatalog).get()).getTable(objectPath), false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static StreamExecutionEnvironment buildStreamEnv() {
        return buildStreamEnv(2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static StreamExecutionEnvironment buildStreamEnv(int i) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setParallelism(i);
        return executionEnvironment;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static StreamExecutionEnvironment buildBatchEnv() {
        return buildBatchEnv(2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static StreamExecutionEnvironment buildBatchEnv(int i) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(i);
        return executionEnvironment;
    }
}
