package org.apache.flink.table.store.connector;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.TableStoreTestBase;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.kafka.KafkaLogOptions;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.class */
public class TableStoreManagedFactoryTest {
    private static final String CATALOG = "catalog";
    private static final String DATABASE = "database";
    private static final String TABLE = "table";
    private static final ObjectIdentifier TABLE_IDENTIFIER;
    private final TableStoreManagedFactory tableStoreManagedFactory = new TableStoreManagedFactory();

    @TempDir
    private static Path sharedTempDir;
    private DynamicTableFactory.Context context;
    static final /* synthetic */ boolean $assertionsDisabled;

    @MethodSource({"provideOptionsToEnrich"})
    @ParameterizedTest
    public void testEnrichOptions(Map<String, String> map, Map<String, String> map2, Map<String, String> map3) {
        this.context = createNonEnrichedContext(map, map2);
        Assertions.assertThat(this.tableStoreManagedFactory.enrichOptions(this.context)).containsExactlyInAnyOrderEntriesOf(map3);
    }

    @Test
    public void testErrorEnrichOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put("table-store.root-path", "my_path");
        hashMap.put("table-store.path", "another_path");
        this.context = createNonEnrichedContext(hashMap, Collections.emptyMap());
        Assertions.assertThatThrownBy(() -> {
            this.tableStoreManagedFactory.enrichOptions(this.context);
        }).hasMessage("Managed table can not contain table path. You need to remove path in table options or session config.");
        this.context = createNonEnrichedContext(Collections.emptyMap(), Collections.emptyMap());
        Assertions.assertThatThrownBy(() -> {
            this.tableStoreManagedFactory.enrichOptions(this.context);
        }).hasMessage("Please specify a root path by setting session level configuration as `SET 'table-store.root-path' = '...'`.");
    }

    @Test
    public void testEnrichKafkaTopic() {
        HashMap hashMap = new HashMap();
        hashMap.put("table-store.root-path", "my_path");
        hashMap.put("table-store.log.system", "kafka");
        hashMap.put("table-store.kafka.topic", "my_topic");
        this.context = createNonEnrichedContext(hashMap, Collections.emptyMap());
        Assertions.assertThatThrownBy(() -> {
            this.tableStoreManagedFactory.enrichOptions(this.context);
        }).hasMessage("Managed table can not contain custom topic. You need to remove topic in table options or session config.");
        hashMap.remove("table-store.kafka.topic");
        this.context = createNonEnrichedContext(hashMap, Collections.emptyMap());
        Map enrichOptions = this.tableStoreManagedFactory.enrichOptions(this.context);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("path", "my_path/catalog.catalog/database.db/table");
        hashMap2.put("log.system", "kafka");
        hashMap2.put("kafka.topic", "catalog.database.table");
        Assertions.assertThat(enrichOptions).containsExactlyEntriesOf(hashMap2);
    }

    @MethodSource({"provideOptionsToCreate"})
    @ParameterizedTest
    public void testOnCreateTable(Map<String, String> map, boolean z) {
        this.context = createEnrichedContext(map);
        Path path = Paths.get(CoreOptions.path(map).getPath(), new String[0]);
        boolean exists = path.toFile().exists();
        if (!z && exists) {
            Assertions.assertThatThrownBy(() -> {
                this.tableStoreManagedFactory.onCreateTable(this.context, false);
            }).isInstanceOf(TableException.class).hasMessageContaining(String.format("Failed to create file store path. Reason: directory %s exists for table %s. Suggestion: please try `DESCRIBE TABLE %s` to first check whether table exists in current catalog. If table exists in catalog, and data files under current path are valid, please use `CREATE TABLE IF NOT EXISTS` ddl instead. Otherwise, please choose another table name or manually delete the current path and try again.", path, TABLE_IDENTIFIER.asSerializableString(), TABLE_IDENTIFIER.asSerializableString()));
        } else {
            this.tableStoreManagedFactory.onCreateTable(this.context, z);
            Assertions.assertThat(path).exists();
        }
    }

    @MethodSource({"provideOptionsToDrop"})
    @ParameterizedTest
    public void testOnDropTable(Map<String, String> map, boolean z) {
        this.context = createEnrichedContext(map);
        Path path = Paths.get(CoreOptions.path(map).getPath(), new String[0]);
        if (!path.toFile().exists() && !z) {
            Assertions.assertThatThrownBy(() -> {
                this.tableStoreManagedFactory.onDropTable(this.context, false);
            }).isInstanceOf(TableException.class).hasMessageContaining(String.format("Failed to delete file store path. Reason: directory %s doesn't exist for table %s. Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.", path, TABLE_IDENTIFIER.asSerializableString()));
        } else {
            this.tableStoreManagedFactory.onDropTable(this.context, z);
            Assertions.assertThat(path).doesNotExist();
        }
    }

    @MethodSource({"provideResolvedTable"})
    @ParameterizedTest
    public void testCreateAndCheckTableStore(RowType rowType, List<String> list, List<String> list2, TableStoreTestBase.ExpectedResult expectedResult) {
        ResolvedCatalogTable createResolvedTable = TableStoreTestBase.createResolvedTable((Map<String, String>) Collections.singletonMap(CoreOptions.PATH.key(), sharedTempDir.toAbsolutePath() + "/" + UUID.randomUUID()), rowType, list, list2);
        this.context = createEnrichedContext(TABLE_IDENTIFIER, createResolvedTable);
        if (!expectedResult.success) {
            Assertions.assertThatThrownBy(() -> {
                this.tableStoreManagedFactory.onCreateTable(this.context, false);
            }).isInstanceOf(expectedResult.expectedType).hasMessageContaining(expectedResult.expectedMessage);
            return;
        }
        this.tableStoreManagedFactory.onCreateTable(this.context, false);
        FileStoreTable buildFileStoreTable = AbstractTableStoreFactory.buildFileStoreTable(this.context);
        Assertions.assertThat(buildFileStoreTable.schema().partitionKeys().size() > 0).isEqualTo(createResolvedTable.isPartitioned());
        Assertions.assertThat(buildFileStoreTable.schema().primaryKeys().size()).isEqualTo(createResolvedTable.getResolvedSchema().getPrimaryKeyIndexes().length);
        if (buildFileStoreTable.schema().partitionKeys().size() <= 0 || buildFileStoreTable.schema().primaryKeys().size() <= 0) {
            return;
        }
        Assertions.assertThat(buildFileStoreTable.schema().trimmedPrimaryKeys().stream().noneMatch(str -> {
            return buildFileStoreTable.schema().partitionKeys().contains(str);
        })).isTrue();
    }

    @ValueSource(ints = {0, 1, 2})
    @ParameterizedTest
    public void testOnCompactTable(int i) {
        this.context = createEnrichedContext(Collections.emptyMap());
        Map<String, String> emptyMap = i == 0 ? Collections.emptyMap() : i == 1 ? of("foo", "bar") : of("foo", "bar", "meow", "burr");
        Map onCompactTable = this.tableStoreManagedFactory.onCompactTable(this.context, new CatalogPartitionSpec(emptyMap));
        Assertions.assertThat(onCompactTable).containsEntry(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED.key(), String.valueOf(true));
        Assertions.assertThat(onCompactTable).containsEntry(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC.key(), JsonSerdeUtil.toJson(emptyMap));
    }

    private static ResolvedCatalogTable getDummyTable(Map<String, String> map) {
        return new ResolvedCatalogTable(CatalogTable.of(Schema.derived(), "a comment", Collections.emptyList(), map), ResolvedSchema.of(Collections.emptyList()));
    }

    private static DynamicTableFactory.Context createNonEnrichedContext(Map<String, String> map, Map<String, String> map2) {
        return new FactoryUtil.DefaultDynamicTableContext(TABLE_IDENTIFIER, getDummyTable(map2), Collections.emptyMap(), Configuration.fromMap(map), Thread.currentThread().getContextClassLoader(), false);
    }

    private static DynamicTableFactory.Context createEnrichedContext(Map<String, String> map) {
        return createEnrichedContext(TABLE_IDENTIFIER, getDummyTable(map));
    }

    private static DynamicTableFactory.Context createEnrichedContext(ObjectIdentifier objectIdentifier, ResolvedCatalogTable resolvedCatalogTable) {
        return new FactoryUtil.DefaultDynamicTableContext(objectIdentifier, resolvedCatalogTable, Collections.emptyMap(), Configuration.fromMap(Collections.emptyMap()), Thread.currentThread().getContextClassLoader(), false);
    }

    private static Stream<Arguments> provideOptionsToEnrich() {
        Map<String, String> of = of(CoreOptions.BUCKET.key(), ((Integer) CoreOptions.BUCKET.defaultValue()).toString(), FlinkConnectorOptions.ROOT_PATH.key(), sharedTempDir.toString(), KafkaLogOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092", CoreOptions.LOG_CONSISTENCY.key(), ((CoreOptions.LogConsistency) CoreOptions.LOG_CONSISTENCY.defaultValue()).name());
        Arguments of2 = Arguments.of(new Object[]{addPrefix(of, "table-store.", str -> {
            return true;
        }), Collections.emptyMap(), generateTablePath(of)});
        Arguments of3 = Arguments.of(new Object[]{Collections.emptyMap(), of, generateTablePath(of)});
        HashMap hashMap = new HashMap(of);
        hashMap.remove(FlinkConnectorOptions.ROOT_PATH.key());
        hashMap.remove(CoreOptions.LOG_CONSISTENCY.key());
        Arguments of4 = Arguments.of(new Object[]{addPrefix(of, "table-store.", str2 -> {
            return !hashMap.containsKey(str2);
        }), hashMap, generateTablePath(of)});
        HashMap hashMap2 = new HashMap();
        hashMap2.put("table-store." + CoreOptions.BUCKET.key(), String.valueOf(((Integer) CoreOptions.BUCKET.defaultValue()).intValue() + 1));
        return Stream.of((Object[]) new Arguments[]{of2, of3, of4, Arguments.of(new Object[]{hashMap2, of, generateTablePath(of)})});
    }

    private static Map<String, String> generateTablePath(Map<String, String> map) {
        HashMap hashMap = new HashMap(map);
        String str = (String) hashMap.remove(FlinkConnectorOptions.ROOT_PATH.key());
        if (str != null) {
            hashMap.put(CoreOptions.PATH.key(), str + "/" + FlinkConnectorOptions.relativeTablePath(TABLE_IDENTIFIER));
        }
        return hashMap;
    }

    private static Stream<Arguments> provideOptionsToCreate() {
        Map<String, String> generateTablePath = generateTablePath(of(FlinkConnectorOptions.ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString()));
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{generateTablePath, false}), Arguments.of(new Object[]{generateTablePath, true}), Arguments.of(new Object[]{generateTablePath, false})});
    }

    private static Stream<Arguments> provideOptionsToDrop() {
        File file = Paths.get(sharedTempDir.toAbsolutePath().toString(), TABLE_IDENTIFIER.asSummaryString()).toFile();
        if (!file.exists()) {
            file.mkdirs();
        }
        Map<String, String> generateTablePath = generateTablePath(of(FlinkConnectorOptions.ROOT_PATH.key(), sharedTempDir.toAbsolutePath().toString()));
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{generateTablePath, false}), Arguments.of(new Object[]{generateTablePath, true}), Arguments.of(new Object[]{generateTablePath, false})});
    }

    private static Stream<Arguments> provideResolvedTable() {
        RowType rowType = TestKeyValueGenerator.DEFAULT_ROW_TYPE;
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{rowType, TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), TestKeyValueGenerator.getPrimaryKeys(TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), new TableStoreTestBase.ExpectedResult().success(true)}), Arguments.of(new Object[]{rowType, TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), Collections.singletonList("shopId"), new TableStoreTestBase.ExpectedResult().success(false).expectedType(IllegalStateException.class).expectedMessage("Primary key constraint [shopId] should include all partition fields [dt, hr]")}), Arguments.of(new Object[]{rowType, TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), new TableStoreTestBase.ExpectedResult().success(false).expectedType(IllegalStateException.class).expectedMessage("Primary key constraint [dt, hr] should not be same with partition fields [dt, hr], this will result in only one record in a partition")})});
    }

    private static Map<String, String> addPrefix(Map<String, String> map, String str, Predicate<String> predicate) {
        HashMap hashMap = new HashMap();
        map.forEach((str2, str3) -> {
            if (predicate.test(str2)) {
                hashMap.put(str + str2, str3);
            }
        });
        return hashMap;
    }

    private static Map<String, String> of(String... strArr) {
        if (!$assertionsDisabled && (strArr == null || strArr.length % 2 != 0)) {
            throw new AssertionError();
        }
        HashMap hashMap = new HashMap();
        for (int i = 0; i < strArr.length - 1; i += 2) {
            hashMap.put(strArr[i], strArr[i + 1]);
        }
        return hashMap;
    }

    static {
        $assertionsDisabled = !TableStoreManagedFactoryTest.class.desiredAssertionStatus();
        TABLE_IDENTIFIER = ObjectIdentifier.of("catalog", "database", TABLE);
    }
}
