package org.apache.flink.table.store.kafka;

import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.store.table.sink.LogSinkFunction;

/* loaded from: input_file:org/apache/flink/table/store/kafka/KafkaLogSinkProvider.class */
/**
 * {@link LogSinkProvider} that builds a {@link KafkaSinkFunction} writing to a single topic.
 *
 * <p>The configured {@link CoreOptions.LogConsistency} selects the
 * {@link FlinkKafkaProducer.Semantic} handed to the sink function; see {@link #createSink()}.
 */
public class KafkaLogSinkProvider implements LogSinkProvider {
    private static final long serialVersionUID = 1L;

    /** Topic name passed to both the serialization schema and the sink function. */
    private final String topic;

    /** Properties object forwarded as-is to the sink function. */
    private final Properties properties;

    /** Serializer for the primary key part of a row; may be {@code null}. */
    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;

    /** Serializer for the value part of a row. */
    private final SerializationSchema<RowData> valueSerializer;

    /** Consistency setting that is translated into a producer semantic. */
    private final CoreOptions.LogConsistency consistency;

    /** Changelog mode forwarded to the serialization schema. */
    private final CoreOptions.LogChangelogMode changelogMode;

    /**
     * Stores the given settings; nothing is created until {@link #createSink()} is called.
     *
     * @param str topic name
     * @param properties properties forwarded to the sink function
     * @param serializationSchema primary key serializer, may be {@code null}
     * @param serializationSchema2 value serializer
     * @param logConsistency consistency setting used to pick the producer semantic
     * @param logChangelogMode changelog mode forwarded to the serialization schema
     */
    public KafkaLogSinkProvider(String str, Properties properties, @Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<RowData> serializationSchema2, CoreOptions.LogConsistency logConsistency, CoreOptions.LogChangelogMode logChangelogMode) {
        this.topic = str;
        this.properties = properties;
        this.consistency = logConsistency;
        this.changelogMode = logChangelogMode;
        this.primaryKeySerializer = serializationSchema;
        this.valueSerializer = serializationSchema2;
    }

    /**
     * Creates the sink function for the configured topic.
     *
     * @return a new {@link KafkaSinkFunction}
     * @throws IllegalArgumentException if the consistency setting is neither
     *     {@code TRANSACTIONAL} nor {@code EVENTUAL}
     */
    @Override
    public LogSinkFunction createSink() {
        // Resolve the semantic first so an unsupported consistency fails before
        // the serialization schema is built.
        FlinkKafkaProducer.Semantic deliveryGuarantee = toSemantic(consistency);
        KafkaLogSerializationSchema schema = createSerializationSchema();
        return new KafkaSinkFunction(topic, schema, properties, deliveryGuarantee);
    }

    /**
     * Builds the serialization schema from the topic, both serializers and the changelog mode.
     *
     * @return a new {@link KafkaLogSerializationSchema}
     */
    @VisibleForTesting
    KafkaLogSerializationSchema createSerializationSchema() {
        return new KafkaLogSerializationSchema(topic, primaryKeySerializer, valueSerializer, changelogMode);
    }

    /**
     * Maps a consistency setting to the producer semantic: {@code TRANSACTIONAL} becomes
     * {@code EXACTLY_ONCE} and {@code EVENTUAL} becomes {@code AT_LEAST_ONCE}.
     */
    private static FlinkKafkaProducer.Semantic toSemantic(CoreOptions.LogConsistency consistency) {
        switch (consistency) {
            case TRANSACTIONAL:
                return FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
            case EVENTUAL:
                return FlinkKafkaProducer.Semantic.AT_LEAST_ONCE;
            default:
                throw new IllegalArgumentException("Unsupported: " + consistency);
        }
    }
}
