package org.apache.flink.streaming.connectors.kafka.api;

import java.util.Map;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/KafkaSink.class */
/**
 * Sink function that serializes each incoming record with a
 * {@link SerializationSchema} and sends it to a Kafka topic through the
 * Kafka {@link Producer} API.
 *
 * <p>The producer is created in {@link #open(Configuration)} and released in
 * {@link #close()}. Optionally a custom partitioner can be supplied, either
 * as a serializable instance or as a class.
 *
 * @param <IN> type of the records accepted by this sink
 */
public class KafkaSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);

    /** Runtime-only handle, created in open(); never part of the serialized sink. */
    private transient Producer<IN, byte[]> producer;

    /** User supplied producer settings; copied over the defaults in open(). */
    private final Properties userDefinedProperties;
    private final String topicId;
    private final String brokerList;
    private final SerializationSchema<IN, byte[]> schema;

    /** Partitioner instance, set only by the instance-based constructor. */
    private SerializableKafkaPartitioner partitioner;

    /** Partitioner class, set only by the class-based constructor. */
    private Class<? extends SerializableKafkaPartitioner> partitionerClass;

    /**
     * Creates a sink without user-defined producer properties and without a
     * custom partitioner.
     *
     * @param brokerList comma separated list of {@code host:port} broker addresses
     * @param topicId name of the topic to write to
     * @param serializationSchema schema turning records into byte arrays
     */
    public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
        this(brokerList, topicId, new Properties(), serializationSchema);
    }

    /**
     * Creates a sink with additional producer properties.
     *
     * @param brokerList comma separated list of {@code host:port} broker addresses;
     *        every entry is validated with {@link NetUtils#ensureCorrectHostnamePort(String)}
     * @param topicId name of the topic to write to
     * @param producerConfig extra producer settings; they are applied after the
     *        built-in defaults in {@link #open(Configuration)}
     * @param serializationSchema schema turning records into byte arrays
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
        // Check before split(): otherwise a null broker list fails with a message-less NPE.
        Preconditions.checkNotNull(brokerList, "Broker list not set");
        for (String broker : brokerList.split(",")) {
            NetUtils.ensureCorrectHostnamePort(broker);
        }
        Preconditions.checkNotNull(topicId, "TopicID not set");
        // Fail at construction time instead of later inside open()/invoke().
        Preconditions.checkNotNull(producerConfig, "Producer properties not set");
        Preconditions.checkNotNull(serializationSchema, "Serialization schema not set");

        this.brokerList = brokerList;
        this.topicId = topicId;
        this.schema = serializationSchema;
        this.userDefinedProperties = producerConfig;
    }

    /**
     * Creates a sink that uses the given partitioner instance.
     *
     * @param brokerList comma separated list of {@code host:port} broker addresses
     * @param topicId name of the topic to write to
     * @param serializationSchema schema turning records into byte arrays
     * @param partitioner partitioner instance; checked for serializability via
     *        {@link ClosureCleaner#ensureSerializable(Object)}
     */
    public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
        this(brokerList, topicId, serializationSchema);
        ClosureCleaner.ensureSerializable(partitioner);
        this.partitioner = partitioner;
    }

    /**
     * Creates a sink that uses a partitioner identified by its class.
     *
     * @param brokerList comma separated list of {@code host:port} broker addresses
     * @param topicId name of the topic to write to
     * @param serializationSchema schema turning records into byte arrays
     * @param partitionerClass class of the partitioner to configure on the producer
     */
    public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitionerClass) {
        this(brokerList, topicId, serializationSchema);
        this.partitionerClass = partitionerClass;
    }

    /**
     * Builds the producer configuration and creates the Kafka producer.
     *
     * <p>Built-in defaults are written first, then the user-defined properties
     * (which may therefore override the defaults), and finally the partitioner
     * settings (which therefore take precedence over user-defined ones).
     *
     * @param configuration not used by this method
     * @throws RuntimeException if creating the producer fails with a
     *         {@link NullPointerException}
     */
    public void open(Configuration configuration) {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", this.brokerList);
        producerProps.put("request.required.acks", "-1");
        producerProps.put("message.send.max.retries", "10");
        producerProps.put("serializer.class", DefaultEncoder.class.getCanonicalName());
        producerProps.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());

        for (Map.Entry<Object, Object> userEntry : this.userDefinedProperties.entrySet()) {
            producerProps.put(userEntry.getKey(), userEntry.getValue());
        }

        if (this.partitioner != null) {
            producerProps.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
            // The instance itself travels inside the properties under the wrapper's key.
            producerProps.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, this.partitioner);
        }
        if (this.partitionerClass != null) {
            // Store the class *name*: the setting is a String everywhere else (see the
            // wrapper case above), and Properties#getProperty yields null for non-String
            // values. getName() is the binary name, which is what Class.forName expects
            // (it differs from the canonical name for nested classes).
            producerProps.put("partitioner.class", this.partitionerClass.getName());
        }

        try {
            this.producer = new Producer<>(new ProducerConfig(producerProps));
        } catch (NullPointerException e) {
            throw new RuntimeException("Cannot connect to Kafka broker " + this.brokerList, e);
        }
    }

    /**
     * Serializes the record and sends it to the configured topic.
     *
     * <p>The message key is {@code null}; the record itself is handed over as
     * the partitioning key.
     *
     * @param next record to write
     */
    public void invoke(IN next) {
        byte[] serialized = this.schema.serialize(next);
        this.producer.send(new KeyedMessage<IN, byte[]>(this.topicId, null, next, serialized));
    }

    /**
     * Closes the producer if one was created by {@link #open(Configuration)}.
     */
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }
}
