package org.apache.streampipes.sinks.brokers.jvm.kafka;

import java.util.Arrays;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;

/* loaded from: input_file:org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.class */
/**
 * Event sink that hands every incoming event to an {@link SpKafkaProducer}.
 *
 * <p>Lifecycle as implemented here: {@code onInvocation} creates the producer from the
 * supplied {@link KafkaParameters}, {@code onEvent} publishes one event per call, and
 * {@code onDetach} disconnects the producer.
 */
public class KafkaPublisher implements EventSink<KafkaParameters> {
    /** Producer created by {@code onInvocation}; {@code null} until that call has succeeded. */
    private SpKafkaProducer producer;

    /** Turns the raw event map into the payload that is passed to {@code producer.publish}. */
    private final JsonDataFormatDefinition dataFormatDefinition = new JsonDataFormatDefinition();

    /**
     * Creates the Kafka producer for this sink.
     *
     * <p>The security configuration is chosen from two switches: whether the configured
     * authentication value equals {@code KafkaController.getSaslAccessKey()}, and whether
     * {@code isUseSSL()} is set. Username and password are only read for the SASL variants.
     *
     * @param kafkaParameters         host, port, topic, authentication mode, SSL flag and credentials
     * @param eventSinkRuntimeContext runtime context supplied by the caller; not used by this method
     * @throws SpRuntimeException declared by the signature; not thrown directly by this method
     */
    public void onInvocation(KafkaParameters kafkaParameters, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        final boolean saslRequested = kafkaParameters.getAuthentication().equals(KafkaController.getSaslAccessKey());
        final boolean sslRequested = kafkaParameters.isUseSSL();
        final String brokerUrl = kafkaParameters.getKafkaHost() + ":" + kafkaParameters.getKafkaPort();

        // The selected configuration is passed straight into Arrays.asList so that the list's
        // element type is inferred from the SpKafkaProducer constructor parameter. Previously it
        // was parked in a local declared as KafkaSecuritySaslSSLConfig even though three other
        // configuration classes were assigned to it.
        this.producer = new SpKafkaProducer(
                brokerUrl,
                kafkaParameters.getTopic(),
                Arrays.asList(saslRequested
                        ? (sslRequested
                                ? new KafkaSecuritySaslSSLConfig(kafkaParameters.getUsername(), kafkaParameters.getPassword())
                                : new KafkaSecuritySaslPlainConfig(kafkaParameters.getUsername(), kafkaParameters.getPassword()))
                        : (sslRequested
                                ? new KafkaSecurityUnauthenticatedSSLConfig()
                                : new KafkaSecurityUnauthenticatedPlainConfig())));
    }

    /**
     * Publishes a single event through the producer.
     *
     * <p>A {@link SpRuntimeException} raised while converting or publishing is caught here: its
     * stack trace is printed and the exception does not propagate to the caller.
     *
     * @param event the event whose raw map is converted and published
     */
    public void onEvent(Event event) {
        try {
            this.producer.publish(this.dataFormatDefinition.fromMap(event.getRaw()));
        } catch (SpRuntimeException e) {
            // NOTE(review): consider routing this through the project's logger instead of
            // printStackTrace; behaviour is intentionally left unchanged here.
            e.printStackTrace();
        }
    }

    /**
     * Disconnects the producer, if one was created.
     *
     * @throws SpRuntimeException declared by the signature; may originate from {@code disconnect()}
     */
    public void onDetach() throws SpRuntimeException {
        // The producer is still null when onInvocation never ran or failed before assigning it;
        // skip the disconnect in that case instead of failing with a NullPointerException.
        if (this.producer != null) {
            this.producer.disconnect();
        }
    }
}
