package org.apache.paimon.flink.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.factories.FlinkFactoryUtil;
import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogStoreFactory.class */
/**
 * {@link LogStoreTableFactory} implementation registered under the {@code "kafka"} identifier.
 *
 * <p>Builds {@link KafkaLogSourceProvider} and {@link KafkaLogSinkProvider} instances from the
 * catalog table's resolved schema and options, and creates the {@link KafkaLogStoreRegister}.
 */
public class KafkaLogStoreFactory implements LogStoreTableFactory {
    /** Identifier returned by {@link #identifier()}. */
    public static final String IDENTIFIER = "kafka";

    /** Key prefix handed to {@link OptionsUtils#convertToPropertiesPrefixKey} for Kafka client settings. */
    public static final String KAFKA_PREFIX = "kafka.";

    /** Returns {@link #IDENTIFIER}. */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Looks up the topic name in the catalog table options.
     *
     * @param context table factory context giving access to the catalog table
     * @return the value stored under {@code KafkaLogOptions.TOPIC}'s key, or {@code null} if the
     *     option is not present in the options map
     */
    private String topic(DynamicTableFactory.Context context) {
        return context.getCatalogTable().getOptions().get(KafkaLogOptions.TOPIC.key());
    }

    /**
     * Creates the Kafka log source provider for the table described by {@code context}.
     *
     * @param context table factory context (catalog table, options)
     * @param context2 source context passed to the decoding formats when creating runtime decoders
     * @param iArr nullable int matrix forwarded unchanged to the provider (presumably the projected
     *     field paths; confirm against {@link KafkaLogSourceProvider})
     * @return a provider configured with topic, Kafka properties, key/value deserializers,
     *     log consistency, startup mode and scan timestamp
     */
    @Override
    public KafkaLogSourceProvider createSourceProvider(
            DynamicTableFactory.Context context,
            DynamicTableSource.Context context2,
            @Nullable int[][] iArr) {
        FlinkFactoryUtil.FlinkTableFactoryHelper helper =
                FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        DataType physicalType = schema.toPhysicalRowDataType();
        int[] primaryKey = getPrimaryKeyIndexes(schema);

        // A key deserializer is only built when the table declares a primary key;
        // otherwise null is passed to the provider.
        DeserializationSchema primaryKeyDeserializer = null;
        if (primaryKey.length > 0) {
            DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
            primaryKeyDeserializer =
                    (DeserializationSchema)
                            LogStoreTableFactory.getKeyDecodingFormat(helper)
                                    .createRuntimeDecoder(context2, keyType);
        }
        DeserializationSchema valueDeserializer =
                (DeserializationSchema)
                        LogStoreTableFactory.getValueDecodingFormat(helper)
                                .createRuntimeDecoder(context2, physicalType);

        Options options = toOptions(helper.getOptions());
        return new KafkaLogSourceProvider(
                topic(context),
                toKafkaProperties(options),
                physicalType,
                primaryKey,
                primaryKeyDeserializer,
                valueDeserializer,
                iArr,
                options.get(CoreOptions.LOG_CONSISTENCY),
                CoreOptions.startupMode(options),
                options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS));
    }

    /**
     * Creates the Kafka log sink provider for the table described by {@code context}.
     *
     * @param context table factory context (catalog table, options)
     * @param context2 sink context passed to the encoding formats when creating runtime encoders
     * @return a provider configured with topic, Kafka properties, key/value serializers,
     *     log consistency and changelog mode
     */
    @Override
    public KafkaLogSinkProvider createSinkProvider(
            DynamicTableFactory.Context context, DynamicTableSink.Context context2) {
        FlinkFactoryUtil.FlinkTableFactoryHelper helper =
                FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        DataType physicalType = schema.toPhysicalRowDataType();
        int[] primaryKey = getPrimaryKeyIndexes(schema);

        // A key serializer is only built when the table declares a primary key;
        // otherwise null is passed to the provider.
        SerializationSchema primaryKeySerializer = null;
        if (primaryKey.length > 0) {
            DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
            primaryKeySerializer =
                    (SerializationSchema)
                            LogStoreTableFactory.getKeyEncodingFormat(helper)
                                    .createRuntimeEncoder(context2, keyType);
        }
        SerializationSchema valueSerializer =
                (SerializationSchema)
                        LogStoreTableFactory.getValueEncodingFormat(helper)
                                .createRuntimeEncoder(context2, physicalType);

        Options options = toOptions(helper.getOptions());
        return new KafkaLogSinkProvider(
                topic(context),
                toKafkaProperties(options),
                primaryKeySerializer,
                valueSerializer,
                options.get(CoreOptions.LOG_CONSISTENCY),
                options.get(CoreOptions.LOG_CHANGELOG_MODE));
    }

    /** Creates a {@link KafkaLogStoreRegister} for the given register context. */
    @Override
    public LogStoreRegister createRegister(LogStoreTableFactory.RegisterContext registerContext) {
        return new KafkaLogStoreRegister(registerContext);
    }

    /**
     * Resolves the positions of the primary key columns within the schema's column list.
     *
     * @param resolvedSchema schema whose primary key is inspected
     * @return one index per primary key column, in the order the key declares them (each index is
     *     the result of {@link List#indexOf} on the schema's column names); an empty array when
     *     the schema has no primary key
     */
    private int[] getPrimaryKeyIndexes(ResolvedSchema resolvedSchema) {
        final List<String> columnNames = resolvedSchema.getColumnNames();
        return resolvedSchema
                .getPrimaryKey()
                .map(
                        primaryKey ->
                                primaryKey.getColumns().stream()
                                        .mapToInt(columnNames::indexOf)
                                        .toArray())
                .orElseGet(() -> new int[0]);
    }

    /**
     * Derives Kafka client properties from the table options.
     *
     * <p>The entries come from {@link OptionsUtils#convertToPropertiesPrefixKey} applied with
     * {@link #KAFKA_PREFIX} (presumably it selects the {@code kafka.}-prefixed keys; see that
     * helper for the exact key handling). When log consistency is {@code TRANSACTIONAL},
     * {@code isolation.level} is additionally set to {@code read_committed}, overriding any value
     * supplied through the options.
     *
     * @param options table options
     * @return a new {@link Properties} instance
     */
    public static Properties toKafkaProperties(Options options) {
        Properties properties = new Properties();
        properties.putAll(OptionsUtils.convertToPropertiesPrefixKey(options.toMap(), KAFKA_PREFIX));
        if (options.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL) {
            properties.setProperty("isolation.level", "read_committed");
        }
        return properties;
    }

    /**
     * Copies every entry of a Flink configuration into a new Paimon {@link Options}.
     *
     * @param readableConfig configuration to copy; must be a {@link Configuration} instance,
     *     since the method downcasts it to obtain the key/value map
     * @return a new {@link Options} holding the same string entries
     * @throws ClassCastException if {@code readableConfig} is not a {@link Configuration}
     */
    private Options toOptions(ReadableConfig readableConfig) {
        Options options = new Options();
        ((Configuration) readableConfig).toMap().forEach(options::setString);
        return options;
    }
}
