package org.apache.flink.streaming.connectors.kafka.api;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/KafkaSink.class */
/**
 * Sink function that publishes every incoming record to a Kafka topic.
 *
 * <p>The broker to connect to is looked up in {@link #open(Configuration)} via
 * {@link KafkaTopicUtils}, using the ZooKeeper address passed to the constructor.
 * Records are converted to {@code byte[]} by the supplied {@link SerializationSchema}
 * before being handed to the Kafka producer.
 *
 * <p>Partitioning can be customised either with a serializable partitioner instance
 * or with a partitioner class; when neither is given no {@code partitioner.class}
 * property is set on the producer.
 *
 * @param <IN> type of the records consumed by this sink
 */
public class KafkaSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;

    /** Kafka producer; created in {@link #open(Configuration)}, {@code null} before that. */
    private Producer<IN, byte[]> producer;
    /** Producer configuration assembled in {@link #open(Configuration)}. */
    private Properties props;
    /** Name of the topic records are written to. */
    private String topicId;
    /** ZooKeeper address used to look up the leader broker of the topic. */
    private String zookeeperAddress;
    /** Converts records into the byte payload sent to Kafka. */
    private SerializationSchema<IN, byte[]> schema;
    /** Optional partitioner instance; {@code null} when not supplied. */
    private SerializableKafkaPartitioner partitioner;
    /** Optional partitioner class; {@code null} when not supplied. */
    private Class<? extends SerializableKafkaPartitioner> partitionerClass;

    /**
     * Creates a sink without a custom partitioner.
     *
     * @param zookeeperAddress ZooKeeper address, validated with
     *        {@link NetUtils#ensureCorrectHostnamePort(String)}
     * @param topicId name of the target topic; must not be {@code null}
     * @param serializationSchema schema turning records into {@code byte[]}
     */
    public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
        // The cast selects the Class-based overload; a bare null would be ambiguous.
        this(zookeeperAddress, topicId, serializationSchema, (Class<? extends SerializableKafkaPartitioner>) null);
    }

    /**
     * Creates a sink that uses the given partitioner instance.
     *
     * @param zookeeperAddress ZooKeeper address, validated with
     *        {@link NetUtils#ensureCorrectHostnamePort(String)}
     * @param topicId name of the target topic; must not be {@code null}
     * @param serializationSchema schema turning records into {@code byte[]}
     * @param serializableKafkaPartitioner partitioner instance; checked for serializability
     */
    public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner serializableKafkaPartitioner) {
        NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
        Preconditions.checkNotNull(topicId, "TopicID not set");
        ClosureCleaner.ensureSerializable(serializableKafkaPartitioner);
        this.zookeeperAddress = zookeeperAddress;
        this.topicId = topicId;
        this.schema = serializationSchema;
        this.partitioner = serializableKafkaPartitioner;
    }

    /**
     * Creates a sink that uses the given partitioner class.
     *
     * @param zookeeperAddress ZooKeeper address, validated with
     *        {@link NetUtils#ensureCorrectHostnamePort(String)}
     * @param topicId name of the target topic; must not be {@code null}
     * @param serializationSchema schema turning records into {@code byte[]}
     * @param partitionerClass partitioner class, or {@code null} for none
     */
    public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitionerClass) {
        NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
        Preconditions.checkNotNull(topicId, "TopicID not set");
        ClosureCleaner.ensureSerializable(partitionerClass);
        this.zookeeperAddress = zookeeperAddress;
        this.topicId = topicId;
        this.schema = serializationSchema;
        this.partitionerClass = partitionerClass;
    }

    /**
     * Resolves the leader broker for the topic, builds the producer configuration
     * and creates the Kafka producer.
     *
     * @param configuration not used by this method
     * @throws RuntimeException if creating the producer fails with a
     *         {@link NullPointerException}
     */
    public void open(Configuration configuration) {
        String leaderBrokerAddressForTopic = new KafkaTopicUtils(this.zookeeperAddress).getLeaderBrokerAddressForTopic(this.topicId);

        this.props = new Properties();
        this.props.put("metadata.broker.list", leaderBrokerAddressForTopic);
        this.props.put("request.required.acks", "1");
        this.props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
        this.props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());

        if (this.partitioner != null) {
            this.props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
            // The partitioner object itself is stored in the properties.
            // NOTE(review): presumably PartitionerWrapper reads it back under this key - confirm.
            this.props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, this.partitioner);
        }
        if (this.partitionerClass != null) {
            // Store the class NAME, not the Class object: Properties.getProperty returns
            // null for non-String values, so a string-based lookup would not see it.
            // getName() yields the binary name, which also works for nested classes.
            this.props.put("partitioner.class", this.partitionerClass.getName());
        }

        try {
            this.producer = new Producer<>(new ProducerConfig(this.props));
        } catch (NullPointerException e) {
            // NOTE(review): presumably the producer surfaces an unreachable broker as an
            // NPE - confirm; the cause is preserved so other NPE sources stay diagnosable.
            throw new RuntimeException("Cannot connect to Kafka broker " + leaderBrokerAddressForTopic, e);
        }
    }

    /**
     * Serializes the record and sends it to the configured topic. The message key is
     * {@code null}; the record itself is passed as the partitioning key.
     *
     * @param in record to publish
     */
    public void invoke(IN in) {
        byte[] payload = this.schema.serialize(in);
        this.producer.send(new KeyedMessage<IN, byte[]>(this.topicId, null, in, payload));
    }

    /** Closes the producer if one was created. */
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }
}
