package org.apache.hudi.table;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/HoodieTableFactory.class */
public class HoodieTableFactory implements TableSourceFactory<RowData>, TableSinkFactory<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class);
    public static final String FACTORY_ID = "hudi";

    public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
        Configuration fromMap = FlinkOptions.fromMap(context.getTable().getOptions());
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
        setupConfOptions(fromMap, context.getObjectIdentifier().getObjectName(), context.getTable(), physicalSchema);
        try {
            return new HoodieTableSource(physicalSchema, new Path((String) fromMap.getOptional(FlinkOptions.PATH).orElseThrow(() -> {
                return new ValidationException("Option [path] should not be empty.");
            })), context.getTable().getPartitionKeys(), fromMap.getString(FlinkOptions.PARTITION_DEFAULT_NAME), fromMap);
        } catch (Throwable th) {
            LOG.error("Create table source error", th);
            throw new HoodieException(th);
        }
    }

    public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
        Configuration fromMap = FlinkOptions.fromMap(context.getTable().getOptions());
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
        setupConfOptions(fromMap, context.getObjectIdentifier().getObjectName(), context.getTable(), physicalSchema);
        return new HoodieTableSink(fromMap, physicalSchema);
    }

    public Map<String, String> requiredContext() {
        HashMap hashMap = new HashMap();
        hashMap.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID);
        return hashMap;
    }

    public List<String> supportedProperties() {
        return Collections.singletonList("*");
    }

    private static void setupConfOptions(Configuration configuration, String str, CatalogTable catalogTable, TableSchema tableSchema) {
        configuration.setString(FlinkOptions.TABLE_NAME.key(), str);
        setupHoodieKeyOptions(configuration, catalogTable);
        inferAvroSchema(configuration, tableSchema.toRowDataType().notNull().getLogicalType());
    }

    private static void setupHoodieKeyOptions(Configuration configuration, CatalogTable catalogTable) {
        List list = (List) catalogTable.getSchema().getPrimaryKey().map((v0) -> {
            return v0.getColumns();
        }).orElse(Collections.emptyList());
        if (list.size() > 0) {
            configuration.setString(FlinkOptions.RECORD_KEY_FIELD, String.join(",", list));
        }
        List partitionKeys = catalogTable.getPartitionKeys();
        if (partitionKeys.size() > 0) {
            configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
        }
        if ((list.size() > 1 || partitionKeys.size() > 1) && FlinkOptions.isDefaultValueDefined(configuration, FlinkOptions.KEYGEN_CLASS)) {
            configuration.setString(FlinkOptions.KEYGEN_CLASS, ComplexAvroKeyGenerator.class.getName());
            LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", FlinkOptions.KEYGEN_CLASS.key(), ComplexAvroKeyGenerator.class.getName());
        }
    }

    private static void inferAvroSchema(Configuration configuration, LogicalType logicalType) {
        if (configuration.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent() || configuration.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
            return;
        }
        configuration.setString(FlinkOptions.READ_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(logicalType).toString());
    }
}
