package org.apache.streampipes.messaging.kafka;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/messaging/kafka/SpKafkaProducer.class */
/**
 * {@link EventProducer} implementation that publishes byte-array events to a single Kafka topic.
 *
 * <p>An instance is either created already connected (three-argument constructor) or created
 * empty and connected later through {@link #connect(KafkaTransportProtocol)}. Events published
 * while the instance is not connected are silently dropped.
 */
public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, Serializable {
    private static final String COLON = ":";
    /** Topic retention (value of {@code retention.ms}) used when the environment does not provide one. */
    private static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000";
    private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);

    private String brokerUrl;
    private String topic;
    private Producer<String, byte[]> producer;
    private boolean connected;

    /** Creates an unconnected producer; call {@link #connect(KafkaTransportProtocol)} before publishing. */
    public SpKafkaProducer() {
        this.connected = false;
    }

    /**
     * Creates a producer that is immediately connected. Unlike
     * {@link #connect(KafkaTransportProtocol)}, this constructor does not attempt to create the topic.
     *
     * @param url       broker address in the form {@code host:port}
     * @param topic     name of the topic to publish to
     * @param appenders additional configuration appenders handed to the {@link ProducerConfigFactory}
     * @throws NumberFormatException          if the port part of {@code url} is not an integer
     * @throws ArrayIndexOutOfBoundsException if {@code url} contains no {@code ':'} separator
     */
    public SpKafkaProducer(String url, String topic, List<KafkaConfigAppender> appenders) {
        String[] hostAndPort = url.split(COLON);
        KafkaTransportProtocol protocol =
                new KafkaTransportProtocol(hostAndPort[0], Integer.parseInt(hostAndPort[1]), topic);
        this.brokerUrl = url;
        this.topic = topic;
        this.producer = new KafkaProducer<>(makeProperties(protocol, appenders));
        this.connected = true;
    }

    /**
     * Publishes a text message, encoded as UTF-8.
     *
     * @param message the text to publish
     */
    public void publish(String message) {
        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
        publish(message.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sends the given payload to the configured topic. Does nothing when the producer is not connected.
     *
     * @param message the raw event payload
     */
    public void publish(byte[] message) {
        if (this.connected) {
            this.producer.send(new ProducerRecord<>(this.topic, message));
        }
    }

    /** Builds the Kafka producer properties for the given protocol and configuration appenders. */
    private Properties makeProperties(KafkaTransportProtocol protocol, List<KafkaConfigAppender> appenders) {
        return new ProducerConfigFactory(protocol).buildProperties(appenders);
    }

    /**
     * Connects to the broker described by {@code protocol}. Topic creation is attempted first on a
     * best-effort basis: a failure is logged and the producer is created regardless.
     *
     * @param protocol transport settings providing the broker host, port and topic definition
     */
    public void connect(KafkaTransportProtocol protocol) {
        String actualTopic = protocol.getTopicDefinition().getActualTopicName();
        LOG.info("Kafka producer: Connecting to {}", actualTopic);
        this.brokerUrl = protocol.getBrokerHostname() + COLON + protocol.getKafkaPort();
        this.topic = actualTopic;

        try {
            createKafkaTopic();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("Could not create topic: {} on broker {}", this.topic, this.brokerUrl, e);
        } catch (ExecutionException e) {
            LOG.error("Could not create topic: {} on broker {}", this.topic, this.brokerUrl, e);
        }

        this.producer = new KafkaProducer<>(makeProperties(protocol, Collections.emptyList()));
        this.connected = true;
        LOG.info("Successfully created Kafka producer for topic {}", this.topic);
    }

    /**
     * Creates the configured topic (one partition, replication factor one) unless the broker
     * already lists it. The retention time is taken from {@code Envs.SP_KAFKA_RETENTION_MS} when
     * that variable exists, otherwise from {@link #SP_KAFKA_RETENTION_MS_DEFAULT}.
     *
     * @throws ExecutionException   if the broker reports a failure while creating the topic
     * @throws InterruptedException if the thread is interrupted while waiting for the broker
     */
    private void createKafkaTopic() throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.put("bootstrap.servers", this.brokerUrl);

        // try-with-resources: the admin client must be closed on every exit path.
        try (AdminClient adminClient = KafkaAdminClient.create(adminProperties)) {
            if (topicExists(adminClient.listTopics())) {
                LOG.info("Topic {} already exists in the broker, skipping topic creation", this.topic);
                return;
            }

            String retentionMs = Envs.SP_KAFKA_RETENTION_MS.exists()
                    ? Envs.SP_KAFKA_RETENTION_MS.getValue()
                    : SP_KAFKA_RETENTION_MS_DEFAULT;
            Map<String, String> topicConfig = new HashMap<>();
            topicConfig.put("retention.ms", retentionMs);

            NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1);
            newTopic.configs(topicConfig);

            // Block until the broker has acknowledged (or rejected) the creation request.
            adminClient.createTopics(Collections.singleton(newTopic)).values().get(this.topic).get();
            LOG.info("Successfully created Kafka topic {}", this.topic);
        }
    }

    /**
     * Marks the producer as disconnected and closes the underlying Kafka producer, if one was
     * ever created.
     */
    public void disconnect() {
        LOG.info("Kafka producer: Disconnecting from {}", this.topic);
        this.connected = false;
        // The no-arg constructor leaves the producer unset until connect() is called.
        if (this.producer != null) {
            this.producer.close();
        }
    }

    /**
     * @return {@code true} once a producer has been created and until {@link #disconnect()} is called
     */
    public boolean isConnected() {
        return this.connected;
    }

    /**
     * Checks whether the configured topic is among the topic names reported by the broker.
     *
     * @param topics the pending result of a list-topics request
     * @return {@code true} if the topic is listed; {@code false} if it is not listed or the
     *         listing could not be fetched (the failure is logged)
     * @throws InterruptedException if the thread is interrupted while waiting for the listing
     */
    private boolean topicExists(ListTopicsResult topics) throws InterruptedException {
        try {
            Set<String> existingTopics = topics.names().get();
            return existingTopics.contains(this.topic);
        } catch (ExecutionException e) {
            LOG.error("Could not fetch existing topics", e);
            return false;
        }
    }
}
