package brooklyn.entity.messaging.kafka;

import brooklyn.entity.basic.Attributes;
import brooklyn.entity.zookeeper.ZooKeeperNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import org.testng.Assert;

/* loaded from: input_file:brooklyn/entity/messaging/kafka/KafkaSupport.class */
public class KafkaSupport {
    private final KafkaCluster cluster;

    public KafkaSupport(KafkaCluster kafkaCluster) {
        this.cluster = kafkaCluster;
    }

    public void sendMessage(String str, String str2) {
        ZooKeeperNode zooKeeper = this.cluster.getZooKeeper();
        Properties properties = new Properties();
        properties.put("zk.connect", String.format("%s:%d", zooKeeper.getAttribute(Attributes.HOSTNAME), zooKeeper.getZookeeperPort()));
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer producer = new Producer(new ProducerConfig(properties));
        producer.send(new ProducerData(str, str2));
        producer.close();
    }

    public String getMessage(String str) {
        ZooKeeperNode zooKeeper = this.cluster.getZooKeeper();
        Properties properties = new Properties();
        properties.put("zk.connect", String.format("%s:%d", zooKeeper.getAttribute(Attributes.HOSTNAME), zooKeeper.getZookeeperPort()));
        properties.put("zk.connectiontimeout.ms", "120000");
        properties.put("groupid", "brooklyn");
        Message message = (Message) ((KafkaMessageStream) Iterables.getOnlyElement((List) Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)).createMessageStreams(ImmutableMap.of(str, 1)).get(str))).iterator().next();
        Assert.assertTrue(message.isValid());
        ByteBuffer payload = message.payload();
        byte[] bArr = new byte[payload.remaining()];
        payload.get(bArr);
        return new String(bArr);
    }
}
