package org.apache.flink.table.store.connector;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/store/connector/AbstractTableStoreFactory.class */
public abstract class AbstractTableStoreFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        return new TableStoreSource(context.getObjectIdentifier(), buildFileStoreTable(context), context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING, context, createOptionalLogStoreFactory(context).orElse(null));
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        return new TableStoreSink(context.getObjectIdentifier(), buildFileStoreTable(context), context, createOptionalLogStoreFactory(context).orElse(null));
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet hashSet = new HashSet();
        hashSet.addAll(FlinkConnectorOptions.getOptions());
        hashSet.addAll(CoreOptions.getOptions());
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(DynamicTableFactory.Context context) {
        return createOptionalLogStoreFactory(context.getClassLoader(), context.getCatalogTable().getOptions());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(ClassLoader classLoader, Map<String, String> map) {
        Configuration configuration = new Configuration();
        configuration.getClass();
        map.forEach(configuration::setString);
        if (!((String) configuration.get(FlinkConnectorOptions.LOG_SYSTEM)).equalsIgnoreCase(FlinkConnectorOptions.NONE)) {
            return Optional.of(LogStoreTableFactory.discoverLogStoreFactory(classLoader, (String) configuration.get(FlinkConnectorOptions.LOG_SYSTEM)));
        }
        validateFileStoreContinuous(configuration);
        return Optional.empty();
    }

    private static void validateFileStoreContinuous(Configuration configuration) {
        if (((CoreOptions.LogChangelogMode) configuration.get(CoreOptions.LOG_CHANGELOG_MODE)) == CoreOptions.LogChangelogMode.UPSERT) {
            throw new ValidationException("File store continuous reading dose not support upsert changelog mode.");
        }
        if (((CoreOptions.LogConsistency) configuration.get(CoreOptions.LOG_CONSISTENCY)) == CoreOptions.LogConsistency.EVENTUAL) {
            throw new ValidationException("File store continuous reading dose not support eventual consistency mode.");
        }
        if (((CoreOptions.LogStartupMode) configuration.get(CoreOptions.LOG_SCAN)) == CoreOptions.LogStartupMode.FROM_TIMESTAMP) {
            throw new ValidationException("File store continuous reading dose not support from_timestamp scan mode, you can add timestamp filters instead.");
        }
    }

    static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) {
        FileStoreTable create = FileStoreTableFactory.create(Configuration.fromMap(context.getCatalogTable().getOptions()));
        TableSchema schema = create.schema();
        UpdateSchema fromCatalogTable = UpdateSchema.fromCatalogTable(context.getCatalogTable());
        RowType rowType = fromCatalogTable.rowType();
        List partitionKeys = fromCatalogTable.partitionKeys();
        List primaryKeys = fromCatalogTable.primaryKeys();
        Preconditions.checkArgument(schemaEquals(schema.logicalRowType(), rowType), "Flink schema and store schema are not the same, store schema is %s, Flink schema is %s", new Object[]{schema.logicalRowType(), rowType});
        Preconditions.checkArgument(schema.partitionKeys().equals(partitionKeys), "Flink partitionKeys and store partitionKeys are not the same, store partitionKeys is %s, Flink partitionKeys is %s", new Object[]{schema.partitionKeys(), partitionKeys});
        Preconditions.checkArgument(schema.primaryKeys().equals(primaryKeys), "Flink primaryKeys and store primaryKeys are not the same, store primaryKeys is %s, Flink primaryKeys is %s", new Object[]{schema.primaryKeys(), primaryKeys});
        return create;
    }

    @VisibleForTesting
    static boolean schemaEquals(RowType rowType, RowType rowType2) {
        List fields = rowType.getFields();
        List fields2 = rowType2.getFields();
        if (fields.size() != fields2.size()) {
            return false;
        }
        for (int i = 0; i < fields.size(); i++) {
            RowType.RowField rowField = (RowType.RowField) fields.get(i);
            RowType.RowField rowField2 = (RowType.RowField) fields2.get(i);
            if (!rowField.getName().equals(rowField2.getName()) || !rowField.getType().equals(rowField2.getType())) {
                return false;
            }
        }
        return true;
    }
}
