/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.mailbox.kafka;

import java.util.Properties;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.james.mailbox.store.publisher.Publisher;
import org.apache.james.mailbox.store.publisher.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaPublisher
implements Publisher {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPublisher.class);
    private Producer<String, byte[]> producer;
    private final int kafka_port;
    private final String kafka_ip;
    private boolean producerLaunched;

    public KafkaPublisher(String kafkaHostIpString, int kafka_port) {
        this.kafka_ip = kafkaHostIpString;
        this.kafka_port = kafka_port;
        this.producerLaunched = false;
    }

    @PostConstruct
    public void init() {
        if (!this.producerLaunched) {
            Properties props = new Properties();
            props.put("metadata.broker.list", this.kafka_ip + ":" + this.kafka_port);
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);
            this.producer = new Producer(config);
            this.producerLaunched = true;
        } else {
            LOG.warn("Kafka producer was already instantiated");
        }
    }

    public void publish(Topic topic, byte[] message) {
        this.producer.send(new KeyedMessage(topic.getValue(), (Object)message));
    }

    @PreDestroy
    public void close() {
        this.producer.close();
    }
}

