package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.format.avro.AvroSerializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.json.maxwell.MaxWellJsonSerializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.class */
/**
 * {@link SeaTunnelRowSerializer} that turns a {@link SeaTunnelRow} into a Kafka
 * {@link ProducerRecord} by applying one extractor function per record component
 * (topic, partition, timestamp, key, value, headers).
 *
 * <p>Instances are normally obtained through one of the {@code create(...)} factories, which
 * wire up the extractors from a topic string, a row type and a {@link MessageFormat}.
 */
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
    /**
     * Finds the first <code>${...}</code> placeholder in a topic string; group 1 is the text
     * between the braces. Compiled once because the pattern never changes.
     */
    private static final Pattern TOPIC_PLACEHOLDER =
            Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL);

    private final Function<SeaTunnelRow, String> topicExtractor;
    private final Function<SeaTunnelRow, Integer> partitionExtractor;
    private final Function<SeaTunnelRow, Long> timestampExtractor;
    private final Function<SeaTunnelRow, byte[]> keyExtractor;
    private final Function<SeaTunnelRow, byte[]> valueExtractor;
    private final Function<SeaTunnelRow, Iterable<Header>> headersExtractor;

    /**
     * Creates a serializer from explicit extractor functions.
     *
     * @param function computes the record topic
     * @param function2 computes the record partition
     * @param function3 computes the record timestamp
     * @param function4 computes the serialized record key
     * @param function5 computes the serialized record value
     * @param function6 computes the record headers
     */
    public DefaultSeaTunnelRowSerializer(
            Function<SeaTunnelRow, String> function,
            Function<SeaTunnelRow, Integer> function2,
            Function<SeaTunnelRow, Long> function3,
            Function<SeaTunnelRow, byte[]> function4,
            Function<SeaTunnelRow, byte[]> function5,
            Function<SeaTunnelRow, Iterable<Header>> function6) {
        this.topicExtractor = function;
        this.partitionExtractor = function2;
        this.timestampExtractor = function3;
        this.keyExtractor = function4;
        this.valueExtractor = function5;
        this.headersExtractor = function6;
    }

    /**
     * Builds a producer record by applying every extractor to the given row.
     *
     * @param seaTunnelRow the row to serialize
     * @return a record carrying the extracted topic, partition, timestamp, key, value and headers
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer
    public ProducerRecord serializeRow(SeaTunnelRow seaTunnelRow) {
        return new ProducerRecord<>(
                this.topicExtractor.apply(seaTunnelRow),
                this.partitionExtractor.apply(seaTunnelRow),
                this.timestampExtractor.apply(seaTunnelRow),
                this.keyExtractor.apply(seaTunnelRow),
                this.valueExtractor.apply(seaTunnelRow),
                this.headersExtractor.apply(seaTunnelRow));
    }

    /**
     * Creates a serializer with no explicit partition and no key fields.
     *
     * @param str topic name, or a string containing a <code>${field}</code> placeholder
     * @param seaTunnelRowType type of the rows that will be serialized
     * @param messageFormat format used to serialize the value (and key, where applicable)
     * @param str2 delimiter handed to the text format; only read for {@code TEXT}
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(
            String str,
            SeaTunnelRowType seaTunnelRowType,
            MessageFormat messageFormat,
            String str2) {
        return build(str, null, null, seaTunnelRowType, messageFormat, str2);
    }

    /**
     * Creates a serializer that assigns the same partition to every record and uses no key
     * fields.
     *
     * @param str topic name, or a string containing a <code>${field}</code> placeholder
     * @param num partition set on every record; may be {@code null}
     * @param seaTunnelRowType type of the rows that will be serialized
     * @param messageFormat format used to serialize the value (and key, where applicable)
     * @param str2 delimiter handed to the text format; only read for {@code TEXT}
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(
            String str,
            Integer num,
            SeaTunnelRowType seaTunnelRowType,
            MessageFormat messageFormat,
            String str2) {
        return build(str, num, null, seaTunnelRowType, messageFormat, str2);
    }

    /**
     * Creates a serializer whose record key is built from the named row fields and that sets
     * no explicit partition.
     *
     * @param str topic name, or a string containing a <code>${field}</code> placeholder
     * @param list names of the row fields that make up the key; {@code null} or empty yields a
     *     {@code null} key (except for {@code COMPATIBLE_DEBEZIUM_JSON}, which ignores it)
     * @param seaTunnelRowType type of the rows that will be serialized
     * @param messageFormat format used to serialize the value (and key, where applicable)
     * @param str2 delimiter handed to the text format; only read for {@code TEXT}
     * @return the configured serializer
     */
    public static DefaultSeaTunnelRowSerializer create(
            String str,
            List<String> list,
            SeaTunnelRowType seaTunnelRowType,
            MessageFormat messageFormat,
            String str2) {
        return build(str, null, list, seaTunnelRowType, messageFormat, str2);
    }

    /** Shared wiring behind the public {@code create} overloads. */
    private static DefaultSeaTunnelRowSerializer build(
            String topic,
            Integer partition,
            List<String> keyFields,
            SeaTunnelRowType rowType,
            MessageFormat format,
            String delimiter) {
        // Arguments are evaluated left to right, so a bad topic is reported before a bad
        // key or value configuration.
        return new DefaultSeaTunnelRowSerializer(
                topicExtractor(topic, rowType, format),
                partitionExtractor(partition),
                timestampExtractor(),
                keyExtractor(keyFields, rowType, format, delimiter),
                valueExtractor(rowType, format, delimiter),
                headersExtractor());
    }

    /** Returns an extractor that yields the given partition (possibly {@code null}) for every row. */
    private static Function<SeaTunnelRow, Integer> partitionExtractor(Integer partition) {
        return row -> partition;
    }

    /** Returns an extractor that yields a {@code null} timestamp for every row. */
    private static Function<SeaTunnelRow, Long> timestampExtractor() {
        return row -> null;
    }

    /** Returns an extractor that yields {@code null} headers for every row. */
    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor() {
        return row -> null;
    }

    /**
     * Builds the topic extractor.
     *
     * <ul>
     *   <li>{@code COMPATIBLE_DEBEZIUM_JSON}: the topic is read from the row field named
     *       {@link CompatibleDebeziumJsonDeserializationSchema#FIELD_TOPIC}.
     *   <li>Topic string without a <code>${...}</code> placeholder: the string is used as is.
     *   <li>Topic string with a placeholder: the topic is the string form of the row field named
     *       by the first placeholder. Text around the placeholder is not part of the result.
     * </ul>
     *
     * @throws KafkaConnectorException if the placeholder names a field that the row type does
     *     not contain; the returned function throws it when the field value is {@code null}
     */
    private static Function<SeaTunnelRow, String> topicExtractor(
            String topic, SeaTunnelRowType rowType, MessageFormat format) {
        if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
            int topicIndex =
                    rowType.indexOf(CompatibleDebeziumJsonDeserializationSchema.FIELD_TOPIC);
            return row -> row.getField(topicIndex).toString();
        }

        Matcher matcher = TOPIC_PLACEHOLDER.matcher(topic);
        if (!matcher.find()) {
            return row -> topic;
        }

        String topicField = matcher.group(1);
        if (!Arrays.asList(rowType.getFieldNames()).contains(topicField)) {
            // Report the field that was looked up, not the whole topic expression.
            throw new KafkaConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                    String.format("Field name { %s } is not found!", topicField));
        }
        int topicFieldIndex = rowType.indexOf(topicField);
        return row -> {
            Object value = row.getField(topicFieldIndex);
            if (value == null) {
                throw new KafkaConnectorException(
                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "The column value is empty!");
            }
            return value.toString();
        };
    }

    /**
     * Builds the key extractor.
     *
     * <p>{@code COMPATIBLE_DEBEZIUM_JSON} serializes the full row with a key-mode schema and
     * ignores {@code keyFields}. Otherwise a {@code null} or empty {@code keyFields} produces a
     * {@code null} key, and a non-empty list produces the projection of the row onto those
     * fields, serialized with the given format.
     */
    private static Function<SeaTunnelRow, byte[]> keyExtractor(
            List<String> keyFields,
            SeaTunnelRowType rowType,
            MessageFormat format,
            String delimiter) {
        if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
            CompatibleDebeziumJsonSerializationSchema debeziumKeySchema =
                    new CompatibleDebeziumJsonSerializationSchema(rowType, true);
            return debeziumKeySchema::serialize;
        }
        if (keyFields == null || keyFields.isEmpty()) {
            return row -> null;
        }

        SeaTunnelRowType keyType = createKeyType(keyFields, rowType);
        Function<SeaTunnelRow, SeaTunnelRow> keyRowExtractor =
                createKeyRowExtractor(keyType, rowType);
        SerializationSchema keySchema = createSerializationSchema(keyType, format, delimiter, true);
        return row -> keySchema.serialize(keyRowExtractor.apply(row));
    }

    /** Builds the value extractor: the whole row serialized with the given format. */
    private static Function<SeaTunnelRow, byte[]> valueExtractor(
            SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
        SerializationSchema valueSchema =
                createSerializationSchema(rowType, format, delimiter, false);
        return valueSchema::serialize;
    }

    /**
     * Derives the row type of the key: the given field names, in the given order, each paired
     * with the type that field has in {@code rowType}.
     */
    private static SeaTunnelRowType createKeyType(
            List<String> keyFields, SeaTunnelRowType rowType) {
        SeaTunnelDataType[] keyFieldTypes = new SeaTunnelDataType[keyFields.size()];
        for (int i = 0; i < keyFields.size(); i++) {
            keyFieldTypes[i] = rowType.getFieldType(rowType.indexOf(keyFields.get(i)));
        }
        return new SeaTunnelRowType(keyFields.toArray(new String[0]), keyFieldTypes);
    }

    /**
     * Returns a function that projects a full row onto the key fields.
     *
     * @param keyType row type describing the key fields
     * @param rowType row type of the rows passed to the returned function
     */
    private static Function<SeaTunnelRow, SeaTunnelRow> createKeyRowExtractor(
            SeaTunnelRowType keyType, SeaTunnelRowType rowType) {
        // Resolve field positions once, so the per-row work is a plain array copy.
        int[] sourceIndexes = new int[keyType.getTotalFields()];
        for (int i = 0; i < sourceIndexes.length; i++) {
            sourceIndexes[i] = rowType.indexOf(keyType.getFieldName(i));
        }
        return row -> {
            Object[] keyValues = new Object[sourceIndexes.length];
            for (int i = 0; i < sourceIndexes.length; i++) {
                keyValues[i] = row.getField(sourceIndexes[i]);
            }
            return new SeaTunnelRow(keyValues);
        };
    }

    /**
     * Instantiates the serialization schema matching the message format.
     *
     * @param rowType type of the rows the schema will serialize
     * @param format requested message format
     * @param delimiter delimiter for the text format; not read by the other formats
     * @param isKey forwarded to the compatible-Debezium schema; not read by the other formats
     * @throws SeaTunnelJsonFormatException if the format has no schema mapped here
     */
    private static SerializationSchema createSerializationSchema(
            SeaTunnelRowType rowType, MessageFormat format, String delimiter, boolean isKey) {
        switch (format) {
            case JSON:
                return new JsonSerializationSchema(rowType);
            case TEXT:
                return TextSerializationSchema.builder()
                        .seaTunnelRowType(rowType)
                        .delimiter(delimiter)
                        .build();
            case CANAL_JSON:
                return new CanalJsonSerializationSchema(rowType);
            case OGG_JSON:
                return new OggJsonSerializationSchema(rowType);
            case DEBEZIUM_JSON:
                return new DebeziumJsonSerializationSchema(rowType);
            case MAXWELL_JSON:
                return new MaxWellJsonSerializationSchema(rowType);
            case COMPATIBLE_DEBEZIUM_JSON:
                return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
            case AVRO:
                return new AvroSerializationSchema(rowType);
            default:
                throw new SeaTunnelJsonFormatException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        "Unsupported format: " + format);
        }
    }
}
