package com.ibm.streamsx.topology.messaging.kafka;

import com.ibm.streams.operator.OutputTuple;
import com.ibm.streamsx.topology.TSink;
import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.TopologyElement;
import com.ibm.streamsx.topology.function.BiFunction;
import com.ibm.streamsx.topology.function.Supplier;
import com.ibm.streamsx.topology.spl.SPL;
import com.ibm.streamsx.topology.spl.SPLStream;
import com.ibm.streamsx.topology.spl.SPLStreams;
import com.ibm.streamsx.topology.tuple.Message;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:com/ibm/streamsx/topology/messaging/kafka/KafkaProducer.class */
public class KafkaProducer {
    private static final String PROP_FILE_PARAM = "etc/kafkaStreams/emptyProducerProperties";
    private final TopologyElement te;
    private final Map<String, Object> config = new HashMap();
    private boolean addedFileDependency;

    void addPropertiesFile() {
        if (this.addedFileDependency) {
            return;
        }
        this.addedFileDependency = true;
        Util.addPropertiesFile(this.te, PROP_FILE_PARAM);
    }

    public KafkaProducer(TopologyElement topologyElement, Map<String, Object> map) {
        this.te = topologyElement;
        this.config.putAll(map);
    }

    public Map<String, Object> getConfig() {
        return Collections.unmodifiableMap(this.config);
    }

    public TSink publish(TStream<? extends Message> tStream) {
        return publish(tStream, null);
    }

    public TSink publish(TStream<? extends Message> tStream, Supplier<String> supplier) {
        SPLStream convertStream = SPLStreams.convertStream(tStream.lowLatency2(), cvtMsgFunc(supplier), KafkaSchemas.KAFKA);
        HashMap hashMap = new HashMap();
        if (!this.config.isEmpty()) {
            hashMap.put("kafkaProperty", Util.toKafkaProperty(this.config));
        }
        if (supplier == null) {
            hashMap.put("topicAttribute", "topic");
        } else {
            hashMap.put("topic", supplier);
        }
        hashMap.put("propertiesFile", PROP_FILE_PARAM);
        addPropertiesFile();
        TSink invokeSink = SPL.invokeSink("com.ibm.streamsx.messaging.kafka::KafkaProducer", convertStream, hashMap);
        SPL.tagOpAsJavaPrimitive(invokeSink.operator(), "com.ibm.streamsx.messaging.kafka::KafkaProducer", "com.ibm.streamsx.messaging.kafka.KafkaSink");
        return invokeSink;
    }

    private static BiFunction<Message, OutputTuple, OutputTuple> cvtMsgFunc(final Supplier<String> supplier) {
        return new BiFunction<Message, OutputTuple, OutputTuple>() { // from class: com.ibm.streamsx.topology.messaging.kafka.KafkaProducer.1
            private static final long serialVersionUID = 1;

            @Override // com.ibm.streamsx.topology.function.BiFunction
            public OutputTuple apply(Message message, OutputTuple outputTuple) {
                outputTuple.setString("key", toSplValue(message.getKey()));
                outputTuple.setString("message", toSplValue(message.getMessage()));
                if (Supplier.this == null || Supplier.this.get() == null) {
                    outputTuple.setString("topic", toSplValue(message.getTopic()));
                } else {
                    outputTuple.setString("topic", ((String) Supplier.this.get()).toString());
                }
                return outputTuple;
            }

            private String toSplValue(String str) {
                return str == null ? "" : str;
            }
        };
    }
}
