package com.ibm.streamsx.topology.messaging.kafka;

import com.ibm.streams.operator.Tuple;
import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.TopologyElement;
import com.ibm.streamsx.topology.builder.BOperatorInvocation;
import com.ibm.streamsx.topology.builder.BOutputPort;
import com.ibm.streamsx.topology.function.Function;
import com.ibm.streamsx.topology.function.Supplier;
import com.ibm.streamsx.topology.logic.Value;
import com.ibm.streamsx.topology.spl.SPL;
import com.ibm.streamsx.topology.spl.SPLStream;
import com.ibm.streamsx.topology.tuple.Message;
import com.ibm.streamsx.topology.tuple.SimpleMessage;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:com/ibm/streamsx/topology/messaging/kafka/KafkaConsumer.class */
public class KafkaConsumer {
    private static final String PROP_FILE_PARAM = "etc/kafkaStreams/emptyConsumerProperties";
    private final TopologyElement te;
    private final Map<String, Object> config = new HashMap();
    private boolean addedFileDependency;

    void addPropertiesFile() {
        if (this.addedFileDependency) {
            return;
        }
        this.addedFileDependency = true;
        Util.addPropertiesFile(this.te, PROP_FILE_PARAM);
    }

    public KafkaConsumer(TopologyElement topologyElement, Map<String, Object> map) {
        this.te = topologyElement;
        this.config.putAll(map);
    }

    public Map<String, Object> getConfig() {
        return Collections.unmodifiableMap(this.config);
    }

    public TStream<Message> subscribe(Supplier<String> supplier) {
        return subscribe(new Value(1), supplier);
    }

    public TStream<Message> subscribe(Supplier<Integer> supplier, Supplier<String> supplier2) {
        if (supplier2 == null) {
            throw new IllegalArgumentException("topic");
        }
        if (supplier == null || (supplier.get() != null && supplier.get().intValue() <= 0)) {
            throw new IllegalArgumentException("threadsPerTopic");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("topic", supplier2);
        if (!(supplier instanceof Value) || supplier.get().intValue() != 1) {
            hashMap.put("threadsPerTopic", supplier);
        }
        if (!this.config.isEmpty()) {
            hashMap.put("kafkaProperty", Util.toKafkaProperty(this.config));
        }
        hashMap.put("propertiesFile", PROP_FILE_PARAM);
        addPropertiesFile();
        SPLStream invokeSource = SPL.invokeSource(this.te, "com.ibm.streamsx.messaging.kafka::KafkaConsumer", hashMap, KafkaSchemas.KAFKA);
        SPL.tagOpAsJavaPrimitive(toOp(invokeSource), "com.ibm.streamsx.messaging.kafka::KafkaConsumer", "com.ibm.streamsx.messaging.kafka.KafkaSource");
        TStream<Message> messageStream = toMessageStream(invokeSource);
        messageStream.colocate(invokeSource);
        return messageStream.isolate2();
    }

    private BOperatorInvocation toOp(SPLStream sPLStream) {
        return ((BOutputPort) sPLStream.output()).operator();
    }

    private static TStream<Message> toMessageStream(SPLStream sPLStream) {
        return sPLStream.convert(new Function<Tuple, Message>() { // from class: com.ibm.streamsx.topology.messaging.kafka.KafkaConsumer.1
            private static final long serialVersionUID = 1;

            @Override // com.ibm.streamsx.topology.function.Function
            public Message apply(Tuple tuple) {
                return new SimpleMessage(tuple.getString("message"), fromSplValue(tuple.getString("key")), tuple.getString("topic"));
            }

            private String fromSplValue(String str) {
                if (str == null || str.isEmpty()) {
                    return null;
                }
                return str;
            }
        });
    }
}
