package org.apache.flink.table.store.kafka;

import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.class */
public class KafkaLogSerializationSchema implements KafkaSerializationSchema<SinkRecord> {
    private static final long serialVersionUID = 1;
    private final String topic;

    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;
    private final SerializationSchema<RowData> valueSerializer;
    private final CoreOptions.LogChangelogMode changelogMode;

    public KafkaLogSerializationSchema(String str, @Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<RowData> serializationSchema2, CoreOptions.LogChangelogMode logChangelogMode) {
        this.topic = str;
        this.primaryKeySerializer = serializationSchema;
        this.valueSerializer = serializationSchema2;
        this.changelogMode = logChangelogMode;
        if (logChangelogMode == CoreOptions.LogChangelogMode.UPSERT && serializationSchema == null) {
            throw new IllegalArgumentException("Can not use upsert changelog mode for non-pk table.");
        }
    }

    @Override // org.apache.flink.table.store.shaded.streaming.connectors.kafka.KafkaSerializationSchema
    public void open(SerializationSchema.InitializationContext initializationContext) throws Exception {
        if (this.primaryKeySerializer != null) {
            this.primaryKeySerializer.open(initializationContext);
        }
        this.valueSerializer.open(initializationContext);
    }

    @Override // org.apache.flink.table.store.shaded.streaming.connectors.kafka.KafkaSerializationSchema
    public ProducerRecord<byte[], byte[]> serialize(SinkRecord sinkRecord, @Nullable Long l) {
        RowKind rowKind = sinkRecord.row().getRowKind();
        byte[] bArr = null;
        byte[] bArr2 = null;
        if (this.primaryKeySerializer != null) {
            bArr = this.primaryKeySerializer.serialize(sinkRecord.primaryKey());
            if (this.changelogMode == CoreOptions.LogChangelogMode.ALL || rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
                bArr2 = this.valueSerializer.serialize(sinkRecord.row());
            }
        } else {
            bArr2 = this.valueSerializer.serialize(sinkRecord.row());
        }
        return new ProducerRecord<>(this.topic, Integer.valueOf(sinkRecord.bucket()), bArr, bArr2);
    }
}
