package org.apache.flink.table.store.connector;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.ManagedTableFactory;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.connector.utils.TableConfigUtils;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/store/connector/TableStoreFactory.class */
/**
 * Factory for table store managed tables.
 *
 * <p>It handles the managed-table lifecycle callbacks (option enrichment, create, drop, compact)
 * and creates the {@link TableStoreSource} and {@link TableStoreSink} for a table. When the table
 * options configure a log system, the matching {@link LogStoreTableFactory} is discovered and
 * receives the {@code log.}-prefixed options with the prefix stripped.
 */
public class TableStoreFactory
        implements ManagedTableFactory, DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Prefix of session-level configuration keys that are copied into the table options. */
    private static final String TABLE_STORE_PREFIX = "table-store.";

    /** Prefix of table options that are forwarded to the log store. */
    private static final String LOG_PREFIX = "log.";

    /**
     * Returns the catalog table options merged with every {@code table-store.}-prefixed entry of
     * the session configuration (prefix removed).
     *
     * <p>Options already present on the table win: session entries are only added when the key is
     * absent.
     *
     * @param context factory context of the table being created
     * @return a new mutable map holding the enriched options
     */
    public Map<String, String> enrichOptions(DynamicTableFactory.Context context) {
        Map<String, String> enrichedOptions =
                new HashMap<>(context.getCatalogTable().getOptions());
        TableConfigUtils.extractConfiguration(context.getConfiguration())
                .toMap()
                .forEach(
                        (key, value) -> {
                            if (key.startsWith(TABLE_STORE_PREFIX)) {
                                enrichedOptions.putIfAbsent(
                                        key.substring(TABLE_STORE_PREFIX.length()), value);
                            }
                        });
        return enrichedOptions;
    }

    /**
     * Creates the file store directory of the table and, if a log system is configured, lets the
     * log store factory create its side of the table.
     *
     * @param context factory context of the table being created
     * @param ignoreIfExists when {@code false}, an already existing file store path is an error
     * @throws TableException if the path exists and {@code ignoreIfExists} is {@code false}
     * @throws UncheckedIOException if accessing the file system fails
     */
    public void onCreateTable(DynamicTableFactory.Context context, boolean ignoreIfExists) {
        Map<String, String> options = context.getCatalogTable().getOptions();
        Path path = FileStoreOptions.path(options, context.getObjectIdentifier());
        try {
            if (path.getFileSystem().exists(path) && !ignoreIfExists) {
                throw new TableException(String.format("Failed to create file store path. Reason: directory %s exists for table %s. Suggestion: please try `DESCRIBE TABLE %s` to first check whether table exists in current catalog. If table exists in catalog, and data files under current path are valid, please use `CREATE TABLE IF NOT EXISTS` ddl instead. Otherwise, please choose another table name or manually delete the current path and try again.", path, context.getObjectIdentifier().asSerializableString(), context.getObjectIdentifier().asSerializableString()));
            }
            path.getFileSystem().mkdirs(path);
            createOptionalLogStoreFactory(context)
                    .ifPresent(
                            logFactory -> {
                                // The bucket option is parsed only when a log store exists, so a
                                // malformed value cannot fail tables without a log system here.
                                int numBucket =
                                        Integer.parseInt(
                                                options.getOrDefault(
                                                        FileStoreOptions.BUCKET.key(),
                                                        FileStoreOptions.BUCKET
                                                                .defaultValue()
                                                                .toString()));
                                logFactory.onCreateTable(
                                        createLogContext(context), numBucket, ignoreIfExists);
                            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Recursively deletes the file store directory of the table and, if a log system is
     * configured, lets the log store factory drop its side of the table.
     *
     * @param context factory context of the table being dropped
     * @param ignoreIfNotExists when {@code false}, a missing file store path is an error
     * @throws TableException if the path is missing and {@code ignoreIfNotExists} is {@code false}
     * @throws UncheckedIOException if accessing the file system fails
     */
    public void onDropTable(DynamicTableFactory.Context context, boolean ignoreIfNotExists) {
        Path path =
                FileStoreOptions.path(
                        context.getCatalogTable().getOptions(), context.getObjectIdentifier());
        try {
            if (path.getFileSystem().exists(path)) {
                path.getFileSystem().delete(path, true);
            } else if (!ignoreIfNotExists) {
                throw new TableException(String.format("Failed to delete file store path. Reason: directory %s doesn't exist for table %s. Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.", path, context.getObjectIdentifier().asSerializableString()));
            }
            createOptionalLogStoreFactory(context)
                    .ifPresent(
                            logFactory ->
                                    logFactory.onDropTable(
                                            createLogContext(context), ignoreIfNotExists));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Compaction is not supported yet.
     *
     * @throws UnsupportedOperationException always
     */
    public Map<String, String> onCompactTable(
            DynamicTableFactory.Context context, CatalogPartitionSpec catalogPartitionSpec) {
        throw new UnsupportedOperationException("Not implement yet");
    }

    /**
     * Creates the table source. The source is told whether the job runs in streaming mode, and
     * receives the log context plus the log store factory ({@code null} when no log system is
     * configured).
     *
     * @param context factory context of the table being read
     * @return the source for the table
     */
    public TableStoreSource createDynamicTableSource(DynamicTableFactory.Context context) {
        TableStore tableStore = buildTableStore(context);
        boolean streaming =
                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;
        return new TableStoreSource(
                tableStore,
                streaming,
                createLogContext(context),
                createOptionalLogStoreFactory(context).orElse(null));
    }

    /**
     * Alias kept for callers that were decompiled against the mangled method name.
     *
     * @deprecated use {@link #createDynamicTableSource(DynamicTableFactory.Context)}
     */
    @Deprecated
    public TableStoreSource m1createDynamicTableSource(DynamicTableFactory.Context context) {
        return createDynamicTableSource(context);
    }

    /**
     * Creates the table sink. The sink receives the log context plus the log store factory
     * ({@code null} when no log system is configured).
     *
     * @param context factory context of the table being written
     * @return the sink for the table
     */
    public TableStoreSink createDynamicTableSink(DynamicTableFactory.Context context) {
        return new TableStoreSink(
                buildTableStore(context),
                createLogContext(context),
                createOptionalLogStoreFactory(context).orElse(null));
    }

    /**
     * Alias kept for callers that were decompiled against the mangled method name.
     *
     * @deprecated use {@link #createDynamicTableSink(DynamicTableFactory.Context)}
     */
    @Deprecated
    public TableStoreSink m2createDynamicTableSink(DynamicTableFactory.Context context) {
        return createDynamicTableSink(context);
    }

    /** Returns an empty set: this factory declares no required options. */
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    /**
     * Returns the union of the file store, merge tree and table store factory options.
     *
     * @return all options this factory accepts
     */
    public Set<ConfigOption<?>> optionalOptions() {
        // NOTE(review): this adds into the set returned by FileStoreOptions.allOptions(), which
        // presumably is a fresh mutable set on every call - confirm, otherwise copy it first.
        Set<ConfigOption<?>> allOptions = FileStoreOptions.allOptions();
        allOptions.addAll(MergeTreeOptions.allOptions());
        allOptions.addAll(TableStoreFactoryOptions.allOptions());
        return allOptions;
    }

    /**
     * Discovers the log store factory named by the table's log system option.
     *
     * <p>When no log system is configured, the {@code log.} options are validated against what
     * continuous reading from the file store supports, and an empty result is returned.
     *
     * @param context factory context whose table options are inspected
     * @return the discovered factory, or empty if the table has no log system
     * @throws ValidationException if no log system is set and an unsupported log option is used
     */
    private static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
            DynamicTableFactory.Context context) {
        Configuration configuration = new Configuration();
        context.getCatalogTable().getOptions().forEach(configuration::setString);

        String logSystem = configuration.get(TableStoreFactoryOptions.LOG_SYSTEM);
        if (logSystem != null) {
            return Optional.of(
                    LogStoreTableFactory.discoverLogStoreFactory(
                            context.getClassLoader(), logSystem));
        }
        validateFileStoreContinuous(configuration);
        return Optional.empty();
    }

    /**
     * Rejects log options that cannot be honoured without a log system: upsert changelog mode,
     * eventual consistency and the from-timestamp scan mode.
     *
     * @param configuration table options; the log options are read under the {@code log.} prefix
     * @throws ValidationException if one of the unsupported modes is configured
     */
    private static void validateFileStoreContinuous(Configuration configuration) {
        DelegatingConfiguration logOptions = new DelegatingConfiguration(configuration, LOG_PREFIX);
        if (logOptions.get(LogOptions.CHANGELOG_MODE) == LogOptions.LogChangelogMode.UPSERT) {
            throw new ValidationException("File store continuous reading dose not support upsert changelog mode.");
        }
        if (logOptions.get(LogOptions.CONSISTENCY) == LogOptions.LogConsistency.EVENTUAL) {
            throw new ValidationException("File store continuous reading dose not support eventual consistency mode.");
        }
        if (logOptions.get(LogOptions.SCAN) == LogOptions.LogStartupMode.FROM_TIMESTAMP) {
            throw new ValidationException("File store continuous reading dose not support from_timestamp scan mode, you can add timestamp filters instead.");
        }
    }

    /**
     * Derives the context handed to the log store: same identifier, configuration, class loader
     * and temporary flag, but with table and enrichment options reduced to the log store options.
     *
     * @param context the original factory context
     * @return a context carrying only log store options
     */
    private static DynamicTableFactory.Context createLogContext(
            DynamicTableFactory.Context context) {
        return new FactoryUtil.DefaultDynamicTableContext(
                context.getObjectIdentifier(),
                context.getCatalogTable()
                        .copy(filterLogStoreOptions(context.getCatalogTable().getOptions())),
                filterLogStoreOptions(context.getEnrichmentOptions()),
                context.getConfiguration(),
                context.getClassLoader(),
                context.isTemporary());
    }

    /**
     * Keeps the entries whose key starts with {@code log.}, except the log system option itself,
     * and strips the prefix from the remaining keys.
     *
     * @param map options to filter
     * @return a new map with the un-prefixed log store options
     */
    @VisibleForTesting
    static Map<String, String> filterLogStoreOptions(Map<String, String> map) {
        String logSystemKey = TableStoreFactoryOptions.LOG_SYSTEM.key();
        return map.entrySet().stream()
                .filter(entry -> !entry.getKey().equals(logSystemKey))
                .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
                .collect(
                        Collectors.toMap(
                                entry -> entry.getKey().substring(LOG_PREFIX.length()),
                                Map.Entry::getValue));
    }

    /**
     * Builds the {@link TableStore} from the table options, identifier and physical row type.
     * Primary key and partition columns are passed as their indexes in the physical row type.
     *
     * @param context factory context of the table
     * @return the configured table store
     */
    @VisibleForTesting
    TableStore buildTableStore(DynamicTableFactory.Context context) {
        ResolvedCatalogTable catalogTable = context.getCatalogTable();
        ResolvedSchema schema = catalogTable.getResolvedSchema();
        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
        List<String> fieldNames = rowType.getFieldNames();

        // No primary key constraint means an empty index array.
        int[] primaryKeys =
                schema.getPrimaryKey()
                        .map(UniqueConstraint::getColumns)
                        .map(columns -> columns.stream().mapToInt(fieldNames::indexOf).toArray())
                        .orElseGet(() -> new int[0]);
        int[] partitions =
                catalogTable.getPartitionKeys().stream().mapToInt(fieldNames::indexOf).toArray();

        return new TableStore(Configuration.fromMap(catalogTable.getOptions()))
                .withTableIdentifier(context.getObjectIdentifier())
                .withSchema(rowType)
                .withPrimaryKeys(primaryKeys)
                .withPartitions(partitions);
    }
}
