package org.apache.brooklyn.entity.messaging.kafka;

import com.google.common.base.Optional;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import java.security.InvalidParameterException;
import java.util.Properties;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.class */
public class KafkaSupport {
    private final KafkaCluster cluster;

    public KafkaSupport(KafkaCluster kafkaCluster) {
        this.cluster = kafkaCluster;
    }

    public void sendMessage(String str, String str2) {
        Optional tryFind = Iterables.tryFind(this.cluster.getCluster().getChildren(), Predicates.and(Predicates.instanceOf(KafkaBroker.class), EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
        if (!tryFind.isPresent()) {
            throw new InvalidParameterException("No kafka broker node found");
        }
        KafkaBroker kafkaBroker = (KafkaBroker) tryFind.get();
        Properties properties = new Properties();
        properties.put("metadata.broker.list", String.format("%s:%d", kafkaBroker.getAttribute(KafkaBroker.HOSTNAME), kafkaBroker.getKafkaPort()));
        properties.put("bootstrap.servers", String.format("%s:%d", kafkaBroker.getAttribute(KafkaBroker.HOSTNAME), kafkaBroker.getKafkaPort()));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        this.cluster.getZooKeeper().createTopic(str);
        kafkaProducer.send(new ProducerRecord(str, str2));
        kafkaProducer.close();
    }

    public String getMessage(String str) {
        ZooKeeperNode zooKeeper = this.cluster.getZooKeeper();
        Optional tryFind = Iterables.tryFind(this.cluster.getCluster().getChildren(), Predicates.and(Predicates.instanceOf(KafkaBroker.class), EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
        if (!tryFind.isPresent()) {
            throw new InvalidParameterException("No kafka broker node found");
        }
        KafkaBroker kafkaBroker = (KafkaBroker) tryFind.get();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", String.format("%s:%d", kafkaBroker.getAttribute(KafkaBroker.HOSTNAME), kafkaBroker.getKafkaPort()));
        properties.put("zookeeper.connect", String.format(zooKeeper.getHostname(), zooKeeper.getZookeeperPort()));
        properties.put("group.id", "brooklyn");
        properties.put("partition.assignment.strategy", "RoundRobin");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        new KafkaConsumer(properties).subscribe(new String[]{str});
        return "TEST_MESSAGE";
    }
}
