package org.apache.storm.sql.kafka;

import com.google.common.base.Preconditions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/* loaded from: input_file:org/apache/storm/sql/kafka/KafkaDataSourcesProvider.class */
/**
 * {@link DataSourcesProvider} registered for the {@code kafka} URI scheme.
 *
 * <p>The table URI supplies the Kafka topic (its host component) and the broker list (its
 * {@code bootstrap-servers} query parameter). The resulting data source reads through a
 * {@link KafkaSpout} and writes through a {@link KafkaBolt}.
 */
public class KafkaDataSourcesProvider implements DataSourcesProvider {
    /** Key in the table properties under which the Kafka producer configuration map is stored. */
    private static final String CONFIG_KEY_PRODUCER = "producer";
    /** Name of the URI query parameter that carries the Kafka bootstrap servers. */
    private static final String URI_PARAMS_BOOTSTRAP_SERVERS = "bootstrap-servers";
    /** Kafka producer setting that is always taken from the table URI. */
    private static final String PRODUCER_BOOTSTRAP_SERVERS = "bootstrap.servers";
    /** Kafka producer setting that is always forced to {@link ByteBufferSerializer}. */
    private static final String PRODUCER_VALUE_SERIALIZER = "value.serializer";

    /**
     * Streams data source for a single Kafka topic: the producer side is a {@link KafkaSpout},
     * the consumer side is a {@link KafkaBolt}.
     */
    private static class KafkaStreamsDataSource implements ISqlStreamsDataSource {
        private final KafkaSpoutConfig<ByteBuffer, ByteBuffer> kafkaSpoutConfig;
        private final String bootstrapServers;
        private final String topic;
        private final Properties props;
        private final IOutputSerializer serializer;

        /**
         * Stores the given collaborators; nothing is validated or connected here.
         *
         * @param kafkaSpoutConfig configuration handed to the spout created by {@link #getProducer()}
         * @param bootstrapServers value written to the producer's {@code bootstrap.servers} setting
         * @param topic            topic the bolt created by {@link #getConsumer()} writes to
         * @param props            table properties; must hold the producer config under {@code "producer"}
         *                         for {@link #getConsumer()} to succeed
         * @param serializer       serializer used to turn output rows into message payloads
         */
        public KafkaStreamsDataSource(KafkaSpoutConfig<ByteBuffer, ByteBuffer> kafkaSpoutConfig, String bootstrapServers, String topic, Properties props, IOutputSerializer serializer) {
            this.kafkaSpoutConfig = kafkaSpoutConfig;
            this.bootstrapServers = bootstrapServers;
            this.topic = topic;
            this.props = props;
            this.serializer = serializer;
        }

        /**
         * Creates a new spout from the stored spout configuration.
         *
         * @return a fresh {@link KafkaSpout}
         */
        public IRichSpout getProducer() {
            return new KafkaSpout<>(this.kafkaSpoutConfig);
        }

        /**
         * Creates a bolt that writes to the configured topic.
         *
         * <p>The producer properties are the table properties merged with the map stored under the
         * {@code "producer"} key, plus {@code bootstrap.servers} and {@code value.serializer}, which
         * are always set by this method and therefore must not be supplied by the user.
         *
         * @return a new {@link KafkaBolt} configured for the topic
         * @throws IllegalArgumentException if the table properties are empty or do not hold a map
         *                                  under the {@code "producer"} key
         * @throws IllegalStateException    if the merged properties already define
         *                                  {@code bootstrap.servers} or {@code value.serializer}
         */
        public IRichBolt getConsumer() {
            String missingProducerConfig = "Writable Kafka table " + this.topic + " must contain producer config";
            Preconditions.checkArgument(!this.props.isEmpty(), missingProducerConfig);
            Object producerConfig = this.props.get(CONFIG_KEY_PRODUCER);
            // A missing or non-map entry previously surfaced as a bare NPE / ClassCastException.
            Preconditions.checkArgument(producerConfig instanceof Map, missingProducerConfig);

            // Work on a copy: writing into the shared table properties would make a second call
            // trip over the settings injected by the first one.
            Properties producerProps = (Properties) this.props.clone();
            producerProps.putAll((Map<?, ?>) producerConfig);

            Preconditions.checkState(!producerProps.containsKey(PRODUCER_BOOTSTRAP_SERVERS), "Writable Kafka table " + this.topic + " must not contain \"bootstrap.servers\" config, set it in the kafka URL instead");
            Preconditions.checkState(!producerProps.containsKey(PRODUCER_VALUE_SERIALIZER), "Writable Kafka table " + this.topic + " must not contain value.serializer, it will be hardcoded to be " + ByteBufferSerializer.class);
            producerProps.put(PRODUCER_VALUE_SERIALIZER, ByteBufferSerializer.class);
            producerProps.put(PRODUCER_BOOTSTRAP_SERVERS, this.bootstrapServers);

            TupleToKafkaMapper<Object, ByteBuffer> mapper = new SqlKafkaMapper(this.serializer);
            return new KafkaBolt<Object, ByteBuffer>()
                    .withTopicSelector(new DefaultTopicSelector(this.topic))
                    .withProducerProperties(producerProps)
                    .withTupleToKafkaMapper(mapper);
        }
    }

    /**
     * Maps a tuple to a Kafka record: the value at index 0 is the key, the value at index 1 is
     * serialized into the message payload.
     */
    private static class SqlKafkaMapper implements TupleToKafkaMapper<Object, ByteBuffer> {
        private final IOutputSerializer serializer;

        private SqlKafkaMapper(IOutputSerializer serializer) {
            this.serializer = serializer;
        }

        /**
         * Returns the record key.
         *
         * @param tuple tuple being written
         * @return the tuple's value at index 0
         */
        public Object getKeyFromTuple(Tuple tuple) {
            return tuple.getValue(0);
        }

        /**
         * Returns the record payload.
         *
         * @param tuple tuple being written; its value at index 1 is cast to {@link Values}
         * @return whatever the serializer produces for that row when given no buffer to reuse
         */
        public ByteBuffer getMessageFromTuple(Tuple tuple) {
            Values row = (Values) tuple.getValue(1);
            return this.serializer.write(row, (ByteBuffer) null);
        }
    }

    /**
     * Returns the URI scheme this provider is registered for.
     *
     * @return the literal {@code "kafka"}
     */
    public String scheme() {
        return "kafka";
    }

    /**
     * Builds the Kafka-backed streams data source for a table.
     *
     * @param uri        table URI; its host is used as the topic and its query string must contain
     *                   a {@code bootstrap-servers} parameter
     * @param str        passed to {@code SerdeUtils.getScheme} to build the input scheme
     *                   (presumably the input format class name - confirm against the caller)
     * @param str2       passed to {@code SerdeUtils.getSerializer} to build the output serializer
     *                   (presumably the output format class name - confirm against the caller)
     * @param properties table properties; passed to both {@code SerdeUtils} calls and kept for the
     *                   producer configuration of the consumer bolt
     * @param list       table fields; at least one must be marked primary
     * @return the data source for the topic
     * @throws IllegalStateException if no field is marked primary
     * @throws NullPointerException  if the URI has no {@code bootstrap-servers} parameter or no host
     */
    public ISqlStreamsDataSource constructStreams(URI uri, String str, String str2, Properties properties, List<FieldInfo> list) {
        List<String> fieldNames = new ArrayList<>(list.size());
        boolean hasPrimaryKey = false;
        for (FieldInfo field : list) {
            fieldNames.add(field.name());
            hasPrimaryKey |= field.isPrimary();
        }
        Preconditions.checkState(hasPrimaryKey, "Kafka stream table must have a primary key");

        Scheme scheme = SerdeUtils.getScheme(str, properties, fieldNames);

        String bootstrapServers = parseUriParams(uri.getQuery()).get(URI_PARAMS_BOOTSTRAP_SERVERS);
        Preconditions.checkNotNull(bootstrapServers, "bootstrap-servers must be specified");

        // URI.getHost() is null when the URI has no authority or the authority is not a valid
        // hostname; fail here instead of silently configuring a null topic.
        String topic = uri.getHost();
        Preconditions.checkNotNull(topic, "topic must be specified as the host of the kafka URL: " + uri);

        KafkaSpoutConfig<ByteBuffer, ByteBuffer> spoutConfig =
                new KafkaSpoutConfig.Builder<ByteBuffer, ByteBuffer>(bootstrapServers, topic)
                        .setProp("key.deserializer", ByteBufferDeserializer.class)
                        .setProp("value.deserializer", ByteBufferDeserializer.class)
                        // A random group id is generated on every call.
                        .setProp("group.id", "storm-sql-kafka-" + UUID.randomUUID())
                        .setRecordTranslator(new RecordTranslatorSchemeAdapter(scheme))
                        .build();

        IOutputSerializer serializer = SerdeUtils.getSerializer(str2, properties, fieldNames);
        return new KafkaStreamsDataSource(spoutConfig, bootstrapServers, topic, properties, serializer);
    }

    /**
     * Splits a URI query string into its {@code key=value} parameters.
     *
     * <p>Pairs are separated by {@code &}; each pair is split at the first {@code =}. Segments
     * without {@code =} are ignored, and a later occurrence of a key replaces an earlier one.
     *
     * @param query query string, may be {@code null}
     * @return a mutable map of the parameters; empty when {@code query} is {@code null}
     */
    private static Map<String, String> parseUriParams(String query) {
        Map<String, String> params = new HashMap<>();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            String[] keyValue = pair.split("=", 2);
            if (keyValue.length > 1) {
                params.put(keyValue[0], keyValue[1]);
            }
        }
        return params;
    }
}
