package org.apache.flink.cdc.connectors.kafka.sink;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.class */
/**
 * A {@link KafkaRecordSerializationSchema} that turns pipeline {@link Event}s into Kafka
 * {@link ProducerRecord}s.
 *
 * <p>Each record carries the serialized event as its value and no key. The target topic is either
 * the configured unified topic or, when none is configured, the string form of the event's table
 * id. Records may additionally carry headers describing the source table and a fixed set of
 * user-configured custom headers.
 *
 * <p>{@link SchemaChangeEvent}s are never emitted as records: {@link #serialize} returns
 * {@code null} for them.
 */
public class PipelineKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<Event> {
    /** Header key carrying the namespace of the source table. */
    public static final String NAMESPACE_HEADER_KEY = "namespace";

    /** Header key carrying the schema name of the source table. */
    public static final String SCHEMA_NAME_HEADER_KEY = "schemaName";

    /** Header key carrying the table name of the source table. */
    public static final String TABLE_NAME_HEADER_KEY = "tableName";

    /** Separates individual {@code key:value} entries in the custom header string. */
    private static final String HEADER_ENTRY_DELIMITER = ";";

    /** Separates the key from the value inside one custom header entry. */
    private static final String HEADER_KEY_VALUE_DELIMITER = ":";

    /** Optional partitioner; when {@code null} the record is created without a partition. */
    private final FlinkKafkaPartitioner<Event> partitioner;

    /** Produces the record value for every event. */
    private final SerializationSchema<Event> valueSerialization;

    /** Topic used for all records, or {@code null} to derive the topic from the table id. */
    private final String unifiedTopic;

    /** Whether namespace / schema name / table name headers are attached to each record. */
    private final boolean addTableToHeaderEnabled;

    /** Custom headers (key to value) attached to every emitted record. */
    public final Map<String, String> customHeaders = new HashMap<>();

    /**
     * Creates the serialization schema.
     *
     * @param partitioner optional partitioner used to pick the Kafka partition; may be
     *     {@code null}
     * @param valueSerialization serializer producing the record value; must not be {@code null}
     * @param unifiedTopic topic that receives all records, or {@code null} to use the string form
     *     of each event's table id as the topic
     * @param addTableToHeaderEnabled whether to add the table identification headers
     * @param customHeaderString custom headers in the form {@code key1:value1;key2:value2}; a
     *     {@code null} or empty string means no custom headers
     * @throws IllegalArgumentException if an entry of {@code customHeaderString} does not consist
     *     of exactly one key and one value separated by {@code ':'}
     */
    public PipelineKafkaRecordSerializationSchema(
            @Nullable FlinkKafkaPartitioner<Event> partitioner,
            SerializationSchema<Event> valueSerialization,
            String unifiedTopic,
            boolean addTableToHeaderEnabled,
            @Nullable String customHeaderString) {
        this.partitioner = partitioner;
        this.valueSerialization = Preconditions.checkNotNull(valueSerialization);
        this.unifiedTopic = unifiedTopic;
        this.addTableToHeaderEnabled = addTableToHeaderEnabled;

        // A missing header configuration is treated like an empty one instead of failing with a
        // NullPointerException.
        if (customHeaderString == null || customHeaderString.isEmpty()) {
            return;
        }
        for (String entry : customHeaderString.split(HEADER_ENTRY_DELIMITER)) {
            String[] keyAndValue = entry.split(HEADER_KEY_VALUE_DELIMITER);
            if (keyAndValue.length != 2) {
                throw new IllegalArgumentException(KafkaDataSinkOptions.SINK_CUSTOM_HEADER + " is malformed, please refer to the documents");
            }
            this.customHeaders.put(keyAndValue[0].trim(), keyAndValue[1].trim());
        }
    }

    /**
     * Converts an event into a Kafka record.
     *
     * @param event the event to serialize; it is cast to {@link ChangeEvent}
     * @param kafkaSinkContext context used to look up the partitions of the target topic
     * @param timestamp not used by this implementation; the record is created without a timestamp
     * @return the record to send, or {@code null} if {@code event} is a {@link SchemaChangeEvent}
     */
    @Override // org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
    public ProducerRecord<byte[], byte[]> serialize(Event event, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext, Long timestamp) {
        ChangeEvent changeEvent = (ChangeEvent) event;
        // NOTE(review): the value serializer is deliberately invoked before the schema-change
        // check, so it also sees SchemaChangeEvents. Presumably it relies on them to keep track
        // of table schemas - confirm before reordering these statements.
        byte[] valueSerialized = this.valueSerialization.serialize(event);
        if (event instanceof SchemaChangeEvent) {
            // Schema changes are not forwarded to Kafka.
            return null;
        }

        String topic = this.unifiedTopic == null ? changeEvent.tableId().toString() : this.unifiedTopic;

        RecordHeaders recordHeaders = new RecordHeaders();
        if (this.addTableToHeaderEnabled) {
            // Namespace and schema name may be absent; they are then sent as empty strings.
            recordHeaders.add(new RecordHeader(NAMESPACE_HEADER_KEY, utf8OrEmpty(changeEvent.tableId().getNamespace())));
            recordHeaders.add(new RecordHeader(SCHEMA_NAME_HEADER_KEY, utf8OrEmpty(changeEvent.tableId().getSchemaName())));
            recordHeaders.add(new RecordHeader(TABLE_NAME_HEADER_KEY, changeEvent.tableId().getTableName().getBytes(StandardCharsets.UTF_8)));
        }
        for (Map.Entry<String, String> header : this.customHeaders.entrySet()) {
            recordHeaders.add(new RecordHeader(header.getKey(), header.getValue().getBytes(StandardCharsets.UTF_8)));
        }

        Integer partition = extractPartition(changeEvent, valueSerialized, kafkaSinkContext.getPartitionsForTopic(topic));
        // Timestamp and key are intentionally left null.
        return new ProducerRecord<>(topic, partition, null, null, valueSerialized, recordHeaders);
    }

    /**
     * Opens the partitioner (if one is configured) and then the value serializer.
     *
     * @param initializationContext context handed to the value serializer
     * @param kafkaSinkContext supplies the parallel instance id and the number of parallel
     *     instances for the partitioner
     * @throws Exception if opening the value serializer fails
     */
    @Override // org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
    public void open(SerializationSchema.InitializationContext initializationContext, KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext) throws Exception {
        if (this.partitioner != null) {
            this.partitioner.open(kafkaSinkContext.getParallelInstanceId(), kafkaSinkContext.getNumberOfParallelInstances());
        }
        this.valueSerialization.open(initializationContext);
    }

    /**
     * Asks the configured partitioner for the partition of a record.
     *
     * @param changeEvent the event being written
     * @param valueSerialized the serialized record value
     * @param partitions the partitions of the topic the record is written to
     * @return the chosen partition, or {@code null} when no partitioner is configured
     */
    private Integer extractPartition(ChangeEvent changeEvent, byte[] valueSerialized, int[] partitions) {
        if (this.partitioner == null) {
            return null;
        }
        // NOTE(review): the partitioner is given the table id string as its target topic even
        // when a unified topic is configured, while `partitions` belongs to the actual topic.
        // Kept as-is; confirm whether this is intended.
        return this.partitioner.partition(changeEvent, null, valueSerialized, changeEvent.tableId().toString(), partitions);
    }

    /** Encodes {@code value} as UTF-8, mapping {@code null} to an empty byte array. */
    private static byte[] utf8OrEmpty(@Nullable String value) {
        return (value == null ? "" : value).getBytes(StandardCharsets.UTF_8);
    }
}
