/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.connect.kafka;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.connect.core.Sink;
import org.apache.pulsar.connect.kafka.KafkaSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSink<K, V>
implements Sink<KeyValue<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
    private Producer<K, V> producer;
    private Properties props = new Properties();
    private KafkaSinkConfig kafkaSinkConfig;

    public CompletableFuture<Void> write(KeyValue<K, V> message) {
        ProducerRecord record = new ProducerRecord(this.kafkaSinkConfig.getTopic(), message.getKey(), message.getValue());
        LOG.debug("Record sending to kafka, record={}.", (Object)record);
        Future f = this.producer.send(record);
        return CompletableFuture.supplyAsync(() -> {
            try {
                f.get();
                return null;
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void close() throws IOException {
        this.producer.close();
        LOG.info("Kafka sink stopped.");
    }

    public void open(Map<String, Object> config) throws Exception {
        this.kafkaSinkConfig = KafkaSinkConfig.load(config);
        if (this.kafkaSinkConfig.getTopic() == null || this.kafkaSinkConfig.getBootstrapServers() == null || this.kafkaSinkConfig.getAcks() == null || this.kafkaSinkConfig.getBatchSize() == 0L || this.kafkaSinkConfig.getMaxRequestSize() == 0L) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.props.put("bootstrap.servers", this.kafkaSinkConfig.getBootstrapServers());
        this.props.put("acks", this.kafkaSinkConfig.getAcks());
        this.props.put("batch.size", this.kafkaSinkConfig.getBatchSize().toString());
        this.props.put("max.request.size", this.kafkaSinkConfig.getMaxRequestSize().toString());
        this.props.put("key.serializer", this.kafkaSinkConfig.getKeySerializerClass());
        this.props.put("value.serializer", this.kafkaSinkConfig.getValueSerializerClass());
        this.producer = new KafkaProducer(this.props);
        LOG.info("Kafka sink started.");
    }
}

