package org.apache.flink.table.store.kafka;

import java.util.Properties;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.sink.SinkRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:org/apache/flink/table/store/kafka/KafkaLogSinkProvider.class */
public class KafkaLogSinkProvider implements LogSinkProvider {
    private static final long serialVersionUID = 1;
    private final String topic;
    private final Properties properties;

    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;
    private final SerializationSchema<RowData> valueSerializer;
    private final LogOptions.LogConsistency consistency;
    private final LogOptions.LogChangelogMode changelogMode;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.table.store.kafka.KafkaLogSinkProvider$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/table/store/kafka/KafkaLogSinkProvider$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$table$store$log$LogOptions$LogConsistency = new int[LogOptions.LogConsistency.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$table$store$log$LogOptions$LogConsistency[LogOptions.LogConsistency.TRANSACTIONAL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$table$store$log$LogOptions$LogConsistency[LogOptions.LogConsistency.EVENTUAL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public KafkaLogSinkProvider(String str, Properties properties, @Nullable SerializationSchema<RowData> serializationSchema, SerializationSchema<RowData> serializationSchema2, LogOptions.LogConsistency logConsistency, LogOptions.LogChangelogMode logChangelogMode) {
        this.topic = str;
        this.properties = properties;
        this.primaryKeySerializer = serializationSchema;
        this.valueSerializer = serializationSchema2;
        this.consistency = logConsistency;
        this.changelogMode = logChangelogMode;
    }

    /* renamed from: createSink, reason: merged with bridge method [inline-methods] */
    public KafkaSink<SinkRecord> m2createSink() {
        KafkaSinkBuilder builder = KafkaSink.builder();
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$table$store$log$LogOptions$LogConsistency[this.consistency.ordinal()]) {
            case 1:
                builder.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("log-store-" + this.topic);
                break;
            case 2:
                if (this.primaryKeySerializer != null) {
                    builder.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);
                    break;
                } else {
                    throw new IllegalArgumentException("Can not use EVENTUAL consistency mode for non-pk table.");
                }
        }
        return builder.setBootstrapServers(this.properties.get("bootstrap.servers").toString()).setKafkaProducerConfig(this.properties).setRecordSerializer(createSerializationSchema()).build();
    }

    public Consumer<RecordMetadata> createMetadataConsumer(LogSinkProvider.WriteCallback writeCallback) {
        return recordMetadata -> {
            writeCallback.onCompletion(recordMetadata.partition(), recordMetadata.offset());
        };
    }

    @VisibleForTesting
    KafkaLogSerializationSchema createSerializationSchema() {
        return new KafkaLogSerializationSchema(this.topic, this.primaryKeySerializer, this.valueSerializer, this.changelogMode);
    }
}
