package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.util.SerializationSchema;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaSink.class */
/**
 * Sink function that publishes every incoming record to a Kafka topic using
 * the legacy {@code kafka.javaapi.producer.Producer} API.
 *
 * <p>The producer is not created in the constructor; it is created lazily on
 * the first call to {@link #invoke(Object)} (or by an explicit call to
 * {@link #initialize()}).
 *
 * @param <IN>  type of the records handed to this sink
 * @param <OUT> type produced by the serialization schema and sent to Kafka
 */
public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1;

    /** Kafka producer; {@code null} until {@link #initialize()} has run. */
    private Producer<Integer, OUT> producer;
    /** Producer configuration, built in {@link #initialize()}. */
    private Properties props;
    /** Name of the Kafka topic records are sent to. */
    private final String topicId;
    /** Value used for the producer's {@code metadata.broker.list} setting. */
    private final String brokerAddr;
    /** Whether {@link #initialize()} has already created the producer. */
    private boolean initDone = false;
    /** Converts incoming records into the payload sent to Kafka. */
    private final SerializationSchema<IN, OUT> scheme;

    /**
     * Creates a sink for the given topic and broker.
     *
     * @param str                 the Kafka topic to write to
     * @param str2                the broker address, used as {@code metadata.broker.list}
     * @param serializationSchema schema used to serialize each record before sending
     */
    public KafkaSink(String str, String str2, SerializationSchema<IN, OUT> serializationSchema) {
        this.topicId = str;
        this.brokerAddr = str2;
        this.scheme = serializationSchema;
    }

    /**
     * Builds the producer configuration and creates the Kafka producer.
     *
     * <p>NOTE(review): the serializer is fixed to
     * {@code kafka.serializer.StringEncoder} regardless of {@code OUT};
     * presumably callers are expected to use a schema that yields strings —
     * confirm against usages.
     */
    public void initialize() {
        this.props = new Properties();
        this.props.put("metadata.broker.list", this.brokerAddr);
        this.props.put("serializer.class", "kafka.serializer.StringEncoder");
        this.props.put("request.required.acks", "1");
        this.producer = new Producer<>(new ProducerConfig(this.props));
        this.initDone = true;
    }

    /**
     * Serializes the record with the configured schema and sends it to the
     * topic, creating the producer first if that has not happened yet.
     *
     * @param in the record to publish
     */
    public void invoke(IN in) {
        if (!this.initDone) {
            initialize();
        }
        OUT payload = this.scheme.serialize(in);
        // Diamond instead of a raw KeyedMessage keeps the send() call type-checked.
        this.producer.send(new KeyedMessage<>(this.topicId, payload));
    }

    /**
     * Closes the Kafka producer if one was created.
     *
     * <p>The producer only exists after the first record (or an explicit
     * {@link #initialize()}), so a sink that never received data has nothing
     * to close; this method is then a no-op instead of throwing a
     * {@link NullPointerException}. Calling it more than once is safe.
     */
    public void close() {
        if (this.producer == null) {
            return;
        }
        try {
            this.producer.close();
        } finally {
            // Drop the closed producer so a later invoke() re-initializes
            // instead of sending through a closed instance.
            this.producer = null;
            this.initDone = false;
        }
    }
}
