package com.ibm.streamsx.topology.messaging.mqtt;

import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.Tuple;
import com.ibm.streamsx.topology.TSink;
import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.TopologyElement;
import com.ibm.streamsx.topology.builder.BOperatorInvocation;
import com.ibm.streamsx.topology.builder.BOutputPort;
import com.ibm.streamsx.topology.function.BiFunction;
import com.ibm.streamsx.topology.function.Function;
import com.ibm.streamsx.topology.function.Supplier;
import com.ibm.streamsx.topology.spl.SPL;
import com.ibm.streamsx.topology.spl.SPLStream;
import com.ibm.streamsx.topology.spl.SPLStreams;
import com.ibm.streamsx.topology.tuple.Message;
import com.ibm.streamsx.topology.tuple.SimpleMessage;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:com/ibm/streamsx/topology/messaging/mqtt/MqttStreams.class */
/**
 * Connector that publishes streams to, and subscribes streams from, MQTT
 * topics by invoking the {@code com.ibm.streamsx.messaging.mqtt::MQTTSink}
 * and {@code com.ibm.streamsx.messaging.mqtt::MQTTSource} SPL operators.
 *
 * <p>Every operator invocation made through one instance starts from the
 * same connector configuration. The defaults {@code reconnectionBound=-1}
 * and {@code qos=0} are applied first and may be overridden by the
 * configuration supplied to the constructor.
 *
 * <p>The invocation counter is a plain {@code int}; no synchronization is
 * performed by this class.
 */
public class MqttStreams {
    private static final String SINK_KIND = "com.ibm.streamsx.messaging.mqtt::MQTTSink";
    private static final String SINK_CLASS = "com.ibm.streamsx.messaging.mqtt.MqttSinkOperator";
    private static final String SOURCE_KIND = "com.ibm.streamsx.messaging.mqtt::MQTTSource";
    private static final String SOURCE_CLASS = "com.ibm.streamsx.messaging.mqtt.MqttSourceOperator";

    private static final String CLIENT_ID = "clientID";

    private final TopologyElement te;
    private final Map<String, Object> config = new HashMap<>();
    /** Number of sink/source operators created so far through this instance. */
    private int opCnt;

    /**
     * Creates a connector bound to a topology element.
     *
     * @param topologyElement element used when invoking the source operator
     * @param map connector configuration; its entries are copied, so later
     *        changes to {@code map} do not affect this connector
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public MqttStreams(TopologyElement topologyElement, Map<String, Object> map) {
        this.te = topologyElement;
        this.config.putAll(map);
    }

    /**
     * Returns the connector configuration.
     *
     * @return an unmodifiable view of this connector's copy of the configuration
     */
    public Map<String, Object> getConfig() {
        return Collections.unmodifiableMap(this.config);
    }

    /**
     * Publishes a stream of messages, taking the topic of each tuple from
     * {@link Message#getTopic()}.
     *
     * <p>Equivalent to {@code publish(tStream, null)}.
     *
     * @param tStream messages to publish
     * @return the sink element representing the publish operation
     */
    public TSink publish(TStream<? extends Message> tStream) {
        return publish(tStream, null);
    }

    /**
     * Publishes a stream of messages to MQTT.
     *
     * <p>When {@code supplier} is {@code null} the sink reads the topic from
     * the {@code topic} attribute of each tuple (populated from
     * {@link Message#getTopic()}); otherwise {@code supplier} is passed to the
     * sink as its {@code topic} parameter and the per-message topic is not
     * copied into the tuple.
     *
     * @param tStream messages to publish
     * @param supplier topic to publish to, or {@code null} to use each
     *        message's own topic
     * @return the sink element representing the publish operation
     */
    public TSink publish(TStream<? extends Message> tStream, Supplier<String> supplier) {
        // The converter only reads from each Message, so viewing the stream as
        // TStream<Message> is safe; the cast is erased at runtime.
        // NOTE(review): lowLatency2() looks like a decompiler-renamed
        // lowLatency(); confirm against the TStream interface.
        @SuppressWarnings("unchecked")
        TStream<Message> messages = (TStream<Message>) tStream.lowLatency2();
        SPLStream splStream = SPLStreams.convertStream(messages, cvtMsgFunc(supplier), MqttSchemas.MQTT);

        Map<String, Object> params = newOperatorParams();
        // Dropped for the sink invocation (presumably a source-only setting; verify).
        params.remove("messageQueueSize");
        if (supplier == null) {
            params.put("topicAttributeName", "topic");
        } else {
            params.put("topic", supplier);
        }
        params.put("dataAttributeName", "message");
        uniquifyClientId(params);

        TSink sink = SPL.invokeSink(SINK_KIND, splStream, params);
        SPL.tagOpAsJavaPrimitive(sink.operator(), SINK_KIND, SINK_CLASS);
        return sink;
    }

    /**
     * Builds the function that copies a {@link Message} into an outgoing SPL
     * tuple. {@code null} message or topic values are written as empty strings.
     *
     * @param supplier the fixed topic, or {@code null} if the topic must be
     *        copied from each message into the {@code topic} attribute
     * @return the tuple conversion function
     */
    private static BiFunction<Message, OutputTuple, OutputTuple> cvtMsgFunc(final Supplier<String> supplier) {
        return new BiFunction<Message, OutputTuple, OutputTuple>() {
            private static final long serialVersionUID = 1L;

            @Override
            public OutputTuple apply(Message message, OutputTuple outputTuple) {
                outputTuple.setString("message", toSplValue(message.getMessage()));
                // Only carry the per-message topic when no fixed topic was given.
                if (supplier == null) {
                    outputTuple.setString("topic", toSplValue(message.getTopic()));
                }
                return outputTuple;
            }

            private String toSplValue(String str) {
                return str == null ? "" : str;
            }
        };
    }

    /**
     * Subscribes to MQTT and returns the received messages as a stream.
     *
     * <p>Each returned {@link Message} carries the tuple's {@code message}
     * and {@code topic} attributes and a {@code null} key. The returned
     * stream is colocated with the source operator's output stream.
     *
     * @param supplier value passed to the source as its {@code topics} parameter
     * @return stream of received messages
     * @throws IllegalArgumentException if {@code supplier} is {@code null}
     */
    public TStream<Message> subscribe(Supplier<String> supplier) {
        if (supplier == null) {
            throw new IllegalArgumentException("topic");
        }

        Map<String, Object> params = newOperatorParams();
        // Dropped for the source invocation (presumably a sink-only setting; verify).
        params.remove("retain");
        params.put("topics", supplier);
        params.put("topicOutAttrName", "topic");
        params.put("dataAttributeName", "message");
        uniquifyClientId(params);

        SPLStream rawStream = SPL.invokeSource(this.te, SOURCE_KIND, params, MqttSchemas.MQTT);
        SPL.tagOpAsJavaPrimitive(toOp(rawStream), SOURCE_KIND, SOURCE_CLASS);

        TStream<Message> messageStream = toMessageStream(rawStream);
        messageStream.colocate(rawStream);
        return messageStream;
    }

    /**
     * Creates the parameter map shared by sink and source invocations:
     * built-in defaults first, then the connector configuration converted by
     * {@code Util.configToSplParams}, so configured values win over defaults.
     */
    private Map<String, Object> newOperatorParams() {
        Map<String, Object> params = new HashMap<>();
        params.put("reconnectionBound", -1);
        params.put("qos", 0);
        params.putAll(Util.configToSplParams(this.config));
        return params;
    }

    /**
     * Counts one more operator invocation and, from the second invocation
     * on, prefixes a non-empty {@code clientID} parameter with the
     * invocation number so that each operator uses a distinct client id.
     */
    private void uniquifyClientId(Map<String, Object> params) {
        if (++this.opCnt > 1) {
            String clientId = (String) params.get(CLIENT_ID);
            if (clientId != null && clientId.length() > 0) {
                params.put(CLIENT_ID, this.opCnt + "-" + clientId);
            }
        }
    }

    /** Returns the operator invocation that produces the given SPL stream. */
    private BOperatorInvocation toOp(SPLStream sPLStream) {
        return ((BOutputPort) sPLStream.output()).operator();
    }

    /**
     * Converts tuples carrying {@code message} and {@code topic} attributes
     * into {@link SimpleMessage} instances with a {@code null} key.
     */
    private static TStream<Message> toMessageStream(SPLStream sPLStream) {
        return sPLStream.convert(new Function<Tuple, Message>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Message apply(Tuple tuple) {
                return new SimpleMessage(tuple.getString("message"), null, tuple.getString("topic"));
            }
        });
    }
}
